#!/usr/bin/env python3 """ Parse Go GODEBUG=http2debug=2 output into a clean, readable snapshot. Usage: python3 parse-snapshot.py < raw-http2-dump.log python3 parse-snapshot.py /path/to/logfile """ import sys import re import json import gzip from collections import defaultdict from io import BytesIO # ── Colors ──────────────────────────────────────────────────────────────────── BOLD = "\033[1m" DIM = "\033[2m" RED = "\033[91m" GREEN = "\033[92m" YELLOW = "\033[93m" CYAN = "\033[96m" MAGENTA = "\033[95m" NC = "\033[0m" # ── Regexes ─────────────────────────────────────────────────────────────────── RE_ENCODING_HEADER = re.compile( r'http2: Transport encoding header "([^"]+)" = "([^"]*)"' ) RE_DECODED_HEADER = re.compile( r'http2: decoded hpack field header field "([^"]+)" = "([^"]*)"' ) RE_SERVER_ENCODING = re.compile( r'http2: server encoding header "([^"]+)" = "([^"]*)"' ) RE_WROTE_DATA = re.compile( r'http2: Framer [^:]+: wrote DATA flags=(\S+) stream=(\d+) len=(\d+) data="(.*?)"' ) RE_READ_DATA = re.compile( r'http2: Framer [^:]+: read DATA flags=(\S+) stream=(\d+) len=(\d+) data="(.*?)"' ) RE_TRANSPORT_CONN = re.compile( r'http2: Transport creating client conn [^ ]+ to (.+)' ) RE_SERVER_READ_DATA = re.compile( r'http2: server read frame DATA flags=(\S+) stream=(\d+) len=(\d+) data="(.*?)"' ) RE_WROTE_HEADERS = re.compile( r'http2: Framer [^:]+: wrote HEADERS flags=(\S+) stream=(\d+)' ) RE_TIMESTAMP = re.compile(r'^(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2})') RE_LS_LOG = re.compile(r'^[IWE]\d{4} ') RE_MAXPROCS = re.compile(r'^.*maxprocs:') RE_BYTES_OMITTED = re.compile(r'\((\d+) bytes omitted\)$') # Known domain purposes DOMAIN_INFO = { "antigravity-unleash.goog": ("Feature Flags", "Unleash SDK — controls A/B tests, feature rollouts"), "daily-cloudcode-pa.googleapis.com": ("LLM API (gRPC)", "Primary Gemini/Claude API endpoint"), "cloudcode-pa.googleapis.com": ("LLM API (gRPC)", "Production Gemini/Claude API endpoint"), "api.anthropic.com": ("Claude API", "Direct Anthropic API calls"), "lh3.googleusercontent.com": ("Profile Picture", "User avatar image"), "play.googleapis.com": ("Telemetry", "Google Play telemetry/logging"), "firebaseinstallations.googleapis.com": ("Firebase", "Firebase installation tracking"), "oauth2.googleapis.com": ("OAuth", "Token refresh/exchange"), "speech.googleapis.com": ("Speech", "Voice input processing"), "modelarmor.googleapis.com": ("Safety", "Content safety/filtering"), } class Request: def __init__(self): self.method = "" self.path = "" self.authority = "" self.scheme = "" self.headers = {} self.data = b"" self.data_len = 0 self.stream_id = None self.timestamp = "" self.direction = "outgoing" # outgoing = LS→upstream, incoming = LS←upstream class Snapshot: def __init__(self): self.connections = [] # (timestamp, target) self.requests = [] # list of Request self.responses = defaultdict(lambda: {"headers": {}, "data": b"", "data_len": 0}) self.ls_logs = [] def parse(self, lines): current_headers = {} current_direction = "outgoing" current_stream = None for line in lines: line = line.rstrip() # Skip empty if not line: continue # LS process logs if RE_LS_LOG.match(line) or RE_MAXPROCS.match(line): self.ls_logs.append(line) continue # New connection m = RE_TRANSPORT_CONN.search(line) if m: ts = "" ts_m = RE_TIMESTAMP.match(line) if ts_m: ts = ts_m.group(1) self.connections.append((ts, m.group(1))) continue # Outgoing headers (Transport encoding = LS sending to upstream) m = RE_ENCODING_HEADER.search(line) if m: key, val = m.group(1), m.group(2) if key == ":method": # New request starting if current_headers.get(":path"): self._finalize_request(current_headers, "outgoing", line) current_headers = {} current_direction = "outgoing" current_headers[key] = val ts_m = RE_TIMESTAMP.match(line) if ts_m and "timestamp" not in current_headers: current_headers["timestamp"] = ts_m.group(1) continue # Incoming headers (decoded hpack = upstream responding, OR server receiving) m = RE_DECODED_HEADER.search(line) if m: key, val = m.group(1), m.group(2) if key == ":authority" and "server read frame" not in line: # This is a request received by our LS if current_headers.get(":path"): self._finalize_request(current_headers, current_direction, line) current_headers = {} current_direction = "incoming" current_headers[key] = val continue # Server encoding (our LS responding) m = RE_SERVER_ENCODING.search(line) if m: continue # Skip server response headers for now # Headers frame written (triggers finalization) m = RE_WROTE_HEADERS.search(line) if m: current_stream = m.group(2) if current_headers.get(":path") or current_headers.get(":method"): req = self._finalize_request(current_headers, current_direction, line) if req: req.stream_id = current_stream current_headers = {} continue # Data frames (wrote = LS sending, read = LS receiving) for pattern, direction in [ (RE_WROTE_DATA, "sent"), (RE_READ_DATA, "received"), (RE_SERVER_READ_DATA, "server_received"), ]: m = pattern.search(line) if m: flags, stream, length, data_str = ( m.group(1), m.group(2), int(m.group(3)), m.group(4), ) # Find matching request by stream for req in reversed(self.requests): if req.stream_id == stream: raw = self._decode_data_str(data_str, line) if direction == "sent" or direction == "server_received": req.data += raw req.data_len = max(req.data_len, length) break # Also check omitted bytes om = RE_BYTES_OMITTED.search(line) if om: pass # length already captured break # Finalize any remaining headers if current_headers.get(":path") or current_headers.get(":method"): self._finalize_request(current_headers, current_direction, "") def _finalize_request(self, headers, direction, _line): req = Request() req.method = headers.pop(":method", "GET") req.path = headers.pop(":path", "/") req.authority = headers.pop(":authority", "") req.scheme = headers.pop(":scheme", "https") req.timestamp = headers.pop("timestamp", "") req.direction = direction req.headers = {k: v for k, v in headers.items() if not k.startswith(":")} self.requests.append(req) return req def _decode_data_str(self, s, full_line): """Decode escaped string from GODEBUG output back to bytes.""" try: # Handle Go's escaped bytes result = bytearray() i = 0 while i < len(s): if s[i] == "\\" and i + 1 < len(s): if s[i + 1] == "x" and i + 3 < len(s): result.append(int(s[i + 2 : i + 4], 16)) i += 4 elif s[i + 1] == "n": result.append(10) i += 2 elif s[i + 1] == "r": result.append(13) i += 2 elif s[i + 1] == "t": result.append(9) i += 2 elif s[i + 1] == "\\": result.append(92) i += 2 elif s[i + 1] == '"': result.append(34) i += 2 else: result.append(ord(s[i])) i += 1 else: result.append(ord(s[i])) i += 1 return bytes(result) except Exception: return s.encode("utf-8", errors="replace") def render(self): out = [] # Header out.append(f"\n{BOLD}{CYAN}{'═' * 70}{NC}") out.append(f"{BOLD}{CYAN} STANDALONE LS TRAFFIC SNAPSHOT{NC}") out.append(f"{BOLD}{CYAN}{'═' * 70}{NC}\n") # LS Logs if self.ls_logs: out.append(f"{BOLD}▸ Language Server Logs{NC}") out.append(f"{DIM}{'─' * 60}{NC}") for log in self.ls_logs: out.append(f" {DIM}{log}{NC}") out.append("") # Connections if self.connections: out.append(f"{BOLD}▸ Outbound Connections{NC}") out.append(f"{DIM}{'─' * 60}{NC}") for ts, target in self.connections: domain = target.split(":")[0] if ":" in target else target info = DOMAIN_INFO.get(domain, ("Unknown", "")) out.append( f" {GREEN}→{NC} {BOLD}{target}{NC} {DIM}({info[0]}){NC}" ) if info[1]: out.append(f" {DIM}{info[1]}{NC}") out.append("") # Group requests by domain by_domain = defaultdict(list) for req in self.requests: by_domain[req.authority].append(req) # Render each domain's requests for domain, reqs in by_domain.items(): if domain.startswith("127.0.0.1"): label = "Local (our requests to LS)" color = DIM else: info = DOMAIN_INFO.get(domain, ("External", "")) label = info[0] color = YELLOW if "API" in info[0] else CYAN out.append(f"{BOLD}{'═' * 70}{NC}") out.append(f"{BOLD}{color} {domain}{NC} {DIM}— {label}{NC}") out.append(f"{BOLD}{'═' * 70}{NC}") for i, req in enumerate(reqs): arrow = "→" if req.direction == "outgoing" else "←" method_color = GREEN if req.method == "GET" else YELLOW out.append(f"\n {BOLD}{arrow} {method_color}{req.method}{NC} {req.path}") # Important headers interesting = [ "authorization", "content-type", "user-agent", "unleash-appname", "unleash-instanceid", "unleash-sdk", "x-goog-api-key", "x-goog-api-client", "grpc-encoding", "te", ] shown = False for key in interesting: if key in req.headers: val = req.headers[key] # Mask tokens partially if key == "authorization" and len(val) > 30: if val.startswith("Bearer "): val = f"Bearer {val[7:20]}...{val[-10:]}" elif len(val) > 40: val = f"{val[:30]}...{val[-10:]}" out.append(f" {DIM}{key}:{NC} {val}") shown = True # All other headers (collapsed) other = { k: v for k, v in req.headers.items() if k not in interesting and not k.startswith(":") } if other: if not shown: out.append(f" {DIM}Headers:{NC}") for k, v in other.items(): out.append(f" {DIM}{k}:{NC} {v}") # Body if req.data: out.append(self._render_body(req.data, req.data_len)) out.append("") return "\n".join(out) def _render_body(self, data, total_len): """Render body data in the most readable format possible.""" lines = [] # Try JSON try: text = data.decode("utf-8") obj = json.loads(text) pretty = json.dumps(obj, indent=2, ensure_ascii=False) lines.append(f" {BOLD}Body ({len(data)} bytes, JSON):{NC}") for l in pretty.split("\n")[:30]: lines.append(f" {GREEN}{l}{NC}") if len(pretty.split("\n")) > 30: lines.append(f" {DIM}... ({len(pretty.split(chr(10))) - 30} more lines){NC}") return "\n".join(lines) except (json.JSONDecodeError, UnicodeDecodeError): pass # Try gzip if data[:2] == b"\x1f\x8b": try: decompressed = gzip.decompress(data) try: text = decompressed.decode("utf-8") try: obj = json.loads(text) pretty = json.dumps(obj, indent=2, ensure_ascii=False) lines.append( f" {BOLD}Body ({len(data)} bytes gzip → {len(decompressed)} bytes, JSON):{NC}" ) for l in pretty.split("\n")[:50]: lines.append(f" {GREEN}{l}{NC}") if len(pretty.split("\n")) > 50: lines.append( f" {DIM}... ({len(pretty.split(chr(10))) - 50} more lines){NC}" ) return "\n".join(lines) except json.JSONDecodeError: lines.append( f" {BOLD}Body ({len(data)} bytes gzip → {len(decompressed)} bytes, text):{NC}" ) for l in text.split("\n")[:20]: lines.append(f" {l[:200]}") return "\n".join(lines) except UnicodeDecodeError: lines.append( f" {BOLD}Body ({len(data)} bytes gzip → {len(decompressed)} bytes, binary):{NC}" ) lines.append(f" {DIM}{self._extract_strings(decompressed)}{NC}") return "\n".join(lines) except Exception: pass # Try protobuf (extract readable strings) if data[:1] in (b"\x08", b"\x0a", b"\x10", b"\x12", b"\x18", b"\x1a", b"\x20", b"\x22"): strings = self._extract_strings(data) if strings: lines.append(f" {BOLD}Body ({total_len} bytes, protobuf):{NC}") lines.append(f" {DIM}Extracted strings:{NC}") for s in strings.split(" | ")[:20]: s = s.strip() if len(s) > 3: lines.append(f" {MAGENTA}{s}{NC}") return "\n".join(lines) # Try plain text try: text = data.decode("utf-8") lines.append(f" {BOLD}Body ({len(data)} bytes, text):{NC}") for l in text.split("\n")[:10]: lines.append(f" {l[:200]}") return "\n".join(lines) except UnicodeDecodeError: pass # PNG if data[:4] == b"\x89PNG": lines.append(f" {BOLD}Body ({total_len} bytes, PNG image){NC}") return "\n".join(lines) # Binary fallback lines.append(f" {BOLD}Body ({total_len} bytes, binary):{NC}") strings = self._extract_strings(data) if strings: lines.append(f" {DIM}Extracted strings:{NC}") for s in strings.split(" | ")[:15]: s = s.strip() if len(s) > 3: lines.append(f" {MAGENTA}{s}{NC}") else: lines.append(f" {DIM}(no readable strings){NC}") return "\n".join(lines) def _extract_strings(self, data, min_len=4): """Extract printable ASCII strings from binary data.""" strings = [] current = bytearray() for b in data: if 32 <= b <= 126: current.append(b) else: if len(current) >= min_len: strings.append(current.decode("ascii")) current = bytearray() if len(current) >= min_len: strings.append(current.decode("ascii")) # Deduplicate while preserving order seen = set() unique = [] for s in strings: if s not in seen: seen.add(s) unique.append(s) return " | ".join(unique[:30]) def main(): if len(sys.argv) > 1: with open(sys.argv[1]) as f: lines = f.readlines() else: lines = sys.stdin.readlines() snap = Snapshot() snap.parse(lines) print(snap.render()) if __name__ == "__main__": main()