476 lines
18 KiB
Python
476 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""
Parse Go GODEBUG=http2debug=2 output into a clean, readable snapshot.

Usage:
    python3 parse-snapshot.py < raw-http2-dump.log
    python3 parse-snapshot.py /path/to/logfile
"""
|
|
|
|
import sys
|
|
import re
|
|
import json
|
|
import gzip
|
|
from collections import defaultdict
|
|
from io import BytesIO
|
|
|
|
# ── Colors ────────────────────────────────────────────────────────────────────
# ANSI escape sequences interpolated into the rendered report.  Every styled
# fragment is expected to end with NC so the style does not leak into the
# text that follows it.
BOLD = "\033[1m"
DIM = "\033[2m"
RED = "\033[91m"
GREEN = "\033[92m"
YELLOW = "\033[93m"
CYAN = "\033[96m"
MAGENTA = "\033[95m"
NC = "\033[0m"  # "no color": resets all attributes
|
|
|
|
# ── Regexes ───────────────────────────────────────────────────────────────────
|
|
RE_ENCODING_HEADER = re.compile(
|
|
r'http2: Transport encoding header "([^"]+)" = "([^"]*)"'
|
|
)
|
|
RE_DECODED_HEADER = re.compile(
|
|
r'http2: decoded hpack field header field "([^"]+)" = "([^"]*)"'
|
|
)
|
|
RE_SERVER_ENCODING = re.compile(
|
|
r'http2: server encoding header "([^"]+)" = "([^"]*)"'
|
|
)
|
|
RE_WROTE_DATA = re.compile(
|
|
r'http2: Framer [^:]+: wrote DATA flags=(\S+) stream=(\d+) len=(\d+) data="(.*?)"'
|
|
)
|
|
RE_READ_DATA = re.compile(
|
|
r'http2: Framer [^:]+: read DATA flags=(\S+) stream=(\d+) len=(\d+) data="(.*?)"'
|
|
)
|
|
RE_TRANSPORT_CONN = re.compile(
|
|
r'http2: Transport creating client conn [^ ]+ to (.+)'
|
|
)
|
|
RE_SERVER_READ_DATA = re.compile(
|
|
r'http2: server read frame DATA flags=(\S+) stream=(\d+) len=(\d+) data="(.*?)"'
|
|
)
|
|
RE_WROTE_HEADERS = re.compile(
|
|
r'http2: Framer [^:]+: wrote HEADERS flags=(\S+) stream=(\d+)'
|
|
)
|
|
RE_TIMESTAMP = re.compile(r'^(\d{4}/\d{2}/\d{2} \d{2}:\d{2}:\d{2})')
|
|
RE_LS_LOG = re.compile(r'^[IWE]\d{4} ')
|
|
RE_MAXPROCS = re.compile(r'^.*maxprocs:')
|
|
RE_BYTES_OMITTED = re.compile(r'\((\d+) bytes omitted\)$')
|
|
|
|
# Known domain purposes
# Maps a host name to a (short label, longer description) pair.
DOMAIN_INFO = {
    "antigravity-unleash.goog": (
        "Feature Flags",
        "Unleash SDK — controls A/B tests, feature rollouts",
    ),
    "daily-cloudcode-pa.googleapis.com": (
        "LLM API (gRPC)",
        "Primary Gemini/Claude API endpoint",
    ),
    "cloudcode-pa.googleapis.com": (
        "LLM API (gRPC)",
        "Production Gemini/Claude API endpoint",
    ),
    "api.anthropic.com": (
        "Claude API",
        "Direct Anthropic API calls",
    ),
    "lh3.googleusercontent.com": (
        "Profile Picture",
        "User avatar image",
    ),
    "play.googleapis.com": (
        "Telemetry",
        "Google Play telemetry/logging",
    ),
    "firebaseinstallations.googleapis.com": (
        "Firebase",
        "Firebase installation tracking",
    ),
    "oauth2.googleapis.com": (
        "OAuth",
        "Token refresh/exchange",
    ),
    "speech.googleapis.com": (
        "Speech",
        "Voice input processing",
    ),
    "modelarmor.googleapis.com": (
        "Safety",
        "Content safety/filtering",
    ),
}
|
|
|
|
|
|
class Request:
    """Plain record describing one HTTP/2 request.

    Every attribute starts at a neutral default; callers fill the fields in
    after construction.
    """

    def __init__(self):
        # Values of the :method / :path / :authority / :scheme pseudo-headers.
        self.method = self.path = self.authority = self.scheme = ""
        # Regular headers, name -> value.
        self.headers = {}
        # Body bytes collected so far and the body length.
        self.data = b""
        self.data_len = 0
        # Stream identifier; None until one has been assigned.
        self.stream_id = None
        self.timestamp = ""
        # outgoing = LS→upstream, incoming = LS←upstream
        self.direction = "outgoing"
|
|
|
|
|
|
class Snapshot:
    """Collects connections, requests and LS log lines from an http2debug log.

    Feed the log lines to :meth:`parse`, then call :meth:`render` to obtain a
    colourised, human-readable report as a single string.
    """

    # Single-character escapes produced by Go's %q, mapped to their byte value.
    _SIMPLE_ESCAPES = {
        "a": 7,
        "b": 8,
        "f": 12,
        "n": 10,
        "r": 13,
        "t": 9,
        "v": 11,
        "\\": 92,
        '"': 34,
        "'": 39,
    }
    # Escapes followed by a fixed number of hex digits: \xHH, \uHHHH, \UHHHHHHHH.
    _HEX_ESCAPE_WIDTHS = {"x": 2, "u": 4, "U": 8}
    _HEX_DIGITS = frozenset("0123456789abcdefABCDEF")
    # Headers listed first (in this order) for every rendered request.
    _INTERESTING_HEADERS = (
        "authorization",
        "content-type",
        "user-agent",
        "unleash-appname",
        "unleash-instanceid",
        "unleash-sdk",
        "x-goog-api-key",
        "x-goog-api-client",
        "grpc-encoding",
        "te",
    )

    def __init__(self):
        self.connections = []  # (timestamp, target)
        self.requests = []  # list of Request
        # Not populated by parse(); kept so existing attribute access still works.
        self.responses = defaultdict(lambda: {"headers": {}, "data": b"", "data_len": 0})
        self.ls_logs = []

    def parse(self, lines):
        """Scan log lines and populate connections, requests and ls_logs.

        Args:
            lines: iterable of log lines (trailing whitespace is stripped).

        Header lines are accumulated into a pending dict.  A pending request
        is finalized when a new ``:method`` (outgoing) or ``:authority``
        (incoming) header starts the next one, when a "wrote HEADERS" frame
        is seen, or at end of input.  DATA frames are attached to the most
        recent request whose stream id matches.
        """
        pending = {}
        direction = "outgoing"
        # (pattern, carries request body?) — built once, not once per line.
        data_patterns = (
            (RE_WROTE_DATA, True),
            (RE_READ_DATA, False),
            (RE_SERVER_READ_DATA, True),
        )

        for raw_line in lines:
            line = raw_line.rstrip()
            if not line:
                continue

            # LS process logs
            if RE_LS_LOG.match(line) or RE_MAXPROCS.match(line):
                self.ls_logs.append(line)
                continue

            # New connection
            m = RE_TRANSPORT_CONN.search(line)
            if m:
                ts_m = RE_TIMESTAMP.match(line)
                self.connections.append((ts_m.group(1) if ts_m else "", m.group(1)))
                continue

            # Outgoing headers (Transport encoding = LS sending to upstream)
            m = RE_ENCODING_HEADER.search(line)
            if m:
                key, val = m.group(1), m.group(2)
                if key == ":method":
                    # A new request starts.  Flush the previous one under the
                    # direction it was collected with (this used to be
                    # hard-coded to "outgoing", mislabelling incoming ones).
                    if pending.get(":path"):
                        self._finalize_request(pending, direction, line)
                    pending = {}
                    direction = "outgoing"
                pending[key] = val
                ts_m = RE_TIMESTAMP.match(line)
                if ts_m and "timestamp" not in pending:
                    pending["timestamp"] = ts_m.group(1)
                continue

            # Incoming headers (decoded hpack = upstream responding, OR server receiving)
            m = RE_DECODED_HEADER.search(line)
            if m:
                key, val = m.group(1), m.group(2)
                if key == ":authority" and "server read frame" not in line:
                    # This is a request received by our LS
                    if pending.get(":path"):
                        self._finalize_request(pending, direction, line)
                    pending = {}
                    direction = "incoming"
                pending[key] = val
                continue

            # Server encoding (our LS responding): response headers are skipped.
            if RE_SERVER_ENCODING.search(line):
                continue

            # Headers frame written (triggers finalization)
            m = RE_WROTE_HEADERS.search(line)
            if m:
                if pending.get(":path") or pending.get(":method"):
                    req = self._finalize_request(pending, direction, line)
                    req.stream_id = m.group(2)
                    pending = {}
                continue

            # Data frames.  Only frames carrying a request body (written by
            # the client or read by the server) contribute; frames read back
            # from upstream are recognised but not stored.
            for pattern, is_request_body in data_patterns:
                m = pattern.search(line)
                if not m:
                    continue
                if is_request_body:
                    self._attach_request_data(
                        m.group(2), int(m.group(3)), m.group(4), line
                    )
                break

        # Finalize any remaining headers
        if pending.get(":path") or pending.get(":method"):
            self._finalize_request(pending, direction, "")

    def _attach_request_data(self, stream, length, data_str, line):
        """Append one DATA frame to the newest request on the given stream.

        ``length`` is the frame's ``len=`` value.  Lengths are summed because
        a body may span several frames (previously the maximum was kept,
        which under-reported multi-frame bodies and also absorbed response
        frame sizes).  Frames for unknown streams are ignored.
        """
        for req in reversed(self.requests):
            if req.stream_id == stream:
                req.data += self._decode_data_str(data_str, line)
                req.data_len += length
                return

    def _finalize_request(self, headers, direction, _line):
        """Turn accumulated headers into a Request, store it and return it.

        The pseudo-headers and the synthetic ``timestamp`` entry are popped
        from ``headers`` (the caller's dict is mutated).  ``_line`` is unused
        and kept for interface compatibility.
        """
        req = Request()
        req.method = headers.pop(":method", "GET")
        req.path = headers.pop(":path", "/")
        req.authority = headers.pop(":authority", "")
        req.scheme = headers.pop(":scheme", "https")
        req.timestamp = headers.pop("timestamp", "")
        req.direction = direction
        # Any other pseudo-header (e.g. ":status") is dropped.
        req.headers = {k: v for k, v in headers.items() if not k.startswith(":")}
        self.requests.append(req)
        return req

    def _decode_data_str(self, s, full_line):
        """Decode escaped string from GODEBUG output back to bytes.

        Handles the escapes Go's %q produces: single-character escapes,
        ``\\xHH`` (one raw byte) and ``\\uHHHH`` / ``\\UHHHHHHHH`` (a code
        point, emitted as UTF-8).  Unescaped characters are emitted as UTF-8,
        so non-ASCII text no longer aborts decoding.  Malformed or unknown
        escapes are kept literally.  ``full_line`` is unused.

        Returns:
            bytes
        """
        out = bytearray()
        i, n = 0, len(s)
        while i < n:
            ch = s[i]
            if ch == "\\" and i + 1 < n:
                marker = s[i + 1]
                simple = self._SIMPLE_ESCAPES.get(marker)
                if simple is not None:
                    out.append(simple)
                    i += 2
                    continue
                width = self._HEX_ESCAPE_WIDTHS.get(marker)
                if width is not None:
                    digits = s[i + 2 : i + 2 + width]
                    # Validate by hand: int() would also accept "+f" or " f".
                    if len(digits) == width and all(d in self._HEX_DIGITS for d in digits):
                        code = int(digits, 16)
                        if marker == "x":
                            out.append(code)
                            i += 2 + width
                            continue
                        if code <= 0x10FFFF:
                            out += chr(code).encode("utf-8", errors="replace")
                            i += 2 + width
                            continue
            # Ordinary character, or a backslash that starts no valid escape.
            out += ch.encode("utf-8", errors="replace")
            i += 1
        return bytes(out)

    @staticmethod
    def _strip_port(target):
        """Return ``target`` without a trailing ``:<digits>`` port, if any."""
        host, sep, port = target.rpartition(":")
        if not sep or not host or not port.isdigit():
            return target
        # A bare IPv6 literal such as "::1" has no port to strip.
        if ":" in host and not host.startswith("["):
            return target
        return host

    def render(self):
        """Build the full report and return it as one newline-joined string."""
        out = []

        # Header
        out.append(f"\n{BOLD}{CYAN}{'═' * 70}{NC}")
        out.append(f"{BOLD}{CYAN} STANDALONE LS TRAFFIC SNAPSHOT{NC}")
        out.append(f"{BOLD}{CYAN}{'═' * 70}{NC}\n")

        # LS Logs
        if self.ls_logs:
            out.append(f"{BOLD}▸ Language Server Logs{NC}")
            out.append(f"{DIM}{'─' * 60}{NC}")
            out.extend(f" {DIM}{log}{NC}" for log in self.ls_logs)
            out.append("")

        # Connections
        if self.connections:
            out.append(f"{BOLD}▸ Outbound Connections{NC}")
            out.append(f"{DIM}{'─' * 60}{NC}")
            for _ts, target in self.connections:
                kind, description = DOMAIN_INFO.get(
                    self._strip_port(target), ("Unknown", "")
                )
                out.append(f" {GREEN}→{NC} {BOLD}{target}{NC} {DIM}({kind}){NC}")
                if description:
                    out.append(f" {DIM}{description}{NC}")
            out.append("")

        # Group requests by domain, keeping first-seen order.
        by_domain = defaultdict(list)
        for req in self.requests:
            by_domain[req.authority].append(req)

        # Render each domain's requests
        for domain, reqs in by_domain.items():
            if domain.startswith("127.0.0.1"):
                label = "Local (our requests to LS)"
                color = DIM
            else:
                # Look up without the port so "host:443" still matches.
                label = DOMAIN_INFO.get(self._strip_port(domain), ("External", ""))[0]
                color = YELLOW if "API" in label else CYAN

            out.append(f"{BOLD}{'═' * 70}{NC}")
            out.append(f"{BOLD}{color} {domain}{NC} {DIM}— {label}{NC}")
            out.append(f"{BOLD}{'═' * 70}{NC}")

            for req in reqs:
                out.extend(self._render_request(req))
                out.append("")

        return "\n".join(out)

    def _render_request(self, req):
        """Return the output lines for one request: title, headers, body."""
        out = []
        arrow = "→" if req.direction == "outgoing" else "←"
        method_color = GREEN if req.method == "GET" else YELLOW
        out.append(f"\n {BOLD}{arrow} {method_color}{req.method}{NC} {req.path}")

        # Important headers
        shown = False
        for key in self._INTERESTING_HEADERS:
            if key not in req.headers:
                continue
            val = req.headers[key]
            # Mask tokens partially
            if key == "authorization" and len(val) > 30:
                if val.startswith("Bearer "):
                    val = f"Bearer {val[7:20]}...{val[-10:]}"
                elif len(val) > 40:
                    val = f"{val[:30]}...{val[-10:]}"
            out.append(f" {DIM}{key}:{NC} {val}")
            shown = True

        # All other headers
        other = {
            k: v
            for k, v in req.headers.items()
            if k not in self._INTERESTING_HEADERS and not k.startswith(":")
        }
        if other:
            if not shown:
                out.append(f" {DIM}Headers:{NC}")
            out.extend(f" {DIM}{k}:{NC} {v}" for k, v in other.items())

        # Body
        if req.data:
            out.append(self._render_body(req.data, req.data_len))
        return out

    @staticmethod
    def _json_block(header, obj, limit):
        """Return ``header`` plus at most ``limit`` pretty-printed JSON lines."""
        pretty_lines = json.dumps(obj, indent=2, ensure_ascii=False).split("\n")
        block = [header]
        block.extend(f" {GREEN}{row}{NC}" for row in pretty_lines[:limit])
        if len(pretty_lines) > limit:
            block.append(f" {DIM}... ({len(pretty_lines) - limit} more lines){NC}")
        return block

    @staticmethod
    def _try_gunzip(data):
        """Return the gunzipped bytes, or None when ``data`` cannot be inflated."""
        import zlib  # local import: only needed to name zlib.error

        try:
            return gzip.decompress(data)
        except (OSError, EOFError, zlib.error):
            # Corrupt or truncated stream: let the caller try other formats.
            return None

    def _render_body(self, data, total_len):
        """Render body data in the most readable format possible.

        Formats are tried in order: JSON, gzip (then JSON / text / binary
        inside it), protobuf-looking bytes, plain UTF-8 text, PNG, and
        finally a generic binary dump of printable strings.

        Args:
            data: captured body bytes.
            total_len: body length used in the protobuf, PNG and binary
                headings (the other headings use ``len(data)``).

        Returns:
            str: the rendered lines joined with newlines.
        """
        # Try JSON
        try:
            obj = json.loads(data.decode("utf-8"))
        except (json.JSONDecodeError, UnicodeDecodeError):
            pass
        else:
            header = f" {BOLD}Body ({len(data)} bytes, JSON):{NC}"
            return "\n".join(self._json_block(header, obj, 30))

        # Try gzip
        if data[:2] == b"\x1f\x8b":
            decompressed = self._try_gunzip(data)
            if decompressed is not None:
                return "\n".join(self._render_gunzipped(data, decompressed))

        # Try protobuf (extract readable strings)
        if data[:1] in (b"\x08", b"\x0a", b"\x10", b"\x12", b"\x18", b"\x1a", b"\x20", b"\x22"):
            strings = self._extract_strings(data)
            if strings:
                lines = [
                    f" {BOLD}Body ({total_len} bytes, protobuf):{NC}",
                    f" {DIM}Extracted strings:{NC}",
                ]
                lines.extend(self._string_rows(strings, 20))
                return "\n".join(lines)

        # Try plain text
        try:
            text = data.decode("utf-8")
        except UnicodeDecodeError:
            pass
        else:
            lines = [f" {BOLD}Body ({len(data)} bytes, text):{NC}"]
            lines.extend(f" {row[:200]}" for row in text.split("\n")[:10])
            return "\n".join(lines)

        # PNG
        if data[:4] == b"\x89PNG":
            return f" {BOLD}Body ({total_len} bytes, PNG image){NC}"

        # Binary fallback
        lines = [f" {BOLD}Body ({total_len} bytes, binary):{NC}"]
        strings = self._extract_strings(data)
        if strings:
            lines.append(f" {DIM}Extracted strings:{NC}")
            lines.extend(self._string_rows(strings, 15))
        else:
            lines.append(f" {DIM}(no readable strings){NC}")
        return "\n".join(lines)

    def _render_gunzipped(self, data, decompressed):
        """Return output lines for a gzip body that inflated successfully."""
        sizes = f"{len(data)} bytes gzip → {len(decompressed)} bytes"
        try:
            text = decompressed.decode("utf-8")
        except UnicodeDecodeError:
            return [
                f" {BOLD}Body ({sizes}, binary):{NC}",
                f" {DIM}{self._extract_strings(decompressed)}{NC}",
            ]
        try:
            obj = json.loads(text)
        except json.JSONDecodeError:
            lines = [f" {BOLD}Body ({sizes}, text):{NC}"]
            lines.extend(f" {row[:200]}" for row in text.split("\n")[:20])
            return lines
        return self._json_block(f" {BOLD}Body ({sizes}, JSON):{NC}", obj, 50)

    @staticmethod
    def _string_rows(joined, limit):
        """Format up to ``limit`` " | "-separated strings, skipping short ones."""
        rows = []
        for piece in joined.split(" | ")[:limit]:
            piece = piece.strip()
            if len(piece) > 3:
                rows.append(f" {MAGENTA}{piece}{NC}")
        return rows

    def _extract_strings(self, data, min_len=4):
        """Extract printable ASCII strings from binary data.

        Runs of bytes in the range 32..126 that are at least ``min_len`` long
        are collected, de-duplicated in first-seen order, and the first 30
        are returned joined with " | ".
        """
        runs = []
        current = bytearray()
        for byte in data:
            if 32 <= byte <= 126:
                current.append(byte)
                continue
            if len(current) >= min_len:
                runs.append(current.decode("ascii"))
            current = bytearray()
        if len(current) >= min_len:
            runs.append(current.decode("ascii"))
        # dict.fromkeys de-duplicates while preserving order.
        unique = list(dict.fromkeys(runs))
        return " | ".join(unique[:30])
|
|
|
|
|
|
def main():
    """Read a log from the file named in argv[1] (or stdin) and print the report.

    The file is read as UTF-8 with undecodable bytes replaced, so a stray
    binary byte in the log does not abort the run with UnicodeDecodeError and
    the result does not depend on the locale's default encoding.
    """
    if len(sys.argv) > 1:
        with open(sys.argv[1], encoding="utf-8", errors="replace") as f:
            lines = f.readlines()
    else:
        lines = sys.stdin.readlines()

    snap = Snapshot()
    snap.parse(lines)
    print(snap.render())
|
|
|
|
|
|
# Run the CLI only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|