import datetime
import os
import re
import select
import subprocess
import sys
import time

# How long one wait for OpenVPN output may last before the exit/timeout
# checks run again (seconds).
_POLL_INTERVAL = 0.1
# Pause applied after the server rejects the credentials (seconds).
_AUTH_FAILED_BACKOFF = 15
# Upper bound handed to curl for one external-IP lookup (seconds).
_CURL_MAX_TIME = 10

_INIT_DONE_MARKER = "Initialization Sequence Completed"
_AUTH_FAILED_MARKER = "AUTH: Received control message: AUTH_FAILED"
_IPV4_PATTERN = re.compile(r"^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$")


def _discard(_message):
    """Stand-in logger used when the caller supplies no ``debug_print``."""


def _stop_process(process):
    """Kill *process* if it is still running and reap it."""
    if process.poll() is None:
        process.kill()
    process.wait()


def establish_vpn_connection(ovpn_file, credential_file, log_file, debug_print=None, timeout=15):
    """Start OpenVPN and wait until the tunnel is up or the attempt fails.

    Args:
        ovpn_file: Path of the OpenVPN configuration passed to ``--config``.
        credential_file: Path of a file holding the username on the first
            line and the password on the second; also passed to
            ``--auth-user-pass``.
        log_file: Accepted for interface compatibility; not used here.
        debug_print: Optional callable taking one message string. When it is
            ``None`` nothing is logged.
        timeout: Seconds to wait for the connection before giving up.

    Returns:
        A ``(success, detail)`` tuple. On success ``detail`` is the output
        line that announced completion; otherwise it is OpenVPN's stderr
        text or a short description of the failure.

    Raises:
        OSError: If ``credential_file`` cannot be read.
    """
    log = debug_print if debug_print is not None else _discard

    with open(credential_file, "r") as f:
        f.readline()  # first line is the username, which is not needed here
        password = f.readline().strip()

    openvpn_command = ["/usr/sbin/openvpn", "--config", ovpn_file,
                       "--auth-user-pass", credential_file, "--verb", "1"]
    log(f"OpenVPN command: {' '.join(openvpn_command)}")
    sys.stdout.flush()

    try:
        process = subprocess.Popen(openvpn_command,
                                   stdin=subprocess.PIPE,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE)
    except OSError as err:
        # Missing or non-executable binary: report it as a failed attempt
        # instead of crashing the caller.
        log(f"Unable to start OpenVPN: {err}")
        return False, f"Unable to start OpenVPN: {err}"

    try:
        process.stdin.write((password + '\n').encode())
        process.stdin.flush()
    except OSError:
        # The child already closed its stdin (typically because it exited);
        # the loop below reports that exit.
        pass

    # Read the pipe through its descriptor with select() so that a silent
    # OpenVPN cannot block this loop and defeat the timeout.
    stdout_fd = process.stdout.fileno()
    deadline = time.monotonic() + timeout
    pending = b""
    stdout_open = True

    while True:
        got_data = False
        if stdout_open:
            ready, _, _ = select.select([stdout_fd], [], [], _POLL_INTERVAL)
            if ready:
                chunk = os.read(stdout_fd, 4096)
                if chunk:
                    pending += chunk
                    got_data = True
                else:
                    # End of stream: treat an unterminated tail as a line.
                    stdout_open = False
                    if pending:
                        pending += b"\n"
        else:
            time.sleep(_POLL_INTERVAL)

        *finished, pending = pending.split(b"\n")
        for raw in finished:
            # Undecodable bytes are replaced rather than aborting the read.
            line = raw.decode(errors="replace") + "\n"
            if not line.strip():
                continue
            log(f"OpenVPN output: {line.strip()}")

            if _INIT_DONE_MARKER in line:
                log("VPN Initialization Sequence Completed found in stdout.")
                return True, line

            if _AUTH_FAILED_MARKER in line:
                log("Authentication failed. Sleeping for 15 seconds.")
                _stop_process(process)
                time.sleep(_AUTH_FAILED_BACKOFF)
                return False, _AUTH_FAILED_MARKER

        # Only treat an exit as final once the pipe has nothing more to give,
        # so output written just before the exit is still inspected.
        if not got_data and process.poll() is not None:
            log(f"OpenVPN exited with a non-zero return code: {process.returncode}")
            return False, process.stderr.read().decode(errors="replace")

        if time.monotonic() > deadline:
            log(f"VPN connection timed out after {timeout} seconds.")
            _stop_process(process)
            return False, "VPN connection timed out."


def is_vpn_active(ovpn_filename):
    """Report whether a process whose command line matches *ovpn_filename* exists.

    The name is handed to ``pgrep -f`` as its pattern.

    Returns:
        True if ``pgrep`` exits successfully, False otherwise.
    """
    lookup = subprocess.run(["pgrep", "-f", ovpn_filename], capture_output=True)
    return lookup.returncode == 0


def disconnect_vpn():
    """Send the default ``kill`` signal to every process found by ``pgrep openvpn``.

    Does nothing when ``pgrep`` finds no matching process.
    """
    lookup = subprocess.run(["pgrep", "openvpn"], capture_output=True)
    if lookup.returncode != 0:
        return  # no openvpn process found
    for pid in lookup.stdout.decode().strip().split('\n'):
        subprocess.run(["kill", pid])


def get_external_ip():
    """Return the external IPv4 address, or None if no service provides one.

    Each service is asked once, in order; the first reply that looks like a
    dotted-quad IPv4 address wins. Every request is limited to
    ``_CURL_MAX_TIME`` seconds so that a stalled service cannot hang the
    caller.
    """
    for service in ["ifconfig.me", "ifconfig.co", "ipinfo.io/ip"]:
        try:
            reply = subprocess.check_output(
                ["curl", "-s", "--max-time", str(_CURL_MAX_TIME), service]
            )
        except subprocess.CalledProcessError:
            print(f"{datetime.datetime.now()} [Main Script]: Error getting external IP from {service}")
            continue

        external_ip = reply.decode().strip()
        if _IPV4_PATTERN.match(external_ip):
            return external_ip
        print(f"{datetime.datetime.now()} [Main Script]: Invalid response from {service}")

    return None