refactor nord-checker to remove file and sql operations.
This commit is contained in:
73
openvpn_manager.py
Normal file
73
openvpn_manager.py
Normal file
@@ -0,0 +1,73 @@
|
||||
import subprocess
|
||||
import time
|
||||
import datetime
|
||||
import re
|
||||
|
||||
def establish_vpn_connection(ovpn_file, credential_file, log_file):
|
||||
"""
|
||||
Establishes a VPN connection using the provided OVPN file and credentials.
|
||||
Returns True if the connection is successful, False otherwise.
|
||||
"""
|
||||
|
||||
# Read credentials
|
||||
with open(credential_file, 'r') as f:
|
||||
username = f.readline().strip()
|
||||
password = f.readline().strip()
|
||||
|
||||
# Establish VPN connection (capture stdout and stderr)
|
||||
openvpn_command = ["openvpn", "--config", ovpn_file,
|
||||
"--auth-user-pass", credential_file, "--daemon",
|
||||
"--log-append", log_file, "--verb", "3"]
|
||||
|
||||
process = subprocess.Popen(openvpn_command,
|
||||
stdin=subprocess.PIPE,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
stdout, stderr = process.communicate(input=password.encode())
|
||||
|
||||
# Check for errors
|
||||
if process.returncode != 0:
|
||||
return False, stderr.decode()
|
||||
else:
|
||||
return True, stdout.decode()
|
||||
|
||||
def is_vpn_active(ovpn_filename):
|
||||
"""
|
||||
Checks if the VPN connection associated with the given OVPN filename is active.
|
||||
Returns True if active, False otherwise.
|
||||
"""
|
||||
try:
|
||||
subprocess.run(["pgrep", "-f", ovpn_filename], check=True, capture_output=True)
|
||||
return True
|
||||
except subprocess.CalledProcessError:
|
||||
return False
|
||||
|
||||
def disconnect_vpn():
|
||||
"""
|
||||
Disconnects the active VPN connection.
|
||||
"""
|
||||
try:
|
||||
result = subprocess.run(["pgrep", "openvpn"], check=True, capture_output=True)
|
||||
pids = result.stdout.decode().strip().split('\n')
|
||||
for pid in pids:
|
||||
subprocess.run(["kill", pid])
|
||||
except subprocess.CalledProcessError:
|
||||
pass # No openvpn process found
|
||||
|
||||
def get_external_ip():
|
||||
"""
|
||||
Gets the external IP address using various services, with retries.
|
||||
Returns the external IP if successful, or None if unable to get it.
|
||||
"""
|
||||
|
||||
for service in ["ifconfig.me", "ifconfig.co", "ipinfo.io/ip"]:
|
||||
try:
|
||||
external_ip = subprocess.check_output(["curl", "-s", service]).decode().strip()
|
||||
if re.match(r"^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$", external_ip):
|
||||
return external_ip
|
||||
else:
|
||||
print(f"{datetime.datetime.now()} [Main Script]: Invalid response from {service}")
|
||||
except subprocess.CalledProcessError:
|
||||
print(f"{datetime.datetime.now()} [Main Script]: Error getting external IP from {service}")
|
||||
|
||||
return None # Unable to get external IP
|
||||
Reference in New Issue
Block a user