"""
Gitea MCP Server - Hybrid MCP Light implementation for Gitea API.

Provides a small set of curated tools plus an API pass-through for complete
API coverage.
"""

import json
import os
from contextlib import asynccontextmanager
from typing import Any
from urllib.parse import urljoin

import httpx
from dotenv import load_dotenv
from fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Mount, Route

load_dotenv()

# Configuration
GITEA_URL = os.getenv("GITEA_URL", "").rstrip("/")
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")

# Initialize MCP server
mcp = FastMCP(
    "Gitea MCP",
    instructions="MCP server for Gitea API - provides repository, issue, and user management",
)


class GiteaClient:
    """HTTP client for the Gitea REST API (``/api/v1``) with token authentication.

    The underlying ``httpx.AsyncClient`` is created lazily on first use and
    re-created on demand after ``close()``.
    """

    def __init__(self, base_url: str, token: str):
        self.base_url = base_url.rstrip("/")
        self.api_url = f"{self.base_url}/api/v1"
        self.token = token
        self._client: httpx.AsyncClient | None = None

    async def _get_client(self) -> httpx.AsyncClient:
        """Return the shared ``httpx.AsyncClient``, creating it if necessary."""
        if self._client is None:
            self._client = httpx.AsyncClient(
                headers={
                    "Authorization": f"token {self.token}",
                    "Accept": "application/json",
                    "Content-Type": "application/json",
                },
                timeout=30.0,
            )
        return self._client

    async def close(self):
        """Close the underlying HTTP client, if one was created."""
        if self._client:
            await self._client.aclose()
            self._client = None

    async def request(
        self,
        method: str,
        endpoint: str,
        params: dict[str, Any] | None = None,
        json_body: dict[str, Any] | None = None,
    ) -> dict[str, Any] | list[Any]:
        """Execute an API request to Gitea.

        Args:
            method: HTTP method name; upper-cased before sending.
            endpoint: Path below ``/api/v1``, with or without a leading slash.
            params: Optional query parameters.
            json_body: Optional JSON request body.

        Returns:
            The decoded JSON payload for JSON responses; a
            ``{"content", "content_type"}`` dict for non-JSON responses; a
            success marker dict for HTTP 204. Failures never raise: they are
            reported as a dict with ``"error": True``.
        """
        http_client = await self._get_client()
        # Normalise the separator so 'repos/x/y' and '/repos/x/y' both resolve
        # below /api/v1 instead of producing '.../api/v1repos/x/y'.
        url = f"{self.api_url}/{endpoint.lstrip('/')}"

        try:
            response = await http_client.request(
                method=method.upper(),
                url=url,
                params=params,
                json=json_body,
            )
            response.raise_for_status()

            if response.status_code == 204:
                return {"status": "success", "message": "No content"}

            # Try to parse as JSON, fall back to text for endpoints like /logs
            content_type = response.headers.get("content-type", "")
            if "application/json" in content_type:
                return response.json()

            # Return raw text wrapped in a dict for non-JSON responses
            return {"content": response.text, "content_type": content_type}

        except httpx.HTTPStatusError as exc:
            return {
                "error": True,
                "status_code": exc.response.status_code,
                "message": str(exc),
                "detail": exc.response.text[:500] if exc.response.text else None,
            }
        except Exception as exc:
            # Tool boundary: callers expect an error dict, never an exception.
            # Some httpx errors (e.g. timeouts) stringify to "", so fall back
            # to the exception class name to keep the message informative.
            return {"error": True, "message": str(exc) or type(exc).__name__}

    async def health_check(self) -> bool:
        """Check if Gitea is accessible (``GET /version`` returns HTTP 200)."""
        try:
            http_client = await self._get_client()
            response = await http_client.get(f"{self.api_url}/version")
            return response.status_code == 200
        except Exception:
            # Best-effort probe: any failure simply means "not reachable".
            return False


# Global client instance
client = GiteaClient(GITEA_URL, GITEA_TOKEN)


# =============================================================================
# Curated API Reference
# =============================================================================

API_REFERENCE = """
# Gitea API Quick Reference

For complete API documentation, see: {base_url}/api/swagger

## Common Endpoints for `gitea_api_call` tool

### Repository Operations
- GET `/repos/search?q={{keyword}}&limit=10` - Search repositories
- GET `/repos/{{owner}}/{{repo}}` - Get repository details
- POST `/user/repos` - Create repository for the authenticated user (body: name, description, private)
- POST `/orgs/{{org}}/repos` - Create repository in an organization (body: name, description, private)
- PATCH `/repos/{{owner}}/{{repo}}` - Update repository
- DELETE `/repos/{{owner}}/{{repo}}` - Delete repository
- GET `/repos/{{owner}}/{{repo}}/branches` - List branches
- GET `/repos/{{owner}}/{{repo}}/commits?limit=20` - List commits

### Issues
- GET `/repos/{{owner}}/{{repo}}/issues?state=open` - List issues
- POST `/repos/{{owner}}/{{repo}}/issues` - Create issue (body: title, body)
- GET `/repos/{{owner}}/{{repo}}/issues/{{index}}` - Get issue by number
- PATCH `/repos/{{owner}}/{{repo}}/issues/{{index}}` - Update issue
- POST `/repos/{{owner}}/{{repo}}/issues/{{index}}/comments` - Add comment (body: body)

### Pull Requests
- GET `/repos/{{owner}}/{{repo}}/pulls?state=open` - List PRs
- POST `/repos/{{owner}}/{{repo}}/pulls` - Create PR (body: title, head, base)
- GET `/repos/{{owner}}/{{repo}}/pulls/{{index}}` - Get PR by number
- POST `/repos/{{owner}}/{{repo}}/pulls/{{index}}/merge` - Merge PR

### File Operations
- GET `/repos/{{owner}}/{{repo}}/contents/{{filepath}}?ref={{branch}}` - Get file content
- PUT `/repos/{{owner}}/{{repo}}/contents/{{filepath}}` - Create/update file
- DELETE `/repos/{{owner}}/{{repo}}/contents/{{filepath}}` - Delete file

### Releases & Tags
- GET `/repos/{{owner}}/{{repo}}/releases` - List releases
- GET `/repos/{{owner}}/{{repo}}/releases/latest` - Get latest release
- POST `/repos/{{owner}}/{{repo}}/releases` - Create release
- GET `/repos/{{owner}}/{{repo}}/tags` - List tags
- POST `/repos/{{owner}}/{{repo}}/tags` - Create tag

### Actions/CI (Gitea Actions)
- GET `/repos/{{owner}}/{{repo}}/actions/tasks` - List all workflow runs
- GET `/repos/{{owner}}/{{repo}}/actions/runs/{{run_id}}/jobs` - List jobs for a run
- GET `/repos/{{owner}}/{{repo}}/actions/jobs/{{job_id}}/logs` - Get job logs
- Use `get_workflow_run_logs` tool for easy log retrieval by run number

### User & Organizations
- GET `/user` - Get authenticated user
- GET `/users/{{username}}` - Get user by username
- GET `/user/repos?limit=50` - List authenticated user's repos
- GET `/orgs/{{org}}` - Get organization
- GET `/orgs/{{org}}/repos` - List organization repos
- GET `/orgs/{{org}}/members` - List organization members

### Pagination
Most list endpoints support `page` and `limit` query parameters.
Default limit is usually 20-50 items.
""".format(base_url=GITEA_URL)


@mcp.resource("gitea://api-reference")
def get_api_reference() -> str:
    """Returns the Gitea API quick reference for using the gitea_api_call tool."""
    return API_REFERENCE


# =============================================================================
# MCP Tools - Curated Operations
# =============================================================================


@mcp.tool()
async def get_my_user_info() -> str:
    """Get information about the authenticated user.

    Returns the current user's profile including username, email, and permissions.
    This is useful for determining the authenticated identity before other operations.
    """
    result = await client.request("GET", "/user")
    return json.dumps(result)


@mcp.tool()
async def search_repos(
    keyword: str,
    limit: int = 20,
    private: bool | None = None,
    archived: bool | None = None,
) -> str:
    """Search for repositories by keyword.

    Args:
        keyword: Search term to find in repository names/descriptions
        limit: Maximum number of results (default: 20, max: 50)
        private: Filter by private status (optional)
        archived: Filter by archived status (optional)
    """
    params: dict[str, Any] = {"q": keyword, "limit": min(limit, 50)}
    if private is not None:
        params["private"] = private
    if archived is not None:
        params["archived"] = archived

    result = await client.request("GET", "/repos/search", params=params)
    return json.dumps(result)


@mcp.tool()
async def list_my_repos(
    limit: int = 50,
    page: int = 1,
) -> str:
    """List repositories owned by or accessible to the authenticated user.

    Args:
        limit: Maximum number of results per page (default: 50)
        page: Page number for pagination (default: 1)
    """
    params = {"limit": limit, "page": page}
    result = await client.request("GET", "/user/repos", params=params)
    return json.dumps(result)


@mcp.tool()
async def get_repo(owner: str, repo: str) -> str:
    """Get detailed information about a specific repository.

    Args:
        owner: Repository owner (username or organization)
        repo: Repository name
    """
    result = await client.request("GET", f"/repos/{owner}/{repo}")
    return json.dumps(result)


@mcp.tool()
async def list_repo_issues(
    owner: str,
    repo: str,
    state: str = "open",
    limit: int = 30,
    page: int = 1,
) -> str:
    """List issues for a repository.

    Args:
        owner: Repository owner (username or organization)
        repo: Repository name
        state: Issue state filter: 'open', 'closed', or 'all' (default: 'open')
        limit: Maximum number of results (default: 30)
        page: Page number for pagination (default: 1)
    """
    params = {"state": state, "limit": limit, "page": page}
    result = await client.request("GET", f"/repos/{owner}/{repo}/issues", params=params)
    return json.dumps(result)


@mcp.tool()
async def list_repo_commits(
    owner: str,
    repo: str,
    sha: str | None = None,
    limit: int = 30,
    page: int = 1,
) -> str:
    """List commits for a repository.

    Args:
        owner: Repository owner (username or organization)
        repo: Repository name
        sha: SHA or branch to start listing from (default: default branch)
        limit: Maximum number of results (default: 30)
        page: Page number for pagination (default: 1)
    """
    params: dict[str, Any] = {"limit": limit, "page": page}
    if sha:
        params["sha"] = sha

    result = await client.request("GET", f"/repos/{owner}/{repo}/commits", params=params)
    return json.dumps(result)


# Substrings that mark a log line as error-relevant when summarising the log
# of a failed run. Matching is plain, case-sensitive substring search.
_ERROR_PATTERNS = (
    "❌", "Error:", "ERROR:", "error:", "FAILED", "Failed",
    "fatal:", "FATAL:", "Exception:", "exception:",
    "Cannot find", "not found", "No such file",
    "Permission denied", "command not found",
    "npm ERR!", "pnpm ERR!", "yarn error",
    "TypeScript error", "TS2", "TS1",  # TypeScript errors
    "SyntaxError", "ReferenceError", "TypeError",
    "Build failed", "build failed", "Compilation failed",
    "exit code 1", "exit code 2", "exited with",
)

# Lines of context kept around every error-relevant line.
_CONTEXT_BEFORE = 5
_CONTEXT_AFTER = 3


def _tail_excerpt(lines: list[str], tail_lines: int) -> str:
    """Return the last ``tail_lines`` lines prefixed with a summary header."""
    header = f"... (showing last {tail_lines} of {len(lines)} lines) ...\n"
    return header + "\n".join(lines[-tail_lines:])


def _error_excerpt(lines: list[str], tail_lines: int) -> str | None:
    """Return error-relevant lines with context, or None if no line matches."""
    total_lines = len(lines)
    selected: set[int] = set()
    for idx, line in enumerate(lines):
        if any(pattern in line for pattern in _ERROR_PATTERNS):
            start = max(0, idx - _CONTEXT_BEFORE)
            end = min(total_lines, idx + _CONTEXT_AFTER + 1)
            selected.update(range(start, end))

    if not selected:
        return None

    # Cap at tail_lines to avoid overwhelming output; keep the last regions.
    indices = sorted(selected)[-tail_lines:]

    # Insert a "..." marker between non-contiguous sections.
    output_parts: list[str] = []
    prev_idx = -2
    for idx in indices:
        if idx > prev_idx + 1 and output_parts:
            output_parts.append("...")
        output_parts.append(lines[idx])
        prev_idx = idx

    header = (
        f"... (showing {len(indices)} error-relevant lines of "
        f"{total_lines} total) ...\n"
    )
    return header + "\n".join(output_parts)


def _format_log_excerpt(log_content: str, run_status: Any, tail_lines: int) -> str:
    """Reduce a raw job log to the portion returned by get_workflow_run_logs."""
    lines = log_content.splitlines()
    if tail_lines <= 0 or len(lines) <= tail_lines:
        # Nothing to trim (or trimming disabled): return the full log.
        return "\n".join(lines)

    if run_status == "failure":
        # For failed jobs, prefer error context over a plain tail.
        excerpt = _error_excerpt(lines, tail_lines)
        if excerpt is not None:
            return excerpt

    return _tail_excerpt(lines, tail_lines)


@mcp.tool()
async def get_workflow_run_logs(
    owner: str,
    repo: str,
    run_number: int | None = None,
    tail_lines: int = 50,
) -> str:
    """Get the logs for a specific workflow run by run number.

    This tool fetches the logs from Gitea Actions workflow runs. Since logs can be
    very large, it returns only the last N lines by default.

    Args:
        owner: Repository owner (username or organization)
        repo: Repository name
        run_number: The workflow run number (visible in UI, e.g., #21). If not specified, uses the most recent run.
        tail_lines: Number of lines to return from the end of the log (default: 50, use 0 for all)

    Returns:
        The workflow run status and log content (last N lines)
    """
    repo_path = f"/repos/{owner}/{repo}"

    # Step 1: List workflow runs to find the run and job ID
    tasks_result = await client.request("GET", f"{repo_path}/actions/tasks")
    if isinstance(tasks_result, dict) and tasks_result.get("error"):
        return json.dumps(tasks_result)

    # Find the run matching the run_number, or use the most recent
    workflow_runs = []
    if isinstance(tasks_result, dict):
        workflow_runs = tasks_result.get("workflow_runs") or []
    if not workflow_runs:
        return json.dumps({
            "error": True,
            "message": f"No workflow runs found in repository {owner}/{repo}"
        })

    if run_number is None:
        # Use the most recent run (first in the list)
        target_run = workflow_runs[0]
        run_number = target_run.get("run_number")
    else:
        target_run = next(
            (run for run in workflow_runs if run.get("run_number") == run_number),
            None,
        )

    if not target_run:
        return json.dumps({
            "error": True,
            "message": f"Workflow run #{run_number} not found in repository {owner}/{repo}",
            "available_runs": [r.get("run_number") for r in workflow_runs[:10]]
        })

    run_status = target_run.get("status")
    # Fields shared by every response about this run, in output order.
    run_summary = {
        "run_number": run_number,
        "run_id": target_run.get("id"),
        "status": run_status,
        "title": target_run.get("display_title", ""),
    }

    # Step 2: Get jobs list and find the job matching this run_number
    # Note: Gitea's jobs endpoint uses different IDs than tasks, so we match by html_url
    jobs_result = await client.request("GET", f"{repo_path}/actions/jobs")
    if isinstance(jobs_result, dict) and jobs_result.get("error"):
        # Surface the API failure instead of reporting a misleading
        # "could not find job" result.
        return json.dumps({
            **run_summary,
            "error": "Failed to list jobs",
            "detail": jobs_result.get("message", "Unknown error")
        })

    jobs = []
    if isinstance(jobs_result, dict):
        jobs = jobs_result.get("jobs") or []

    # Find job whose html_url contains /runs/{run_number}/
    run_url_pattern = f"/runs/{run_number}/"
    job_id = next(
        (
            job.get("id")
            for job in jobs
            # html_url may be missing or null in the payload.
            if run_url_pattern in (job.get("html_url") or "")
        ),
        None,
    )

    if job_id is None:
        return json.dumps({
            **run_summary,
            "error": "Could not find job ID for this run",
            "detail": "The job may still be queued or the run data is not available"
        })

    # Step 3: Fetch the logs
    logs_result = await client.request("GET", f"{repo_path}/actions/jobs/{job_id}/logs")

    if isinstance(logs_result, dict) and logs_result.get("error"):
        return json.dumps({
            **run_summary,
            "error": "Failed to fetch logs",
            "detail": logs_result.get("message", "Unknown error")
        })

    # Extract log content: non-JSON responses arrive wrapped as {"content": ...}
    log_content = ""
    if isinstance(logs_result, dict):
        log_content = logs_result.get("content", "")

    return json.dumps({
        **run_summary,
        "job_id": job_id,
        "logs": _format_log_excerpt(log_content, run_status, tail_lines)
    })


# =============================================================================
# API Pass-through Tool
# =============================================================================

# Tool description for gitea_api_call. It needs the configured base URL, and a
# formatted string cannot serve as a function docstring (only a plain string
# literal is stored in __doc__), so it is passed to the decorator explicitly.
_GITEA_API_CALL_DESCRIPTION = """Execute a raw API call to Gitea.

Use this for any operation not covered by the other tools.
Refer to the 'gitea://api-reference' resource for common endpoints,
or see the full API docs at {base_url}/api/swagger

Args:
    endpoint: API endpoint path (e.g., '/repos/owner/repo/releases')
    method: HTTP method (GET, POST, PUT, PATCH, DELETE)
    params: JSON string of query parameters (optional)
    body: JSON string of request body for POST/PUT/PATCH (optional)

Example:
    gitea_api_call('/repos/myorg/myrepo/releases', 'POST',
                   body='{{"tag_name": "v1.0.0", "name": "Release 1.0"}}')
""".format(base_url=GITEA_URL)


@mcp.tool(description=_GITEA_API_CALL_DESCRIPTION)
async def gitea_api_call(
    endpoint: str,
    method: str = "GET",
    params: str = "{}",
    body: str = "{}",
) -> str:
    """Execute a raw API call to Gitea.

    Args:
        endpoint: API endpoint path (e.g., '/repos/owner/repo/releases')
        method: HTTP method (GET, POST, PUT, PATCH, DELETE)
        params: JSON string of query parameters (optional)
        body: JSON string of request body for POST/PUT/PATCH (optional)

    Returns:
        The API result serialized as JSON, or a JSON error object when
        ``params`` or ``body`` is not valid JSON.
    """
    try:
        params_dict = json.loads(params) if params else {}
        body_dict = json.loads(body) if body else {}
    except json.JSONDecodeError as exc:
        return json.dumps({"error": True, "message": f"Invalid JSON: {exc}"})

    result = await client.request(
        method=method,
        endpoint=endpoint,
        # Empty containers are sent as "no params" / "no body".
        params=params_dict if params_dict else None,
        json_body=body_dict if body_dict else None,
    )
    return json.dumps(result)


# =============================================================================
# Starlette Wrapper for Health Checks
# =============================================================================


async def health_check(request):
    """Health check endpoint for Docker/Kubernetes.

    This is a liveness probe - it just confirms the server is running.
    Gitea connectivity is validated when tools are actually called.
    """
    return JSONResponse({"status": "ok", "gitea_url": GITEA_URL})


@asynccontextmanager
async def lifespan(app):
    """Manage client lifecycle: close the shared Gitea client on shutdown.

    NOTE(review): nothing in this module registers this context manager with
    the application returned by ``create_app``, so it does not currently run.
    Confirm whether it should be composed with the MCP app's own lifespan.
    """
    try:
        yield
    finally:
        # Close the client even if the application exits with an error.
        await client.close()


def create_app() -> Starlette:
    """Create the Starlette application with health check and MCP."""
    mcp_app = mcp.http_app()

    # Add health check route directly to the MCP app
    mcp_app.add_route("/health", health_check, methods=["GET"])

    return mcp_app


app = create_app()

if __name__ == "__main__":
    import uvicorn

    port = int(os.getenv("PORT", "8000"))
    uvicorn.run(app, host="0.0.0.0", port=port)