update change checker to leverage diffs
This commit is contained in:
@@ -1,42 +1,127 @@
|
||||
import requests
|
||||
import datetime
|
||||
import re
|
||||
|
||||
# Changedetection.io details
|
||||
base_url = "http://change.ext.ben.io"
|
||||
auth_cookie = "6abdf17c692daf27a79c9d23a63c7d96"
|
||||
base_url = "http://change.local.ben.io:5000"
|
||||
api_key = "de64270b89aa65ed388d54960ef19948"
|
||||
|
||||
# Headers for the requests
|
||||
headers = {
|
||||
"Cookie": f"sessionid={auth_cookie}"
|
||||
"x-api-key": api_key
|
||||
}
|
||||
|
||||
# Function to get system information
|
||||
def get_system_info():
|
||||
url = f"{base_url}/api/v1/system/info"
|
||||
print(f"Testing URL: {url}")
|
||||
url = f"{base_url}/api/v1/systeminfo"
|
||||
print(f"Testing URL: {url}")
|
||||
response = requests.get(url, headers=headers)
|
||||
|
||||
if response.status_code == 200:
|
||||
system_info = response.json()
|
||||
print("System Information:")
|
||||
print(system_info)
|
||||
print(system_info)
|
||||
else:
|
||||
print(f"Error getting system information: {response.status_code}")
|
||||
print(f"Full response: {response}")
|
||||
|
||||
# Function to list watch IDs
|
||||
def list_watch_ids():
|
||||
url = f"{base_url}/api/v1/watches"
|
||||
# Function to find the watch ID for "Nord VPN List"
|
||||
def find_nordvpn_list_watch_id():
|
||||
url = f"{base_url}/api/v1/watch"
|
||||
print(f"Testing URL: {url}")
|
||||
response = requests.get(url, headers=headers)
|
||||
|
||||
if response.status_code == 200:
|
||||
watches = response.json()
|
||||
watch_ids = [watch['uuid'] for watch in watches]
|
||||
print("Available Watch IDs:")
|
||||
print(watch_ids)
|
||||
|
||||
nordvpn_list_watch_id = None
|
||||
for watch_id, watch_data in watches.items():
|
||||
if watch_data.get('title') == "Nord VPN List":
|
||||
nordvpn_list_watch_id = watch_id
|
||||
break
|
||||
|
||||
if nordvpn_list_watch_id:
|
||||
print(f"Found Nord VPN List watch with ID: {nordvpn_list_watch_id}")
|
||||
return nordvpn_list_watch_id
|
||||
else:
|
||||
print("Nord VPN List watch not found.")
|
||||
return None
|
||||
|
||||
else:
|
||||
print(f"Error listing watch IDs: {response.status_code}")
|
||||
print(f"Full response: {response}")
|
||||
return None
|
||||
|
||||
def list_historical_snapshots(watch_id):
|
||||
url = f"{base_url}/api/v1/watch/{watch_id}/history"
|
||||
print(f"Testing URL: {url}")
|
||||
response = requests.get(url, headers=headers)
|
||||
|
||||
if response.status_code == 200:
|
||||
snapshots = response.json()
|
||||
|
||||
total_snapshots = len(snapshots)
|
||||
print(f"Total Historical Snapshots: {total_snapshots}")
|
||||
|
||||
if snapshots:
|
||||
last_2_timestamps = get_last_two_timestamps(snapshots)
|
||||
snapshot_1, snapshot_2 = fetch_snapshot_details(watch_id, last_2_timestamps)
|
||||
new_servers = process_snapshots(snapshot_1, snapshot_2)
|
||||
print_results(new_servers)
|
||||
else:
|
||||
print(f"No historical snapshots found for Watch ID {watch_id}")
|
||||
|
||||
else:
|
||||
print(f"Error listing historical snapshots: {response.status_code}")
|
||||
print(f"Full response: {response}")
|
||||
|
||||
def get_last_two_timestamps(snapshots):
|
||||
return sorted(snapshots.keys(), reverse=True)[:2]
|
||||
|
||||
def fetch_snapshot_details(watch_id, timestamps):
|
||||
snapshot_1 = get_snapshot_details(watch_id, timestamps[0])
|
||||
print(f"Fetching snapshot details for timestamp {timestamps[0]}")
|
||||
snapshot_2 = get_snapshot_details(watch_id, timestamps[1])
|
||||
print(f"Fetching snapshot details for timestamp {timestamps[1]}")
|
||||
return snapshot_1, snapshot_2
|
||||
|
||||
def process_snapshots(snapshot_1, snapshot_2):
|
||||
new_lines = [line for line in snapshot_1.splitlines() if line not in snapshot_2.splitlines()]
|
||||
pattern = r"\*?\s*([a-z0-9-]+\.nordvpn\.com)"
|
||||
formatted_new_lines = []
|
||||
for line in new_lines:
|
||||
match = re.search(pattern, line)
|
||||
if match:
|
||||
formatted_new_lines.append(match.group(1) + ".tcp.ovpn")
|
||||
return formatted_new_lines
|
||||
|
||||
def print_results(new_servers):
|
||||
print(f"\nTotal number of changed lines: {len(new_servers)}")
|
||||
if new_servers:
|
||||
print("\nNew servers found:")
|
||||
for server in new_servers:
|
||||
print(server)
|
||||
else:
|
||||
print("\nNo new servers found.")
|
||||
|
||||
def get_snapshot_details(watch_id, timestamp):
|
||||
url = f"{base_url}/api/v1/watch/{watch_id}/history/{timestamp}"
|
||||
print(f"Testing URL: {url}")
|
||||
response = requests.get(url, headers=headers)
|
||||
|
||||
if response.status_code == 200:
|
||||
snapshot_details = response.text
|
||||
return snapshot_details
|
||||
|
||||
else:
|
||||
print(f"Error getting snapshot details: {response.status_code}")
|
||||
print(f"Full response: {response}")
|
||||
|
||||
|
||||
# Main execution
|
||||
if __name__ == "__main__":
|
||||
get_system_info()
|
||||
list_watch_ids()
|
||||
|
||||
nordvpn_list_watch_id = find_nordvpn_list_watch_id()
|
||||
if nordvpn_list_watch_id:
|
||||
list_historical_snapshots(nordvpn_list_watch_id)
|
||||
Reference in New Issue
Block a user