Files
nordvpn/openvpn_manager.py
2024-11-03 19:44:08 -06:00

111 lines
4.4 KiB
Python

import subprocess
import time
import datetime
import re
import sys
def establish_vpn_connection(ovpn_file, credential_file, log_file, debug_print=None, timeout=15):
    """
    Launch OpenVPN for the given profile and wait for the tunnel to come up.

    OpenVPN is started with ``--config ovpn_file`` and
    ``--auth-user-pass credential_file``; its stdout is then watched for
    either the "Initialization Sequence Completed" marker (success) or an
    AUTH_FAILED control message (failure).

    Args:
        ovpn_file: Path of the .ovpn profile passed to ``--config``.
        credential_file: Path of a file whose first line is the username and
            whose second line is the password.
        log_file: Accepted for interface compatibility; not used here.
        debug_print: Optional callable taking one string. When omitted (or
            otherwise falsy) no diagnostics are emitted.
        timeout: Seconds to wait for the connection before killing OpenVPN.

    Returns:
        A ``(success, detail)`` tuple. ``success`` is True only when the
        initialization marker was seen, in which case ``detail`` is that
        output line. On failure ``detail`` is OpenVPN's stderr (if it
        exited), the AUTH_FAILED message, or "VPN connection timed out.".
    """
    # Imported here so this function is self-contained with respect to the
    # module-level import list.
    import queue
    import threading

    def log(message):
        # debug_print is optional, so every diagnostic goes through this guard.
        if debug_print:
            debug_print(message)

    # Read credentials
    with open(credential_file, 'r') as f:
        f.readline()  # first line is the username; only the password is sent below
        password = f.readline().strip()

    # Establish VPN connection (capture stdout and stderr)
    openvpn_command = ["/usr/sbin/openvpn", "--config", ovpn_file,
                       "--auth-user-pass", credential_file, "--verb", "1"]
    if debug_print:
        debug_print(f"OpenVPN command: {' '.join(openvpn_command)}")
        sys.stdout.flush()  # Flush the output buffer

    process = subprocess.Popen(openvpn_command,
                               stdin=subprocess.PIPE,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)

    # Send the password to the process
    try:
        process.stdin.write((password + '\n').encode())
        process.stdin.flush()
    except OSError:
        # OpenVPN has already closed its stdin (most likely it exited); the
        # loop below reports the exit status instead of crashing here.
        log("Could not write to OpenVPN stdin; the process may have exited.")

    # readline() on a pipe blocks until a full line arrives, which would make
    # the timeout below unenforceable while OpenVPN is silent. A background
    # thread does the blocking reads and hands decoded lines over via a queue.
    output_lines = queue.Queue()
    stop_queueing = threading.Event()

    def pump_stdout():
        for raw_line in iter(process.stdout.readline, b''):
            # After this function has returned, keep draining the pipe but
            # stop storing lines nobody will consume.
            if not stop_queueing.is_set():
                # errors='replace' so a stray byte can neither raise nor hide
                # a status line.
                output_lines.put(raw_line.decode(errors='replace'))

    reader = threading.Thread(target=pump_stdout, daemon=True)
    reader.start()

    start_time = time.monotonic()
    try:
        while True:
            # Waiting on the queue doubles as the small delay that avoids
            # busy-waiting when there is no output.
            try:
                line = output_lines.get(timeout=0.1)
            except queue.Empty:
                line = None

            if line is not None:
                log(f"OpenVPN output: {line.strip()}")
                if "Initialization Sequence Completed" in line:
                    log("VPN Initialization Sequence Completed found in stdout.")
                    return True, line
                # Check for AUTH_FAILED message
                if "AUTH: Received control message: AUTH_FAILED" in line:
                    log("Authentication failed. Sleeping for 5 seconds.")
                    # NOTE(review): presumably a back-off before the caller
                    # retries - confirm the delay is still wanted.
                    time.sleep(5)
                    return False, "AUTH: Received control message: AUTH_FAILED"
            elif process.poll() is not None:
                # The process is gone. Give the reader a moment to queue any
                # final lines so a message printed just before exit is still
                # examined on the next iteration.
                reader.join(timeout=1)
                if output_lines.empty():
                    log(f"OpenVPN exited with a non-zero return code: {process.returncode}")
                    return False, process.stderr.read().decode(errors='replace')

            # Check for timeout
            if time.monotonic() - start_time > timeout:
                log(f"VPN connection timed out after {timeout} seconds.")
                process.kill()
                return False, "VPN connection timed out."
    finally:
        stop_queueing.set()
def is_vpn_active(ovpn_filename):
    """
    Report whether any running process mentions the given OVPN filename.

    Runs ``pgrep -f ovpn_filename`` (``-f`` matches against the full command
    line) and treats a zero exit status as "active".
    Returns True if pgrep found a match, False otherwise.
    """
    probe = subprocess.run(["pgrep", "-f", ovpn_filename], capture_output=True)
    # pgrep exits with 0 only when at least one process matched.
    return probe.returncode == 0
def disconnect_vpn():
    """
    Stop every process that ``pgrep openvpn`` reports.

    Each PID found is passed to the ``kill`` command. Does nothing when no
    matching process is running.
    """
    lookup = subprocess.run(["pgrep", "openvpn"], capture_output=True)
    if lookup.returncode != 0:
        return  # No openvpn process found

    pid_listing = lookup.stdout.decode().strip()
    for pid in pid_listing.split('\n'):
        subprocess.run(["kill", pid])
def get_external_ip(services=("ifconfig.me", "ifconfig.co", "ipinfo.io/ip"), timeout=10):
    """
    Look up this host's external IPv4 address via public echo services.

    Each service is queried once with curl, in the order given; the first
    reply that is a well-formed dotted-quad IPv4 address is returned.

    Args:
        services: Iterable of host/path strings handed to curl.
        timeout: Maximum seconds curl may spend on one service
            (``--max-time``), so a stalled service cannot hang the caller.

    Returns:
        The external IP as a string, or None if no service produced a valid
        address.
    """
    for service in services:
        try:
            raw_reply = subprocess.check_output(
                ["curl", "-s", "--max-time", str(timeout), service]
            )
        except subprocess.CalledProcessError:
            # curl exits non-zero on network errors and when --max-time expires.
            print(f"{datetime.datetime.now()} [Main Script]: Error getting external IP from {service}")
            continue

        # errors="replace": a non-UTF-8 reply should be rejected as invalid
        # below rather than raise out of this function.
        external_ip = raw_reply.decode(errors="replace").strip()

        # Shape check first, then range check so e.g. 999.1.1.1 is rejected.
        looks_like_ipv4 = re.fullmatch(r"[0-9]{1,3}(?:\.[0-9]{1,3}){3}", external_ip)
        if looks_like_ipv4 and all(int(octet) <= 255 for octet in external_ip.split(".")):
            return external_ip
        print(f"{datetime.datetime.now()} [Main Script]: Invalid response from {service}")

    return None  # Unable to get external IP