diff --git a/server.py b/server.py index 1ed5d9e..5425865 100644 --- a/server.py +++ b/server.py @@ -293,7 +293,9 @@ async def list_repo_commits( if sha: params["sha"] = sha - result = await client.request("GET", f"/repos/{owner}/{repo}/commits", params=params) + result = await client.request( + "GET", f"/repos/{owner}/{repo}/commits", params=params + ) return json.dumps(result) @@ -320,19 +322,23 @@ async def get_workflow_run_logs( """ # Step 1: List workflow runs to find the run and job ID tasks_result = await client.request("GET", f"/repos/{owner}/{repo}/actions/tasks") - + if isinstance(tasks_result, dict) and tasks_result.get("error"): return json.dumps(tasks_result) - + # Find the run matching the run_number, or use the most recent - workflow_runs = tasks_result.get("workflow_runs", []) if isinstance(tasks_result, dict) else [] - + workflow_runs = ( + tasks_result.get("workflow_runs", []) if isinstance(tasks_result, dict) else [] + ) + if not workflow_runs: - return json.dumps({ - "error": True, - "message": f"No workflow runs found in repository {owner}/{repo}" - }) - + return json.dumps( + { + "error": True, + "message": f"No workflow runs found in repository {owner}/{repo}", + } + ) + target_run = None if run_number is None: # Use the most recent run (first in the list) @@ -343,22 +349,24 @@ async def get_workflow_run_logs( if run.get("run_number") == run_number: target_run = run break - + if not target_run: - return json.dumps({ - "error": True, - "message": f"Workflow run #{run_number} not found in repository {owner}/{repo}", - "available_runs": [r.get("run_number") for r in workflow_runs[:10]] - }) - + return json.dumps( + { + "error": True, + "message": f"Workflow run #{run_number} not found in repository {owner}/{repo}", + "available_runs": [r.get("run_number") for r in workflow_runs[:10]], + } + ) + run_id = target_run.get("id") run_status = target_run.get("status") run_title = target_run.get("display_title", "") - + # Step 2: Get jobs list and find the job matching this run_number # Note: Gitea's jobs endpoint uses different IDs than tasks, so we match by html_url jobs_result = await client.request("GET", f"/repos/{owner}/{repo}/actions/jobs") - + job_id = None if isinstance(jobs_result, dict): jobs = jobs_result.get("jobs", []) @@ -369,55 +377,82 @@ async def get_workflow_run_logs( if run_url_pattern in html_url: job_id = job.get("id") break - + if not job_id: - return json.dumps({ - "run_number": run_number, - "run_id": run_id, - "status": run_status, - "title": run_title, - "error": "Could not find job ID for this run", - "detail": "The job may still be queued or the run data is not available" - }) - + return json.dumps( + { + "run_number": run_number, + "run_id": run_id, + "status": run_status, + "title": run_title, + "error": "Could not find job ID for this run", + "detail": "The job may still be queued or the run data is not available", + } + ) + # Step 3: Fetch the logs - logs_result = await client.request("GET", f"/repos/{owner}/{repo}/actions/jobs/{job_id}/logs") - + logs_result = await client.request( + "GET", f"/repos/{owner}/{repo}/actions/jobs/{job_id}/logs" + ) + if isinstance(logs_result, dict) and logs_result.get("error"): - return json.dumps({ - "run_number": run_number, - "run_id": run_id, - "status": run_status, - "title": run_title, - "error": "Failed to fetch logs", - "detail": logs_result.get("message", "Unknown error") - }) - + return json.dumps( + { + "run_number": run_number, + "run_id": run_id, + "status": run_status, + "title": run_title, + "error": "Failed to fetch logs", + "detail": logs_result.get("message", "Unknown error"), + } + ) + # Extract log content log_content = "" if isinstance(logs_result, dict): log_content = logs_result.get("content", "") elif isinstance(logs_result, str): log_content = logs_result - + lines = log_content.splitlines() total_lines = len(lines) - + # For failed jobs, try to find error context instead of just tail if run_status == "failure" and tail_lines > 0 and total_lines > tail_lines: # Error indicators to search for error_patterns = [ - "❌", "Error:", "ERROR:", "error:", "FAILED", "Failed", - "fatal:", "FATAL:", "Exception:", "exception:", - "Cannot find", "not found", "No such file", - "Permission denied", "command not found", - "npm ERR!", "pnpm ERR!", "yarn error", - "TypeScript error", "TS2", "TS1", # TypeScript errors - "SyntaxError", "ReferenceError", "TypeError", - "Build failed", "build failed", "Compilation failed", - "exit code 1", "exit code 2", "exited with", + "❌", + "Error:", + "ERROR:", + "error:", + "FAILED", + "Failed", + "fatal:", + "FATAL:", + "Exception:", + "exception:", + "Cannot find", + "not found", + "No such file", + "Permission denied", + "command not found", + "npm ERR!", + "pnpm ERR!", + "yarn error", + "TypeScript error", + "TS2", + "TS1", # TypeScript errors + "SyntaxError", + "ReferenceError", + "TypeError", + "Build failed", + "build failed", + "Compilation failed", + "exit code 1", + "exit code 2", + "exited with", ] - + # Find lines containing errors error_line_indices = [] for i, line in enumerate(lines): @@ -425,25 +460,25 @@ async def get_workflow_run_logs( if pattern in line: error_line_indices.append(i) break - + if error_line_indices: # Get unique error regions with context (5 lines before, 3 after) context_before = 5 context_after = 3 selected_lines = set() - + for idx in error_line_indices: start = max(0, idx - context_before) end = min(total_lines, idx + context_after + 1) for i in range(start, end): selected_lines.add(i) - + # Cap at tail_lines to avoid overwhelming output sorted_indices = sorted(selected_lines) if len(sorted_indices) > tail_lines: # Take the last error regions sorted_indices = sorted_indices[-tail_lines:] - + # Build output with line breaks between non-contiguous sections output_parts = [] prev_idx = -2 @@ -453,27 +488,38 @@ async def get_workflow_run_logs( output_parts.append("...") output_parts.append(lines[idx]) prev_idx = idx - - log_content = f"... (showing {len(sorted_indices)} error-relevant lines of {total_lines} total) ...\n" + "\n".join(output_parts) + + log_content = ( + f"... (showing {len(sorted_indices)} error-relevant lines of {total_lines} total) ...\n" + + "\n".join(output_parts) + ) else: # No errors found, fall back to tail lines = lines[-tail_lines:] - log_content = f"... (showing last {tail_lines} of {total_lines} lines) ...\n" + "\n".join(lines) + log_content = ( + f"... (showing last {tail_lines} of {total_lines} lines) ...\n" + + "\n".join(lines) + ) elif tail_lines > 0 and total_lines > tail_lines: # Not a failure or no smart extraction, just tail lines = lines[-tail_lines:] - log_content = f"... (showing last {tail_lines} of {total_lines} lines) ...\n" + "\n".join(lines) + log_content = ( + f"... (showing last {tail_lines} of {total_lines} lines) ...\n" + + "\n".join(lines) + ) else: log_content = "\n".join(lines) - - return json.dumps({ - "run_number": run_number, - "run_id": run_id, - "status": run_status, - "title": run_title, - "job_id": job_id, - "logs": log_content - }) + + return json.dumps( + { + "run_number": run_number, + "run_id": run_id, + "status": run_status, + "title": run_title, + "job_id": job_id, + "logs": log_content, + } + ) # ============================================================================= @@ -485,8 +531,8 @@ async def get_workflow_run_logs( async def gitea_api_call( endpoint: str, method: str = "GET", - params: str = "{}", - body: str = "{}", + params: str | dict[str, Any] = "{}", + body: str | dict[str, Any] = "{}", ) -> str: """Execute a raw API call to Gitea. @@ -497,16 +543,25 @@ async def gitea_api_call( Args: endpoint: API endpoint path (e.g., '/repos/owner/repo/releases') method: HTTP method (GET, POST, PUT, PATCH, DELETE) - params: JSON string of query parameters (optional) - body: JSON string of request body for POST/PUT/PATCH (optional) + params: JSON string or dict of query parameters (optional) + body: JSON string or dict of request body for POST/PUT/PATCH (optional) Example: gitea_api_call('/repos/myorg/myrepo/releases', 'POST', body='{{"tag_name": "v1.0.0", "name": "Release 1.0"}}') """.format(base_url=GITEA_URL) try: - params_dict = json.loads(params) if params else {} - body_dict = json.loads(body) if body else {} + # Handle params: accept both string and dict + if isinstance(params, str): + params_dict = json.loads(params) if params else {} + else: + params_dict = params or {} + + # Handle body: accept both string and dict + if isinstance(body, str): + body_dict = json.loads(body) if body else {} + else: + body_dict = body or {} except json.JSONDecodeError as e: return json.dumps({"error": True, "message": f"Invalid JSON: {e}"}) @@ -526,7 +581,7 @@ async def gitea_api_call( async def health_check(request): """Health check endpoint for Docker/Kubernetes. - + This is a liveness probe - it just confirms the server is running. Gitea connectivity is validated when tools are actually called. """ @@ -543,10 +598,10 @@ async def lifespan(app): def create_app() -> Starlette: """Create the Starlette application with health check and MCP.""" mcp_app = mcp.http_app() - + # Add health check route directly to the MCP app mcp_app.add_route("/health", health_check, methods=["GET"]) - + return mcp_app