From f8eda211e607ae1cfb532a1bf37ff7fa82a3231e Mon Sep 17 00:00:00 2001 From: Ben Date: Fri, 26 Dec 2025 20:02:26 +0000 Subject: [PATCH] Fix gitea_api_call to accept both string and dict for params/body The gitea_api_call tool was failing when MCP clients passed JSON objects for the body or params parameters. The issue was that the tool signature only accepted string types, but MCP frameworks parse JSON strings into dicts before passing them to the tool. This fix updates the function signature to accept Union[str, dict] for both params and body parameters, and handles both cases appropriately. Fixes #1 --- server.py | 211 ++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 133 insertions(+), 78 deletions(-) diff --git a/server.py b/server.py index 1ed5d9e..5425865 100644 --- a/server.py +++ b/server.py @@ -293,7 +293,9 @@ async def list_repo_commits( if sha: params["sha"] = sha - result = await client.request("GET", f"/repos/{owner}/{repo}/commits", params=params) + result = await client.request( + "GET", f"/repos/{owner}/{repo}/commits", params=params + ) return json.dumps(result) @@ -320,19 +322,23 @@ async def get_workflow_run_logs( """ # Step 1: List workflow runs to find the run and job ID tasks_result = await client.request("GET", f"/repos/{owner}/{repo}/actions/tasks") - + if isinstance(tasks_result, dict) and tasks_result.get("error"): return json.dumps(tasks_result) - + # Find the run matching the run_number, or use the most recent - workflow_runs = tasks_result.get("workflow_runs", []) if isinstance(tasks_result, dict) else [] - + workflow_runs = ( + tasks_result.get("workflow_runs", []) if isinstance(tasks_result, dict) else [] + ) + if not workflow_runs: - return json.dumps({ - "error": True, - "message": f"No workflow runs found in repository {owner}/{repo}" - }) - + return json.dumps( + { + "error": True, + "message": f"No workflow runs found in repository {owner}/{repo}", + } + ) + target_run = None if run_number is None: # Use the most recent run (first in the list) @@ -343,22 +349,24 @@ async def get_workflow_run_logs( if run.get("run_number") == run_number: target_run = run break - + if not target_run: - return json.dumps({ - "error": True, - "message": f"Workflow run #{run_number} not found in repository {owner}/{repo}", - "available_runs": [r.get("run_number") for r in workflow_runs[:10]] - }) - + return json.dumps( + { + "error": True, + "message": f"Workflow run #{run_number} not found in repository {owner}/{repo}", + "available_runs": [r.get("run_number") for r in workflow_runs[:10]], + } + ) + run_id = target_run.get("id") run_status = target_run.get("status") run_title = target_run.get("display_title", "") - + # Step 2: Get jobs list and find the job matching this run_number # Note: Gitea's jobs endpoint uses different IDs than tasks, so we match by html_url jobs_result = await client.request("GET", f"/repos/{owner}/{repo}/actions/jobs") - + job_id = None if isinstance(jobs_result, dict): jobs = jobs_result.get("jobs", []) @@ -369,55 +377,82 @@ async def get_workflow_run_logs( if run_url_pattern in html_url: job_id = job.get("id") break - + if not job_id: - return json.dumps({ - "run_number": run_number, - "run_id": run_id, - "status": run_status, - "title": run_title, - "error": "Could not find job ID for this run", - "detail": "The job may still be queued or the run data is not available" - }) - + return json.dumps( + { + "run_number": run_number, + "run_id": run_id, + "status": run_status, + "title": run_title, + "error": "Could not find job ID for this run", + "detail": "The job may still be queued or the run data is not available", + } + ) + # Step 3: Fetch the logs - logs_result = await client.request("GET", f"/repos/{owner}/{repo}/actions/jobs/{job_id}/logs") - + logs_result = await client.request( + "GET", f"/repos/{owner}/{repo}/actions/jobs/{job_id}/logs" + ) + if isinstance(logs_result, dict) and logs_result.get("error"): - return json.dumps({ - "run_number": run_number, - "run_id": run_id, - "status": run_status, - "title": run_title, - "error": "Failed to fetch logs", - "detail": logs_result.get("message", "Unknown error") - }) - + return json.dumps( + { + "run_number": run_number, + "run_id": run_id, + "status": run_status, + "title": run_title, + "error": "Failed to fetch logs", + "detail": logs_result.get("message", "Unknown error"), + } + ) + # Extract log content log_content = "" if isinstance(logs_result, dict): log_content = logs_result.get("content", "") elif isinstance(logs_result, str): log_content = logs_result - + lines = log_content.splitlines() total_lines = len(lines) - + # For failed jobs, try to find error context instead of just tail if run_status == "failure" and tail_lines > 0 and total_lines > tail_lines: # Error indicators to search for error_patterns = [ - "❌", "Error:", "ERROR:", "error:", "FAILED", "Failed", - "fatal:", "FATAL:", "Exception:", "exception:", - "Cannot find", "not found", "No such file", - "Permission denied", "command not found", - "npm ERR!", "pnpm ERR!", "yarn error", - "TypeScript error", "TS2", "TS1", # TypeScript errors - "SyntaxError", "ReferenceError", "TypeError", - "Build failed", "build failed", "Compilation failed", - "exit code 1", "exit code 2", "exited with", + "❌", + "Error:", + "ERROR:", + "error:", + "FAILED", + "Failed", + "fatal:", + "FATAL:", + "Exception:", + "exception:", + "Cannot find", + "not found", + "No such file", + "Permission denied", + "command not found", + "npm ERR!", + "pnpm ERR!", + "yarn error", + "TypeScript error", + "TS2", + "TS1", # TypeScript errors + "SyntaxError", + "ReferenceError", + "TypeError", + "Build failed", + "build failed", + "Compilation failed", + "exit code 1", + "exit code 2", + "exited with", ] - + # Find lines containing errors error_line_indices = [] for i, line in enumerate(lines): @@ -425,25 +460,25 @@ async def get_workflow_run_logs( if pattern in line: error_line_indices.append(i) break - + if error_line_indices: # Get unique error regions with context (5 lines before, 3 after) context_before = 5 context_after = 3 selected_lines = set() - + for idx in error_line_indices: start = max(0, idx - context_before) end = min(total_lines, idx + context_after + 1) for i in range(start, end): selected_lines.add(i) - + # Cap at tail_lines to avoid overwhelming output sorted_indices = sorted(selected_lines) if len(sorted_indices) > tail_lines: # Take the last error regions sorted_indices = sorted_indices[-tail_lines:] - + # Build output with line breaks between non-contiguous sections output_parts = [] prev_idx = -2 @@ -453,27 +488,38 @@ async def get_workflow_run_logs( output_parts.append("...") output_parts.append(lines[idx]) prev_idx = idx - - log_content = f"... (showing {len(sorted_indices)} error-relevant lines of {total_lines} total) ...\n" + "\n".join(output_parts) + + log_content = ( + f"... (showing {len(sorted_indices)} error-relevant lines of {total_lines} total) ...\n" + + "\n".join(output_parts) + ) else: # No errors found, fall back to tail lines = lines[-tail_lines:] - log_content = f"... (showing last {tail_lines} of {total_lines} lines) ...\n" + "\n".join(lines) + log_content = ( + f"... (showing last {tail_lines} of {total_lines} lines) ...\n" + + "\n".join(lines) + ) elif tail_lines > 0 and total_lines > tail_lines: # Not a failure or no smart extraction, just tail lines = lines[-tail_lines:] - log_content = f"... (showing last {tail_lines} of {total_lines} lines) ...\n" + "\n".join(lines) + log_content = ( + f"... (showing last {tail_lines} of {total_lines} lines) ...\n" + + "\n".join(lines) + ) else: log_content = "\n".join(lines) - - return json.dumps({ - "run_number": run_number, - "run_id": run_id, - "status": run_status, - "title": run_title, - "job_id": job_id, - "logs": log_content - }) + + return json.dumps( + { + "run_number": run_number, + "run_id": run_id, + "status": run_status, + "title": run_title, + "job_id": job_id, + "logs": log_content, + } + ) # ============================================================================= @@ -485,8 +531,8 @@ async def get_workflow_run_logs( async def gitea_api_call( endpoint: str, method: str = "GET", - params: str = "{}", - body: str = "{}", + params: str | dict[str, Any] = "{}", + body: str | dict[str, Any] = "{}", ) -> str: """Execute a raw API call to Gitea. @@ -497,16 +543,25 @@ async def gitea_api_call( Args: endpoint: API endpoint path (e.g., '/repos/owner/repo/releases') method: HTTP method (GET, POST, PUT, PATCH, DELETE) - params: JSON string of query parameters (optional) - body: JSON string of request body for POST/PUT/PATCH (optional) + params: JSON string or dict of query parameters (optional) + body: JSON string or dict of request body for POST/PUT/PATCH (optional) Example: gitea_api_call('/repos/myorg/myrepo/releases', 'POST', body='{{"tag_name": "v1.0.0", "name": "Release 1.0"}}') """.format(base_url=GITEA_URL) try: - params_dict = json.loads(params) if params else {} - body_dict = json.loads(body) if body else {} + # Handle params: accept both string and dict + if isinstance(params, str): + params_dict = json.loads(params) if params else {} + else: + params_dict = params or {} + + # Handle body: accept both string and dict + if isinstance(body, str): + body_dict = json.loads(body) if body else {} + else: + body_dict = body or {} except json.JSONDecodeError as e: return json.dumps({"error": True, "message": f"Invalid JSON: {e}"}) @@ -526,7 +581,7 @@ async def gitea_api_call( async def health_check(request): """Health check endpoint for Docker/Kubernetes. - + This is a liveness probe - it just confirms the server is running. Gitea connectivity is validated when tools are actually called. """ @@ -543,10 +598,10 @@ async def lifespan(app): def create_app() -> Starlette: """Create the Starlette application with health check and MCP.""" mcp_app = mcp.http_app() - + # Add health check route directly to the MCP app mcp_app.add_route("/health", health_check, methods=["GET"]) - + return mcp_app