88 lines
3.0 KiB
Python
88 lines
3.0 KiB
Python
import requests
|
|
from collections import defaultdict
|
|
from collections import Counter
|
|
from datetime import datetime
|
|
import os
|
|
import subprocess
|
|
|
|
def get_vpn_list():
|
|
"""Fetches and parses the VPN list from the ProtonVPN API.
|
|
|
|
Returns:
|
|
A list of dictionaries, where each dictionary represents a VPN server.
|
|
"""
|
|
|
|
url = "https://api.protonmail.ch/vpn/logicals"
|
|
response = requests.get(url)
|
|
response.raise_for_status() # Raise an exception for bad status codes
|
|
|
|
data = response.json()
|
|
return data["LogicalServers"]
|
|
|
|
def group_by_exit_ip_range(servers):
|
|
"""Groups servers by their /24 exit IP range and associated country.
|
|
|
|
Args:
|
|
servers: A list of server dictionaries.
|
|
|
|
Returns:
|
|
A dictionary where keys are /24 exit IP ranges and values are tuples of
|
|
(country_code, list of server names) within that range.
|
|
"""
|
|
|
|
ip_range_to_servers = defaultdict(lambda: (None, [])) # Default: (None, [])
|
|
for server in servers:
|
|
for entry in server['Servers']:
|
|
exit_ip = entry['ExitIP']
|
|
ip_range = ".".join(exit_ip.split(".")[:3]) + ".0/24"
|
|
country_code, server_names = ip_range_to_servers[ip_range]
|
|
if country_code is None: # First time seeing this IP range
|
|
country_code = server['ExitCountry']
|
|
server_names.append(server['Name'])
|
|
ip_range_to_servers[ip_range] = (country_code, server_names)
|
|
return ip_range_to_servers
|
|
|
|
if __name__ == "__main__":
|
|
vpn_list = get_vpn_list()
|
|
ip_range_groups = group_by_exit_ip_range(vpn_list)
|
|
|
|
# Set output directory and create it if it doesn't exist
|
|
output_dir = "output"
|
|
os.makedirs(output_dir, exist_ok=True)
|
|
|
|
# Define output file path
|
|
output_file = os.path.join(output_dir, "rbldnsd-proton.db")
|
|
|
|
# Write to rbldnsd-compliant file
|
|
with open(output_file, "w") as f:
|
|
f.write(":127.0.0.4:Listed, see https://protonvpn.com\n") # Header
|
|
# Iterate through IP ranges in sorted order
|
|
for ip_range in sorted(ip_range_groups.keys()):
|
|
country_code, server_names = ip_range_groups[ip_range]
|
|
f.write(f"{ip_range} ; ProtonVPN {country_code}\n")
|
|
|
|
|
|
# Calculate and print stats
|
|
total_ranges = len(ip_range_groups)
|
|
ranges_per_country = Counter(country_code for country_code, _ in ip_range_groups.values())
|
|
|
|
print(f"Total IP Ranges: {total_ranges}")
|
|
print("IP Ranges per Country:")
|
|
for country_code, count in ranges_per_country.items():
|
|
print(f" {country_code}: {count}")
|
|
|
|
# Git commands
|
|
try:
|
|
subprocess.run(["git", "add", output_file], check=True)
|
|
subprocess.run(["git", "commit", "-m", "Update ProtonVPN IP ranges"], check=True)
|
|
subprocess.run(["git", "push", "origin"], check=True)
|
|
print("Changes pushed to git repository.")
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"Error running git commands: {e}")
|
|
|
|
# ping health system
|
|
try:
|
|
requests.get("https://health.ext.ben.io/ping/24296528-ffbf-4219-8ef5-07aa88aef414", auth=("local", "local"), timeout=10)
|
|
except requests.RequestException as e:
|
|
# Log ping failure here...
|
|
print("Ping failed: %s" % e) |