Files
zerogravity/scripts/parse-snapshot.py

476 lines
18 KiB
Python

#!/usr/bin/env python3
"""
Parse Go GODEBUG=http2debug=2 output into a clean, readable snapshot.
Usage:
    python3 parse-snapshot.py < raw-http2-dump.log
    python3 parse-snapshot.py /path/to/logfile
"""
import gzip
import json
import re
import sys
import zlib
from collections import defaultdict
from io import BytesIO
# ── Colors ────────────────────────────────────────────────────────────────────
# ANSI escape sequences used by the renderer to style terminal output.

# Reset: clears every attribute set by the sequences below.
NC = "\033[0m"

# Text attributes.
BOLD = "\033[1m"
DIM = "\033[2m"

# Bright foreground colors.
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
MAGENTA = "\033[95m"
CYAN = "\033[96m"
# ── Regexes ───────────────────────────────────────────────────────────────────
# Patterns for the log lines Go's net/http2 package writes when run with
# GODEBUG=http2debug=2.

# Header lines: group 1 is the header name, group 2 its value.
RE_ENCODING_HEADER = re.compile(
    r'http2: Transport encoding header "([^"]+)" = "([^"]*)"'
)
RE_DECODED_HEADER = re.compile(
    r'http2: decoded hpack field header field "([^"]+)" = "([^"]*)"'
)
RE_SERVER_ENCODING = re.compile(
    r'http2: server encoding header "([^"]+)" = "([^"]*)"'
)

# Shared tail of a DATA frame summary. Groups: flags, stream, len, data.
#   * ``flags=`` is optional because Go omits it when no flag is set; the
#     flags group is then None.
#   * The payload is printed with %q, so it may contain ``\"``; the data group
#     therefore accepts any backslash-escaped character instead of stopping at
#     the first double quote.
_DATA_FRAME_FIELDS = (
    r'(?:flags=(\S+) )?stream=(\d+) len=(\d+) data="((?:[^"\\]|\\.)*)"'
)

RE_WROTE_DATA = re.compile(
    r'http2: Framer [^:]+: wrote DATA ' + _DATA_FRAME_FIELDS
)
RE_READ_DATA = re.compile(
    r'http2: Framer [^:]+: read DATA ' + _DATA_FRAME_FIELDS
)
# Group 1 is the connection target (everything after " to ").
RE_TRANSPORT_CONN = re.compile(
    r'http2: Transport creating client conn [^ ]+ to (.+)'
)
RE_SERVER_READ_DATA = re.compile(
    r'http2: server read frame DATA ' + _DATA_FRAME_FIELDS
)
# Groups: flags (None when absent), stream.
RE_WROTE_HEADERS = re.compile(
    r'http2: Framer [^:]+: wrote HEADERS (?:flags=(\S+) )?stream=(\d+)'
)

# "YYYY/MM/DD HH:MM:SS" prefix at the start of a line.
RE_TIMESTAMP = re.compile(r'^(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2})')
# Lines starting with I, W or E followed by four digits and a space.
RE_LS_LOG = re.compile(r'^[IWE]\d{4} ')
RE_MAXPROCS = re.compile(r'^.*maxprocs:')
# Trailing "(N bytes omitted)" marker; group 1 is N.
RE_BYTES_OMITTED = re.compile(r'\((\d+) bytes omitted\)$')
# Known domain purposes: host name -> (short label, longer description).
# The renderer looks hosts up here to annotate connections and request groups.
DOMAIN_INFO = {
    "antigravity-unleash.goog": (
        "Feature Flags",
        "Unleash SDK — controls A/B tests, feature rollouts",
    ),
    "daily-cloudcode-pa.googleapis.com": (
        "LLM API (gRPC)",
        "Primary Gemini/Claude API endpoint",
    ),
    "cloudcode-pa.googleapis.com": (
        "LLM API (gRPC)",
        "Production Gemini/Claude API endpoint",
    ),
    "api.anthropic.com": (
        "Claude API",
        "Direct Anthropic API calls",
    ),
    "lh3.googleusercontent.com": (
        "Profile Picture",
        "User avatar image",
    ),
    "play.googleapis.com": (
        "Telemetry",
        "Google Play telemetry/logging",
    ),
    "firebaseinstallations.googleapis.com": (
        "Firebase",
        "Firebase installation tracking",
    ),
    "oauth2.googleapis.com": (
        "OAuth",
        "Token refresh/exchange",
    ),
    "speech.googleapis.com": (
        "Speech",
        "Voice input processing",
    ),
    "modelarmor.googleapis.com": (
        "Safety",
        "Content safety/filtering",
    ),
}
class Request:
    """One HTTP/2 request reconstructed from the debug log.

    Instances start out empty; the parser fills the fields in as it reads the
    header and DATA lines belonging to the request.
    """

    def __init__(self):
        # Values of the request pseudo-headers.
        self.method = ""
        self.path = ""
        self.authority = ""
        self.scheme = ""
        # Regular (non-pseudo) headers, name -> value.
        self.headers = {}
        # Body bytes collected so far and the body length reported by the log.
        self.data = b""
        self.data_len = 0
        # HTTP/2 stream id; None until the parser has seen one for the request.
        self.stream_id = None
        # Timestamp string taken from the log line, "" when none was found.
        self.timestamp = ""
        # outgoing = LS→upstream, incoming = LS←upstream
        self.direction = "outgoing"
class Snapshot:
    """Parsed view of one http2debug log, plus its colored text rendering.

    ``parse`` fills the collections created in ``__init__`` from raw log
    lines; ``render`` turns them into a single printable string.
    """

    # Horizontal rules used by ``render``.
    _RULE_DOUBLE = "═" * 70
    _RULE_WIDE = "─" * 70
    _RULE_NARROW = "─" * 60

    # Headers listed first, one per line, for every rendered request.
    _INTERESTING_HEADERS = (
        "authorization",
        "content-type",
        "user-agent",
        "unleash-appname",
        "unleash-instanceid",
        "unleash-sdk",
        "x-goog-api-key",
        "x-goog-api-client",
        "grpc-encoding",
        "te",
    )

    # Single-character escapes emitted by Go's %q verb -> decoded byte value.
    _SIMPLE_ESCAPES = {
        "a": 7,
        "b": 8,
        "f": 12,
        "n": 10,
        "r": 13,
        "t": 9,
        "v": 11,
        "\\": 92,
        '"': 34,
    }
    # Numeric escapes emitted by %q: escape letter -> number of hex digits.
    _HEX_ESCAPES = {"x": 2, "u": 4, "U": 8}
    _HEX_DIGITS = frozenset("0123456789abcdefABCDEF")

    # First bytes that make ``_render_body`` try the protobuf presentation.
    _PROTOBUF_FIRST_BYTES = (
        b"\x08", b"\x0a", b"\x10", b"\x12", b"\x18", b"\x1a", b"\x20", b"\x22",
    )

    def __init__(self):
        self.connections = []  # (timestamp, target)
        self.requests = []  # list of Request
        # Per-key response records; created on first access.
        self.responses = defaultdict(
            lambda: {"headers": {}, "data": b"", "data_len": 0}
        )
        self.ls_logs = []

    # ── Parsing ──────────────────────────────────────────────────────────────

    def parse(self, lines):
        """Consume log lines and populate connections, requests and ls_logs.

        Args:
            lines: iterable of log lines (trailing newlines are stripped).

        Header lines are collected into a pending request, which is turned
        into a ``Request`` when a HEADERS frame is written, when a header that
        can only belong to another request shows up, or at end of input.
        DATA frames are attached to the most recent request that carries the
        same stream id.
        """
        pending = {}  # headers of the request currently being assembled
        pending_ts = ""  # timestamp of the first timestamped header line
        direction = "outgoing"  # direction of the pending request
        # True while decoded (received) headers belong to a request block
        # rather than to a response block.
        decoding_request = False
        # (pattern, whether the frame carries request body bytes)
        data_patterns = (
            (RE_WROTE_DATA, True),
            (RE_READ_DATA, False),
            (RE_SERVER_READ_DATA, True),
        )

        def flush(line, stream_id=None):
            """Finalize the pending request (if it is one) and reset state."""
            nonlocal pending, pending_ts, decoding_request
            if pending.get(":path") or pending.get(":method"):
                pending.setdefault("timestamp", pending_ts)
                req = self._finalize_request(pending, direction, line)
                req.stream_id = stream_id
            pending = {}
            pending_ts = ""
            decoding_request = False

        def add_header(new_direction, key, val, line):
            """Store a request header, starting a new request when needed."""
            nonlocal direction, pending_ts
            if direction != new_direction or self._starts_new_block(pending, key):
                flush(line)
                direction = new_direction
            if not pending_ts:
                ts_m = RE_TIMESTAMP.match(line)
                if ts_m:
                    pending_ts = ts_m.group(1)
            pending[key] = val

        for raw_line in lines:
            line = raw_line.rstrip()
            if not line:
                continue

            # LS process logs
            if RE_LS_LOG.match(line) or RE_MAXPROCS.match(line):
                self.ls_logs.append(line)
                continue

            # New connection
            m = RE_TRANSPORT_CONN.search(line)
            if m:
                ts_m = RE_TIMESTAMP.match(line)
                self.connections.append((ts_m.group(1) if ts_m else "", m.group(1)))
                continue

            # Outgoing headers (Transport encoding = LS sending to upstream)
            m = RE_ENCODING_HEADER.search(line)
            if m:
                add_header("outgoing", m.group(1), m.group(2), line)
                continue

            # Decoded headers: either a response (starts with :status) or a
            # request received by the LS (starts with request pseudo-headers).
            m = RE_DECODED_HEADER.search(line)
            if m:
                key, val = m.group(1), m.group(2)
                if key == ":status":
                    # Response headers are not part of any request.
                    decoding_request = False
                elif key.startswith(":"):
                    add_header("incoming", key, val, line)
                    decoding_request = True
                elif decoding_request:
                    pending[key] = val
                continue

            # Server encoding (our LS responding): skipped for now.
            if RE_SERVER_ENCODING.search(line):
                continue

            # Headers frame written: finalize and remember the stream id.
            m = RE_WROTE_HEADERS.search(line)
            if m:
                flush(line, stream_id=m.group(2))
                continue

            # Data frames (wrote = LS sending, read = LS receiving)
            for pattern, is_request_body in data_patterns:
                m = pattern.search(line)
                if not m:
                    continue
                if is_request_body:
                    self._append_body(m.group(2), int(m.group(3)), m.group(4), line)
                break

        # Finalize any remaining headers
        flush("")

    @staticmethod
    def _starts_new_block(pending, key):
        """Return True when pseudo-header ``key`` cannot extend ``pending``.

        HTTP/2 puts all pseudo-headers before regular headers, so a
        pseudo-header that is already present, or that follows a regular
        header, must open a new header block. Regular headers never do.
        """
        if not key.startswith(":"):
            return False
        return key in pending or any(not name.startswith(":") for name in pending)

    def _append_body(self, stream, length, data_str, line):
        """Attach one DATA frame to the newest request with stream id ``stream``.

        ``length`` is the frame's ``len=`` value; lengths are summed so that
        ``data_len`` covers bodies split across several frames. Frames with no
        matching request are dropped.
        """
        for req in reversed(self.requests):
            if req.stream_id == stream:
                req.data += self._decode_data_str(data_str, line)
                req.data_len += length
                return

    def _finalize_request(self, headers, direction, _line):
        """Build a Request from ``headers``, append it to requests, return it.

        ``headers`` is modified: the four request pseudo-headers and the
        bookkeeping ``"timestamp"`` entry are popped. Missing entries fall
        back to GET, "/", "" and "https". ``_line`` is unused.
        """
        req = Request()
        req.method = headers.pop(":method", "GET")
        req.path = headers.pop(":path", "/")
        req.authority = headers.pop(":authority", "")
        req.scheme = headers.pop(":scheme", "https")
        req.timestamp = headers.pop("timestamp", "")
        req.direction = direction
        req.headers = {k: v for k, v in headers.items() if not k.startswith(":")}
        self.requests.append(req)
        return req

    def _decode_data_str(self, s, full_line):
        """Decode escaped string from GODEBUG output back to bytes.

        Handles the single-character escapes in ``_SIMPLE_ESCAPES`` and the
        ``\\xNN``, ``\\uNNNN`` and ``\\UNNNNNNNN`` forms. Characters that are
        not part of a recognised escape are emitted as their UTF-8 encoding;
        a malformed escape is kept literally. ``full_line`` is unused.
        """
        out = bytearray()
        i, n = 0, len(s)
        while i < n:
            ch = s[i]
            if ch == "\\" and i + 1 < n:
                esc = s[i + 1]
                simple = self._SIMPLE_ESCAPES.get(esc)
                if simple is not None:
                    out.append(simple)
                    i += 2
                    continue
                width = self._HEX_ESCAPES.get(esc, 0)
                digits = s[i + 2 : i + 2 + width]
                if width and len(digits) == width and self._HEX_DIGITS.issuperset(digits):
                    value = int(digits, 16)
                    if esc == "x":
                        # \xNN is one raw byte.
                        out.append(value)
                        i += 2 + width
                        continue
                    if value <= 0x10FFFF:
                        # \u / \U name a code point; emit its UTF-8 bytes.
                        out += chr(value).encode("utf-8", errors="replace")
                        i += 2 + width
                        continue
            out += ch.encode("utf-8", errors="replace")
            i += 1
        return bytes(out)

    # ── Rendering ────────────────────────────────────────────────────────────

    def render(self):
        """Return the whole snapshot as one ANSI-colored, newline-joined string."""
        out = [
            f"\n{BOLD}{CYAN}{self._RULE_DOUBLE}{NC}",
            f"{BOLD}{CYAN} STANDALONE LS TRAFFIC SNAPSHOT{NC}",
            f"{BOLD}{CYAN}{self._RULE_DOUBLE}{NC}\n",
        ]

        # LS Logs
        if self.ls_logs:
            out.append(f"{BOLD}▸ Language Server Logs{NC}")
            out.append(f"{DIM}{self._RULE_NARROW}{NC}")
            out.extend(f" {DIM}{log}{NC}" for log in self.ls_logs)
            out.append("")

        # Connections
        if self.connections:
            out.append(f"{BOLD}▸ Outbound Connections{NC}")
            out.append(f"{DIM}{self._RULE_NARROW}{NC}")
            for _ts, target in self.connections:
                # Drop a ":port" suffix before the lookup.
                domain = target.split(":")[0]
                label, detail = DOMAIN_INFO.get(domain, ("Unknown", ""))
                out.append(f" {GREEN}●{NC} {BOLD}{target}{NC} {DIM}({label}){NC}")
                if detail:
                    out.append(f" {DIM}{detail}{NC}")
            out.append("")

        # Group requests by domain, keeping first-seen order.
        by_domain = defaultdict(list)
        for req in self.requests:
            by_domain[req.authority].append(req)

        for domain, reqs in by_domain.items():
            if domain.startswith("127.0.0.1"):
                label, color = "Local (our requests to LS)", DIM
            else:
                label = DOMAIN_INFO.get(domain, ("External", ""))[0]
                color = YELLOW if "API" in label else CYAN
            out.append(f"{BOLD}{self._RULE_WIDE}{NC}")
            out.append(f"{BOLD}{color} {domain}{NC} {DIM}{label}{NC}")
            out.append(f"{BOLD}{self._RULE_WIDE}{NC}")
            for req in reqs:
                out.extend(self._render_request(req))
            out.append("")

        return "\n".join(out)

    def _render_request(self, req):
        """Return the output lines for one request: title, headers, body."""
        arrow = "→" if req.direction == "outgoing" else "←"
        method_color = GREEN if req.method == "GET" else YELLOW
        lines = [f"\n {BOLD}{arrow} {method_color}{req.method}{NC} {req.path}"]

        # Important headers, in a fixed order.
        shown = False
        for key in self._INTERESTING_HEADERS:
            if key in req.headers:
                val = self._mask_header_value(key, req.headers[key])
                lines.append(f" {DIM}{key}:{NC} {val}")
                shown = True

        # All other headers
        other = {
            k: v
            for k, v in req.headers.items()
            if k not in self._INTERESTING_HEADERS and not k.startswith(":")
        }
        if other:
            if not shown:
                lines.append(f" {DIM}Headers:{NC}")
            lines.extend(f" {DIM}{k}:{NC} {v}" for k, v in other.items())

        # Body
        if req.data:
            lines.append(self._render_body(req.data, req.data_len))
        return lines

    @staticmethod
    def _mask_header_value(key, val):
        """Shorten a header value for display.

        A Bearer ``authorization`` value longer than 30 characters keeps only
        13 characters of the token and the last 10 characters. Any other value
        longer than 40 characters keeps its first 30 and last 10 characters.
        """
        if key == "authorization" and len(val) > 30 and val.startswith("Bearer "):
            return f"Bearer {val[7:20]}...{val[-10:]}"
        if len(val) > 40:
            return f"{val[:30]}...{val[-10:]}"
        return val

    def _render_body(self, data, total_len):
        """Render body data in the most readable format possible.

        Formats are tried in order: JSON, gzip (then JSON / text / binary),
        protobuf-looking bytes, plain UTF-8 text, PNG, and finally a binary
        dump of extracted strings.

        Args:
            data: body bytes captured from the log.
            total_len: body length as reported by the log's ``len=`` fields.
        """
        # Try JSON
        try:
            obj = json.loads(data.decode("utf-8"))
        except (json.JSONDecodeError, UnicodeDecodeError):
            pass
        else:
            return self._json_block(
                obj, f" {BOLD}Body ({len(data)} bytes, JSON):{NC}", 30
            )

        # Try gzip
        if data[:2] == b"\x1f\x8b":
            try:
                decompressed = gzip.decompress(data)
            except (OSError, EOFError, zlib.error):
                # Corrupt or truncated stream: fall through to other formats.
                decompressed = None
            if decompressed is not None:
                return self._render_gzip_body(data, decompressed)

        # Try protobuf (extract readable strings)
        if data[:1] in self._PROTOBUF_FIRST_BYTES:
            strings = self._extract_strings(data)
            if strings:
                lines = [
                    f" {BOLD}Body ({total_len} bytes, protobuf):{NC}",
                    f" {DIM}Extracted strings:{NC}",
                ]
                lines.extend(self._string_lines(strings, 20))
                return "\n".join(lines)

        # Try plain text
        try:
            text = data.decode("utf-8")
        except UnicodeDecodeError:
            pass
        else:
            lines = [f" {BOLD}Body ({len(data)} bytes, text):{NC}"]
            lines.extend(f" {l[:200]}" for l in text.split("\n")[:10])
            return "\n".join(lines)

        # PNG
        if data[:4] == b"\x89PNG":
            return f" {BOLD}Body ({total_len} bytes, PNG image){NC}"

        # Binary fallback
        lines = [f" {BOLD}Body ({total_len} bytes, binary):{NC}"]
        strings = self._extract_strings(data)
        if strings:
            lines.append(f" {DIM}Extracted strings:{NC}")
            lines.extend(self._string_lines(strings, 15))
        else:
            lines.append(f" {DIM}(no readable strings){NC}")
        return "\n".join(lines)

    def _render_gzip_body(self, data, decompressed):
        """Render a successfully decompressed gzip body as JSON, text or binary."""
        sizes = f"{len(data)} bytes gzip → {len(decompressed)} bytes"
        try:
            text = decompressed.decode("utf-8")
        except UnicodeDecodeError:
            return "\n".join(
                [
                    f" {BOLD}Body ({sizes}, binary):{NC}",
                    f" {DIM}{self._extract_strings(decompressed)}{NC}",
                ]
            )
        try:
            obj = json.loads(text)
        except json.JSONDecodeError:
            lines = [f" {BOLD}Body ({sizes}, text):{NC}"]
            lines.extend(f" {l[:200]}" for l in text.split("\n")[:20])
            return "\n".join(lines)
        return self._json_block(obj, f" {BOLD}Body ({sizes}, JSON):{NC}", 50)

    @staticmethod
    def _json_block(obj, header, limit):
        """Pretty-print ``obj`` under ``header``, showing at most ``limit`` lines."""
        pretty = json.dumps(obj, indent=2, ensure_ascii=False).split("\n")
        lines = [header]
        lines.extend(f" {GREEN}{l}{NC}" for l in pretty[:limit])
        if len(pretty) > limit:
            lines.append(f" {DIM}... ({len(pretty) - limit} more lines){NC}")
        return "\n".join(lines)

    @staticmethod
    def _string_lines(strings, limit):
        """Format the first ``limit`` entries of an ``_extract_strings`` result.

        Entries of three characters or fewer (after stripping) are skipped.
        """
        pieces = (piece.strip() for piece in strings.split(" | ")[:limit])
        return [f" {MAGENTA}{piece}{NC}" for piece in pieces if len(piece) > 3]

    def _extract_strings(self, data, min_len=4):
        """Extract printable ASCII strings from binary data.

        Runs of bytes in the range 32..126 that are at least ``min_len`` long
        are collected, de-duplicated in first-seen order, capped at 30 and
        joined with " | ".
        """
        runs = []
        current = bytearray()
        for byte in data:
            if 32 <= byte <= 126:
                current.append(byte)
                continue
            if len(current) >= min_len:
                runs.append(current.decode("ascii"))
            current = bytearray()
        if len(current) >= min_len:
            runs.append(current.decode("ascii"))
        # dict keys keep insertion order, which de-duplicates stably.
        unique = list(dict.fromkeys(runs))
        return " | ".join(unique[:30])
def main():
    """Parse the log named on the command line (or stdin) and print the report.

    With an argument, ``sys.argv[1]`` is opened as the log file; otherwise the
    log is read from standard input.
    """
    snap = Snapshot()
    if len(sys.argv) > 1:
        # Frame payloads can make the log contain bytes that are not valid
        # UTF-8; replace them instead of aborting the whole run.
        with open(sys.argv[1], encoding="utf-8", errors="replace") as f:
            snap.parse(f)
    else:
        snap.parse(sys.stdin)
    print(snap.render())


if __name__ == "__main__":
    main()