Files
gitea-mcp-custom/server.py
Ben f8eda211e6
All checks were successful
Build and Push Gitea MCP Docker Image / build (push) Successful in 17s
Fix gitea_api_call to accept both string and dict for params/body
The gitea_api_call tool was failing when MCP clients passed JSON objects
for the body or params parameters. The issue was that the tool signature
only accepted string types, but MCP frameworks parse JSON strings into
dicts before passing them to the tool.

This fix updates the function signature to accept Union[str, dict] for
both params and body parameters, and handles both cases appropriately.

Fixes #1
2025-12-26 20:02:26 +00:00

615 lines
20 KiB
Python

"""
Gitea MCP Server - Hybrid MCP Light implementation for Gitea API.
Provides 5 curated tools plus an API pass-through for complete API coverage.
"""
import json
import os
from contextlib import asynccontextmanager
from typing import Any
from urllib.parse import urljoin
import httpx
from dotenv import load_dotenv
from fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Mount, Route
load_dotenv()
# Configuration
GITEA_URL = os.getenv("GITEA_URL", "").rstrip("/")
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
# Initialize MCP server
mcp = FastMCP(
"Gitea MCP",
instructions="MCP server for Gitea API - provides repository, issue, and user management",
)
class GiteaClient:
"""HTTP client for Gitea API with token authentication."""
def __init__(self, base_url: str, token: str):
self.base_url = base_url.rstrip("/")
self.api_url = f"{self.base_url}/api/v1"
self.token = token
self._client: httpx.AsyncClient | None = None
async def _get_client(self) -> httpx.AsyncClient:
if self._client is None:
self._client = httpx.AsyncClient(
headers={
"Authorization": f"token {self.token}",
"Accept": "application/json",
"Content-Type": "application/json",
},
timeout=30.0,
)
return self._client
async def close(self):
if self._client:
await self._client.aclose()
self._client = None
async def request(
self,
method: str,
endpoint: str,
params: dict[str, Any] | None = None,
json_body: dict[str, Any] | None = None,
) -> dict[str, Any] | list[Any]:
"""Execute an API request to Gitea."""
client = await self._get_client()
url = f"{self.api_url}{endpoint}"
try:
response = await client.request(
method=method.upper(),
url=url,
params=params,
json=json_body,
)
response.raise_for_status()
if response.status_code == 204:
return {"status": "success", "message": "No content"}
# Try to parse as JSON, fall back to text for endpoints like /logs
content_type = response.headers.get("content-type", "")
if "application/json" in content_type:
return response.json()
else:
# Return raw text wrapped in a dict for non-JSON responses
return {"content": response.text, "content_type": content_type}
except httpx.HTTPStatusError as e:
return {
"error": True,
"status_code": e.response.status_code,
"message": str(e),
"detail": e.response.text[:500] if e.response.text else None,
}
except Exception as e:
return {"error": True, "message": str(e)}
async def health_check(self) -> bool:
"""Check if Gitea is accessible."""
try:
client = await self._get_client()
response = await client.get(f"{self.api_url}/version")
return response.status_code == 200
except Exception:
return False
# Global client instance
client = GiteaClient(GITEA_URL, GITEA_TOKEN)
# =============================================================================
# Curated API Reference
# =============================================================================
API_REFERENCE = """
# Gitea API Quick Reference
For complete API documentation, see: {base_url}/api/swagger
## Common Endpoints for `gitea_api_call` tool
### Repository Operations
- GET `/repos/search?q={{keyword}}&limit=10` - Search repositories
- GET `/repos/{{owner}}/{{repo}}` - Get repository details
- POST `/repos/{{owner}}/{{repo}}` - Create repository (body: name, description, private)
- PATCH `/repos/{{owner}}/{{repo}}` - Update repository
- DELETE `/repos/{{owner}}/{{repo}}` - Delete repository
- GET `/repos/{{owner}}/{{repo}}/branches` - List branches
- GET `/repos/{{owner}}/{{repo}}/commits?limit=20` - List commits
### Issues
- GET `/repos/{{owner}}/{{repo}}/issues?state=open` - List issues
- POST `/repos/{{owner}}/{{repo}}/issues` - Create issue (body: title, body)
- GET `/repos/{{owner}}/{{repo}}/issues/{{index}}` - Get issue by number
- PATCH `/repos/{{owner}}/{{repo}}/issues/{{index}}` - Update issue
- POST `/repos/{{owner}}/{{repo}}/issues/{{index}}/comments` - Add comment (body: body)
### Pull Requests
- GET `/repos/{{owner}}/{{repo}}/pulls?state=open` - List PRs
- POST `/repos/{{owner}}/{{repo}}/pulls` - Create PR (body: title, head, base)
- GET `/repos/{{owner}}/{{repo}}/pulls/{{index}}` - Get PR by number
- POST `/repos/{{owner}}/{{repo}}/pulls/{{index}}/merge` - Merge PR
### File Operations
- GET `/repos/{{owner}}/{{repo}}/contents/{{filepath}}?ref={{branch}}` - Get file content
- PUT `/repos/{{owner}}/{{repo}}/contents/{{filepath}}` - Create/update file
- DELETE `/repos/{{owner}}/{{repo}}/contents/{{filepath}}` - Delete file
### Releases & Tags
- GET `/repos/{{owner}}/{{repo}}/releases` - List releases
- GET `/repos/{{owner}}/{{repo}}/releases/latest` - Get latest release
- POST `/repos/{{owner}}/{{repo}}/releases` - Create release
- GET `/repos/{{owner}}/{{repo}}/tags` - List tags
- POST `/repos/{{owner}}/{{repo}}/tags` - Create tag
### Actions/CI (Gitea Actions)
- GET `/repos/{{owner}}/{{repo}}/actions/tasks` - List all workflow runs
- GET `/repos/{{owner}}/{{repo}}/actions/runs/{{run_id}}/jobs` - List jobs for a run
- GET `/repos/{{owner}}/{{repo}}/actions/jobs/{{job_id}}/logs` - Get job logs
- Use `get_workflow_run_logs` tool for easy log retrieval by run number
### User & Organizations
- GET `/user` - Get authenticated user
- GET `/users/{{username}}` - Get user by username
- GET `/user/repos?limit=50` - List authenticated user's repos
- GET `/orgs/{{org}}` - Get organization
- GET `/orgs/{{org}}/repos` - List organization repos
- GET `/orgs/{{org}}/members` - List organization members
### Pagination
Most list endpoints support `page` and `limit` query parameters.
Default limit is usually 20-50 items.
""".format(base_url=GITEA_URL)
@mcp.resource("gitea://api-reference")
def get_api_reference() -> str:
"""Returns the Gitea API quick reference for using the gitea_api_call tool."""
return API_REFERENCE
# =============================================================================
# MCP Tools - Curated Operations
# =============================================================================
@mcp.tool()
async def get_my_user_info() -> str:
"""Get information about the authenticated user.
Returns the current user's profile including username, email, and permissions.
This is useful for determining the authenticated identity before other operations.
"""
result = await client.request("GET", "/user")
return json.dumps(result)
@mcp.tool()
async def search_repos(
keyword: str,
limit: int = 20,
private: bool | None = None,
archived: bool | None = None,
) -> str:
"""Search for repositories by keyword.
Args:
keyword: Search term to find in repository names/descriptions
limit: Maximum number of results (default: 20, max: 50)
private: Filter by private status (optional)
archived: Filter by archived status (optional)
"""
params: dict[str, Any] = {"q": keyword, "limit": min(limit, 50)}
if private is not None:
params["private"] = private
if archived is not None:
params["archived"] = archived
result = await client.request("GET", "/repos/search", params=params)
return json.dumps(result)
@mcp.tool()
async def list_my_repos(
limit: int = 50,
page: int = 1,
) -> str:
"""List repositories owned by or accessible to the authenticated user.
Args:
limit: Maximum number of results per page (default: 50)
page: Page number for pagination (default: 1)
"""
params = {"limit": limit, "page": page}
result = await client.request("GET", "/user/repos", params=params)
return json.dumps(result)
@mcp.tool()
async def get_repo(owner: str, repo: str) -> str:
"""Get detailed information about a specific repository.
Args:
owner: Repository owner (username or organization)
repo: Repository name
"""
result = await client.request("GET", f"/repos/{owner}/{repo}")
return json.dumps(result)
@mcp.tool()
async def list_repo_issues(
owner: str,
repo: str,
state: str = "open",
limit: int = 30,
page: int = 1,
) -> str:
"""List issues for a repository.
Args:
owner: Repository owner (username or organization)
repo: Repository name
state: Issue state filter: 'open', 'closed', or 'all' (default: 'open')
limit: Maximum number of results (default: 30)
page: Page number for pagination (default: 1)
"""
params = {"state": state, "limit": limit, "page": page}
result = await client.request("GET", f"/repos/{owner}/{repo}/issues", params=params)
return json.dumps(result)
@mcp.tool()
async def list_repo_commits(
owner: str,
repo: str,
sha: str | None = None,
limit: int = 30,
page: int = 1,
) -> str:
"""List commits for a repository.
Args:
owner: Repository owner (username or organization)
repo: Repository name
sha: SHA or branch to start listing from (default: default branch)
limit: Maximum number of results (default: 30)
page: Page number for pagination (default: 1)
"""
params: dict[str, Any] = {"limit": limit, "page": page}
if sha:
params["sha"] = sha
result = await client.request(
"GET", f"/repos/{owner}/{repo}/commits", params=params
)
return json.dumps(result)
@mcp.tool()
async def get_workflow_run_logs(
owner: str,
repo: str,
run_number: int | None = None,
tail_lines: int = 50,
) -> str:
"""Get the logs for a specific workflow run by run number.
This tool fetches the logs from Gitea Actions workflow runs.
Since logs can be very large, it returns only the last N lines by default.
Args:
owner: Repository owner (username or organization)
repo: Repository name
run_number: The workflow run number (visible in UI, e.g., #21). If not specified, uses the most recent run.
tail_lines: Number of lines to return from the end of the log (default: 50, use 0 for all)
Returns:
The workflow run status and log content (last N lines)
"""
# Step 1: List workflow runs to find the run and job ID
tasks_result = await client.request("GET", f"/repos/{owner}/{repo}/actions/tasks")
if isinstance(tasks_result, dict) and tasks_result.get("error"):
return json.dumps(tasks_result)
# Find the run matching the run_number, or use the most recent
workflow_runs = (
tasks_result.get("workflow_runs", []) if isinstance(tasks_result, dict) else []
)
if not workflow_runs:
return json.dumps(
{
"error": True,
"message": f"No workflow runs found in repository {owner}/{repo}",
}
)
target_run = None
if run_number is None:
# Use the most recent run (first in the list)
target_run = workflow_runs[0]
run_number = target_run.get("run_number")
else:
for run in workflow_runs:
if run.get("run_number") == run_number:
target_run = run
break
if not target_run:
return json.dumps(
{
"error": True,
"message": f"Workflow run #{run_number} not found in repository {owner}/{repo}",
"available_runs": [r.get("run_number") for r in workflow_runs[:10]],
}
)
run_id = target_run.get("id")
run_status = target_run.get("status")
run_title = target_run.get("display_title", "")
# Step 2: Get jobs list and find the job matching this run_number
# Note: Gitea's jobs endpoint uses different IDs than tasks, so we match by html_url
jobs_result = await client.request("GET", f"/repos/{owner}/{repo}/actions/jobs")
job_id = None
if isinstance(jobs_result, dict):
jobs = jobs_result.get("jobs", [])
# Find job whose html_url contains /runs/{run_number}/
run_url_pattern = f"/runs/{run_number}/"
for job in jobs:
html_url = job.get("html_url", "")
if run_url_pattern in html_url:
job_id = job.get("id")
break
if not job_id:
return json.dumps(
{
"run_number": run_number,
"run_id": run_id,
"status": run_status,
"title": run_title,
"error": "Could not find job ID for this run",
"detail": "The job may still be queued or the run data is not available",
}
)
# Step 3: Fetch the logs
logs_result = await client.request(
"GET", f"/repos/{owner}/{repo}/actions/jobs/{job_id}/logs"
)
if isinstance(logs_result, dict) and logs_result.get("error"):
return json.dumps(
{
"run_number": run_number,
"run_id": run_id,
"status": run_status,
"title": run_title,
"error": "Failed to fetch logs",
"detail": logs_result.get("message", "Unknown error"),
}
)
# Extract log content
log_content = ""
if isinstance(logs_result, dict):
log_content = logs_result.get("content", "")
elif isinstance(logs_result, str):
log_content = logs_result
lines = log_content.splitlines()
total_lines = len(lines)
# For failed jobs, try to find error context instead of just tail
if run_status == "failure" and tail_lines > 0 and total_lines > tail_lines:
# Error indicators to search for
error_patterns = [
"",
"Error:",
"ERROR:",
"error:",
"FAILED",
"Failed",
"fatal:",
"FATAL:",
"Exception:",
"exception:",
"Cannot find",
"not found",
"No such file",
"Permission denied",
"command not found",
"npm ERR!",
"pnpm ERR!",
"yarn error",
"TypeScript error",
"TS2",
"TS1", # TypeScript errors
"SyntaxError",
"ReferenceError",
"TypeError",
"Build failed",
"build failed",
"Compilation failed",
"exit code 1",
"exit code 2",
"exited with",
]
# Find lines containing errors
error_line_indices = []
for i, line in enumerate(lines):
for pattern in error_patterns:
if pattern in line:
error_line_indices.append(i)
break
if error_line_indices:
# Get unique error regions with context (5 lines before, 3 after)
context_before = 5
context_after = 3
selected_lines = set()
for idx in error_line_indices:
start = max(0, idx - context_before)
end = min(total_lines, idx + context_after + 1)
for i in range(start, end):
selected_lines.add(i)
# Cap at tail_lines to avoid overwhelming output
sorted_indices = sorted(selected_lines)
if len(sorted_indices) > tail_lines:
# Take the last error regions
sorted_indices = sorted_indices[-tail_lines:]
# Build output with line breaks between non-contiguous sections
output_parts = []
prev_idx = -2
for idx in sorted_indices:
if idx > prev_idx + 1:
if output_parts:
output_parts.append("...")
output_parts.append(lines[idx])
prev_idx = idx
log_content = (
f"... (showing {len(sorted_indices)} error-relevant lines of {total_lines} total) ...\n"
+ "\n".join(output_parts)
)
else:
# No errors found, fall back to tail
lines = lines[-tail_lines:]
log_content = (
f"... (showing last {tail_lines} of {total_lines} lines) ...\n"
+ "\n".join(lines)
)
elif tail_lines > 0 and total_lines > tail_lines:
# Not a failure or no smart extraction, just tail
lines = lines[-tail_lines:]
log_content = (
f"... (showing last {tail_lines} of {total_lines} lines) ...\n"
+ "\n".join(lines)
)
else:
log_content = "\n".join(lines)
return json.dumps(
{
"run_number": run_number,
"run_id": run_id,
"status": run_status,
"title": run_title,
"job_id": job_id,
"logs": log_content,
}
)
# =============================================================================
# API Pass-through Tool
# =============================================================================
@mcp.tool()
async def gitea_api_call(
endpoint: str,
method: str = "GET",
params: str | dict[str, Any] = "{}",
body: str | dict[str, Any] = "{}",
) -> str:
"""Execute a raw API call to Gitea.
Use this for any operation not covered by the other tools.
Refer to the 'gitea://api-reference' resource for common endpoints,
or see the full API docs at {base_url}/api/swagger
Args:
endpoint: API endpoint path (e.g., '/repos/owner/repo/releases')
method: HTTP method (GET, POST, PUT, PATCH, DELETE)
params: JSON string or dict of query parameters (optional)
body: JSON string or dict of request body for POST/PUT/PATCH (optional)
Example:
gitea_api_call('/repos/myorg/myrepo/releases', 'POST',
body='{{"tag_name": "v1.0.0", "name": "Release 1.0"}}')
""".format(base_url=GITEA_URL)
try:
# Handle params: accept both string and dict
if isinstance(params, str):
params_dict = json.loads(params) if params else {}
else:
params_dict = params or {}
# Handle body: accept both string and dict
if isinstance(body, str):
body_dict = json.loads(body) if body else {}
else:
body_dict = body or {}
except json.JSONDecodeError as e:
return json.dumps({"error": True, "message": f"Invalid JSON: {e}"})
result = await client.request(
method=method,
endpoint=endpoint,
params=params_dict if params_dict else None,
json_body=body_dict if body_dict else None,
)
return json.dumps(result)
# =============================================================================
# Starlette Wrapper for Health Checks
# =============================================================================
async def health_check(request):
"""Health check endpoint for Docker/Kubernetes.
This is a liveness probe - it just confirms the server is running.
Gitea connectivity is validated when tools are actually called.
"""
return JSONResponse({"status": "ok", "gitea_url": GITEA_URL})
@asynccontextmanager
async def lifespan(app):
"""Manage client lifecycle."""
yield
await client.close()
def create_app() -> Starlette:
"""Create the Starlette application with health check and MCP."""
mcp_app = mcp.http_app()
# Add health check route directly to the MCP app
mcp_app.add_route("/health", health_check, methods=["GET"])
return mcp_app
app = create_app()
if __name__ == "__main__":
import uvicorn
port = int(os.getenv("PORT", "8000"))
uvicorn.run(app, host="0.0.0.0", port=port)