All checks were successful
Build and Push Gitea MCP Docker Image / build (push) Successful in 17s
The gitea_api_call tool was failing when MCP clients passed JSON objects for the body or params parameters. The issue was that the tool signature only accepted string types, but MCP frameworks parse JSON strings into dicts before passing them to the tool. This fix updates the function signature to accept Union[str, dict] for both params and body parameters, and handles both cases appropriately. Fixes #1
615 lines
20 KiB
Python
615 lines
20 KiB
Python
"""
|
|
Gitea MCP Server - Hybrid MCP Light implementation for Gitea API.
|
|
|
|
Provides 7 curated tools plus an API pass-through for complete API coverage.
|
|
"""
|
|
|
|
import json
|
|
import os
|
|
from contextlib import asynccontextmanager
|
|
from typing import Any
|
|
from urllib.parse import urljoin
|
|
|
|
import httpx
|
|
from dotenv import load_dotenv
|
|
from fastmcp import FastMCP
|
|
from starlette.applications import Starlette
|
|
from starlette.responses import JSONResponse
|
|
from starlette.routing import Mount, Route
|
|
|
|
# Load variables from a .env file (when one is present) before the
# configuration values below are read from the environment.
load_dotenv()

# Configuration
# Root URL of the Gitea instance; trailing slashes are stripped so that paths
# can be appended directly.
GITEA_URL = os.environ.get("GITEA_URL", "").rstrip("/")
# API token handed to the shared GiteaClient.
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")

# Initialize MCP server
mcp = FastMCP(
    "Gitea MCP",
    instructions=(
        "MCP server for Gitea API - provides repository, issue, and user management"
    ),
)
|
|
|
|
|
|
class GiteaClient:
    """Async HTTP client for the Gitea REST API (v1) with token authentication.

    A single ``httpx.AsyncClient`` is created lazily on first use and reused
    for every request until :meth:`close` is called.
    """

    def __init__(self, base_url: str, token: str):
        """Store connection settings; no network I/O happens here.

        Args:
            base_url: Root URL of the Gitea instance (trailing slashes are removed).
            token: API token sent in the ``Authorization`` header.
        """
        self.base_url = base_url.rstrip("/")
        self.api_url = f"{self.base_url}/api/v1"
        self.token = token
        self._client: httpx.AsyncClient | None = None

    def _build_url(self, endpoint: str) -> str:
        """Return the absolute URL for *endpoint* below the API root.

        Surrounding whitespace is dropped and a missing leading slash is
        added, so ``"repos/a/b"`` and ``"/repos/a/b"`` resolve to the same URL
        instead of the former being glued onto ``.../api/v1``.
        """
        path = endpoint.strip()
        if path and not path.startswith("/"):
            path = f"/{path}"
        return f"{self.api_url}{path}"

    async def _get_client(self) -> httpx.AsyncClient:
        """Return the shared ``httpx.AsyncClient``, creating it on first use."""
        if self._client is None:
            self._client = httpx.AsyncClient(
                headers={
                    "Authorization": f"token {self.token}",
                    "Accept": "application/json",
                    "Content-Type": "application/json",
                },
                timeout=30.0,
            )
        return self._client

    async def close(self):
        """Close the underlying HTTP client, if one was created."""
        if self._client:
            await self._client.aclose()
            # Reset so a later request transparently creates a fresh client.
            self._client = None

    async def request(
        self,
        method: str,
        endpoint: str,
        params: dict[str, Any] | None = None,
        json_body: dict[str, Any] | None = None,
    ) -> dict[str, Any] | list[Any]:
        """Execute an API request to Gitea.

        Args:
            method: HTTP verb; upper-cased before sending.
            endpoint: Path below ``/api/v1`` (a missing leading slash is tolerated).
            params: Optional query parameters.
            json_body: Optional JSON request body.

        Returns:
            The decoded JSON payload for JSON responses; a
            ``{"status": "success", "message": "No content"}`` dict for HTTP 204;
            a ``{"content", "content_type"}`` dict for non-JSON responses.
            Failures while sending or decoding are reported as a dict with
            ``"error": True`` (plus ``status_code``/``detail`` for HTTP errors)
            instead of being raised.
        """
        client = await self._get_client()
        url = self._build_url(endpoint)

        try:
            response = await client.request(
                method=method.upper(),
                url=url,
                params=params,
                json=json_body,
            )
            response.raise_for_status()

            if response.status_code == 204:
                return {"status": "success", "message": "No content"}

            # Try to parse as JSON, fall back to text for endpoints like /logs
            content_type = response.headers.get("content-type", "")
            if "application/json" in content_type:
                return response.json()
            # Return raw text wrapped in a dict for non-JSON responses
            return {"content": response.text, "content_type": content_type}
        except httpx.HTTPStatusError as e:
            return {
                "error": True,
                "status_code": e.response.status_code,
                "message": str(e),
                # Truncated so a large error page cannot flood the tool output.
                "detail": e.response.text[:500] if e.response.text else None,
            }
        except Exception as e:
            # Deliberate boundary: tools return error objects rather than raise.
            return {"error": True, "message": str(e)}

    async def health_check(self) -> bool:
        """Check if Gitea is accessible.

        Returns:
            True when ``GET /version`` answers with HTTP 200; False for any
            other status or when the request raises.
        """
        try:
            client = await self._get_client()
            response = await client.get(f"{self.api_url}/version")
            return response.status_code == 200
        except Exception:
            return False
|
|
|
|
|
|
# Global client instance
# Shared by the tools in this module; constructing it only stores the settings.
client = GiteaClient(base_url=GITEA_URL, token=GITEA_TOKEN)
|
|
|
|
|
|
# =============================================================================
|
|
# Curated API Reference
|
|
# =============================================================================
|
|
|
|
# Markdown cheat sheet served through the `gitea://api-reference` resource.
# The text goes through str.format(), so literal braces in endpoint templates
# are doubled ({{owner}}); only {base_url} is substituted.
API_REFERENCE = """
# Gitea API Quick Reference

For complete API documentation, see: {base_url}/api/swagger

## Common Endpoints for `gitea_api_call` tool

### Repository Operations
- GET `/repos/search?q={{keyword}}&limit=10` - Search repositories
- GET `/repos/{{owner}}/{{repo}}` - Get repository details
- POST `/user/repos` - Create repository for the authenticated user (body: name, description, private)
- POST `/orgs/{{org}}/repos` - Create repository in an organization (body: name, description, private)
- PATCH `/repos/{{owner}}/{{repo}}` - Update repository
- DELETE `/repos/{{owner}}/{{repo}}` - Delete repository
- GET `/repos/{{owner}}/{{repo}}/branches` - List branches
- GET `/repos/{{owner}}/{{repo}}/commits?limit=20` - List commits

### Issues
- GET `/repos/{{owner}}/{{repo}}/issues?state=open` - List issues
- POST `/repos/{{owner}}/{{repo}}/issues` - Create issue (body: title, body)
- GET `/repos/{{owner}}/{{repo}}/issues/{{index}}` - Get issue by number
- PATCH `/repos/{{owner}}/{{repo}}/issues/{{index}}` - Update issue
- POST `/repos/{{owner}}/{{repo}}/issues/{{index}}/comments` - Add comment (body: body)

### Pull Requests
- GET `/repos/{{owner}}/{{repo}}/pulls?state=open` - List PRs
- POST `/repos/{{owner}}/{{repo}}/pulls` - Create PR (body: title, head, base)
- GET `/repos/{{owner}}/{{repo}}/pulls/{{index}}` - Get PR by number
- POST `/repos/{{owner}}/{{repo}}/pulls/{{index}}/merge` - Merge PR

### File Operations
- GET `/repos/{{owner}}/{{repo}}/contents/{{filepath}}?ref={{branch}}` - Get file content
- POST `/repos/{{owner}}/{{repo}}/contents/{{filepath}}` - Create file (body: content as base64)
- PUT `/repos/{{owner}}/{{repo}}/contents/{{filepath}}` - Update file (body: content as base64, sha)
- DELETE `/repos/{{owner}}/{{repo}}/contents/{{filepath}}` - Delete file (body: sha)

### Releases & Tags
- GET `/repos/{{owner}}/{{repo}}/releases` - List releases
- GET `/repos/{{owner}}/{{repo}}/releases/latest` - Get latest release
- POST `/repos/{{owner}}/{{repo}}/releases` - Create release
- GET `/repos/{{owner}}/{{repo}}/tags` - List tags
- POST `/repos/{{owner}}/{{repo}}/tags` - Create tag

### Actions/CI (Gitea Actions)
- GET `/repos/{{owner}}/{{repo}}/actions/tasks` - List all workflow runs
- GET `/repos/{{owner}}/{{repo}}/actions/runs/{{run_id}}/jobs` - List jobs for a run
- GET `/repos/{{owner}}/{{repo}}/actions/jobs/{{job_id}}/logs` - Get job logs
- Use `get_workflow_run_logs` tool for easy log retrieval by run number

### User & Organizations
- GET `/user` - Get authenticated user
- GET `/users/{{username}}` - Get user by username
- GET `/user/repos?limit=50` - List authenticated user's repos
- GET `/orgs/{{org}}` - Get organization
- GET `/orgs/{{org}}/repos` - List organization repos
- GET `/orgs/{{org}}/members` - List organization members

### Pagination
Most list endpoints support `page` and `limit` query parameters.
Default limit is usually 20-50 items.
""".format(base_url=GITEA_URL)
|
|
|
|
|
|
@mcp.resource("gitea://api-reference")
def get_api_reference() -> str:
    """Return the Gitea API quick reference used alongside the gitea_api_call tool.

    Returns:
        The module-level ``API_REFERENCE`` text, unchanged.
    """
    reference_text = API_REFERENCE
    return reference_text
|
|
|
|
|
|
# =============================================================================
|
|
# MCP Tools - Curated Operations
|
|
# =============================================================================
|
|
|
|
|
|
@mcp.tool()
async def get_my_user_info() -> str:
    """Get information about the authenticated user.

    Issues ``GET /user`` with the configured token and returns the response
    as a JSON string (or the client's error object when the request fails).
    Useful for establishing the authenticated identity before other operations.
    """
    profile = await client.request("GET", "/user")
    encoded = json.dumps(profile)
    return encoded
|
|
|
|
|
|
@mcp.tool()
async def search_repos(
    keyword: str,
    limit: int = 20,
    private: bool | None = None,
    archived: bool | None = None,
) -> str:
    """Search for repositories by keyword.

    Args:
        keyword: Search term, sent as the ``q`` query parameter
        limit: Maximum number of results (default: 20, capped at 50)
        private: Sent as the ``private`` query parameter when given (optional)
        archived: Sent as the ``archived`` query parameter when given (optional)

    Returns:
        JSON string with the response of ``GET /repos/search``.
    """
    query: dict[str, Any] = {"q": keyword, "limit": min(limit, 50)}

    # Only forward the optional filters the caller actually supplied.
    optional_filters = {"private": private, "archived": archived}
    query.update(
        {name: value for name, value in optional_filters.items() if value is not None}
    )

    matches = await client.request("GET", "/repos/search", params=query)
    return json.dumps(matches)
|
|
|
|
|
|
@mcp.tool()
async def list_my_repos(
    limit: int = 50,
    page: int = 1,
) -> str:
    """List repositories owned by or accessible to the authenticated user.

    Args:
        limit: Maximum number of results per page (default: 50)
        page: Page number for pagination (default: 1)

    Returns:
        JSON string with the response of ``GET /user/repos``.
    """
    repos_page = await client.request(
        "GET",
        "/user/repos",
        params={"limit": limit, "page": page},
    )
    return json.dumps(repos_page)
|
|
|
|
|
|
@mcp.tool()
async def get_repo(owner: str, repo: str) -> str:
    """Get detailed information about a specific repository.

    Args:
        owner: Repository owner (username or organization)
        repo: Repository name

    Returns:
        JSON string with the response of ``GET /repos/{owner}/{repo}``.
    """
    repo_path = f"/repos/{owner}/{repo}"
    details = await client.request("GET", repo_path)
    return json.dumps(details)
|
|
|
|
|
|
@mcp.tool()
async def list_repo_issues(
    owner: str,
    repo: str,
    state: str = "open",
    limit: int = 30,
    page: int = 1,
) -> str:
    """List issues for a repository.

    Args:
        owner: Repository owner (username or organization)
        repo: Repository name
        state: Issue state filter: 'open', 'closed', or 'all' (default: 'open')
        limit: Maximum number of results (default: 30)
        page: Page number for pagination (default: 1)

    Returns:
        JSON string with the response of ``GET /repos/{owner}/{repo}/issues``.
    """
    issues_path = f"/repos/{owner}/{repo}/issues"
    query = {"state": state, "limit": limit, "page": page}
    issues = await client.request("GET", issues_path, params=query)
    return json.dumps(issues)
|
|
|
|
|
|
@mcp.tool()
async def list_repo_commits(
    owner: str,
    repo: str,
    sha: str | None = None,
    limit: int = 30,
    page: int = 1,
) -> str:
    """List commits for a repository.

    Args:
        owner: Repository owner (username or organization)
        repo: Repository name
        sha: SHA or branch to start listing from (default: default branch)
        limit: Maximum number of results (default: 30)
        page: Page number for pagination (default: 1)

    Returns:
        JSON string with the response of ``GET /repos/{owner}/{repo}/commits``.
    """
    # `sha` is only sent when non-empty; otherwise it is left out of the query.
    sha_filter = {"sha": sha} if sha else {}
    query: dict[str, Any] = {"limit": limit, "page": page, **sha_filter}

    commits_path = f"/repos/{owner}/{repo}/commits"
    commits = await client.request("GET", commits_path, params=query)
    return json.dumps(commits)
|
|
|
|
|
|
# Substrings that mark a log line as error-relevant when excerpting failed runs.
_LOG_ERROR_MARKERS = (
    "❌",
    "Error:",
    "ERROR:",
    "error:",
    "FAILED",
    "Failed",
    "fatal:",
    "FATAL:",
    "Exception:",
    "exception:",
    "Cannot find",
    "not found",
    "No such file",
    "Permission denied",
    "command not found",
    "npm ERR!",
    "pnpm ERR!",
    "yarn error",
    "TypeScript error",
    "TS2",
    "TS1",  # TypeScript errors
    "SyntaxError",
    "ReferenceError",
    "TypeError",
    "Build failed",
    "build failed",
    "Compilation failed",
    "exit code 1",
    "exit code 2",
    "exited with",
)

# Lines of context kept around each error-relevant line.
_LOG_CONTEXT_BEFORE = 5
_LOG_CONTEXT_AFTER = 3


def _tail_excerpt(lines: list[str], tail_lines: int) -> str:
    """Return the last *tail_lines* entries of *lines* below a truncation banner."""
    return (
        f"... (showing last {tail_lines} of {len(lines)} lines) ...\n"
        + "\n".join(lines[-tail_lines:])
    )


def _error_excerpt(lines: list[str], tail_lines: int) -> str | None:
    """Return error-relevant lines with context, or None if no line matches a marker."""
    total_lines = len(lines)
    error_line_indices = [
        i
        for i, line in enumerate(lines)
        if any(marker in line for marker in _LOG_ERROR_MARKERS)
    ]
    if not error_line_indices:
        return None

    # Union of the context windows around every hit (overlaps collapse).
    selected: set[int] = set()
    for idx in error_line_indices:
        start = max(0, idx - _LOG_CONTEXT_BEFORE)
        end = min(total_lines, idx + _LOG_CONTEXT_AFTER + 1)
        selected.update(range(start, end))

    # Cap at tail_lines to avoid overwhelming output; keep the last regions.
    sorted_indices = sorted(selected)
    if len(sorted_indices) > tail_lines:
        sorted_indices = sorted_indices[-tail_lines:]

    # Build output with "..." separators between non-contiguous sections.
    output_parts: list[str] = []
    prev_idx = -2
    for idx in sorted_indices:
        if idx > prev_idx + 1 and output_parts:
            output_parts.append("...")
        output_parts.append(lines[idx])
        prev_idx = idx

    return (
        f"... (showing {len(sorted_indices)} error-relevant lines of {total_lines} total) ...\n"
        + "\n".join(output_parts)
    )


def _format_log_excerpt(log_content: str, run_status: Any, tail_lines: int) -> str:
    """Reduce *log_content* to what the tool reports for the given run status."""
    lines = log_content.splitlines()
    if tail_lines <= 0 or len(lines) <= tail_lines:
        # Nothing to trim (or the caller asked for everything).
        return "\n".join(lines)
    if run_status == "failure":
        # For failed jobs, prefer error context over a plain tail.
        excerpt = _error_excerpt(lines, tail_lines)
        if excerpt is not None:
            return excerpt
    return _tail_excerpt(lines, tail_lines)


@mcp.tool()
async def get_workflow_run_logs(
    owner: str,
    repo: str,
    run_number: int | None = None,
    tail_lines: int = 50,
) -> str:
    """Get the logs for a specific workflow run by run number.

    This tool fetches the logs from Gitea Actions workflow runs.
    Since logs can be very large, it returns only the last N lines by default.
    For runs whose status is 'failure', lines that look like errors (plus a
    few lines of context) are returned instead when any are found.

    Args:
        owner: Repository owner (username or organization)
        repo: Repository name
        run_number: The workflow run number (visible in UI, e.g., #21). If not specified, uses the most recent run.
        tail_lines: Number of lines to return from the end of the log (default: 50, use 0 for all)

    Returns:
        The workflow run status and log content (last N lines)
    """
    # Step 1: List workflow runs to find the run and job ID
    tasks_result = await client.request("GET", f"/repos/{owner}/{repo}/actions/tasks")
    if isinstance(tasks_result, dict) and tasks_result.get("error"):
        return json.dumps(tasks_result)

    workflow_runs = (
        (tasks_result.get("workflow_runs") or [])
        if isinstance(tasks_result, dict)
        else []
    )
    if not workflow_runs:
        return json.dumps(
            {
                "error": True,
                "message": f"No workflow runs found in repository {owner}/{repo}",
            }
        )

    if run_number is None:
        # Use the most recent run (first in the list)
        target_run = workflow_runs[0]
        run_number = target_run.get("run_number")
    else:
        target_run = next(
            (run for run in workflow_runs if run.get("run_number") == run_number),
            None,
        )

    if not target_run:
        return json.dumps(
            {
                "error": True,
                "message": f"Workflow run #{run_number} not found in repository {owner}/{repo}",
                "available_runs": [r.get("run_number") for r in workflow_runs[:10]],
            }
        )

    # Fields echoed in every response from here on.
    run_summary = {
        "run_number": run_number,
        "run_id": target_run.get("id"),
        "status": target_run.get("status"),
        "title": target_run.get("display_title", ""),
    }

    # Step 2: Get jobs list and find the job matching this run_number
    # Note: Gitea's jobs endpoint uses different IDs than tasks, so we match by html_url
    jobs_result = await client.request("GET", f"/repos/{owner}/{repo}/actions/jobs")
    if isinstance(jobs_result, dict) and jobs_result.get("error"):
        # Report the real failure instead of a misleading "job not found".
        return json.dumps(
            {
                **run_summary,
                "error": "Failed to list jobs",
                "detail": jobs_result.get("message", "Unknown error"),
            }
        )

    # `or []` / `or ""` guard against explicit nulls in the payload.
    jobs = (jobs_result.get("jobs") or []) if isinstance(jobs_result, dict) else []
    run_url_pattern = f"/runs/{run_number}/"
    job_id = next(
        (
            job.get("id")
            for job in jobs
            if run_url_pattern in (job.get("html_url") or "")
        ),
        None,
    )

    if not job_id:
        return json.dumps(
            {
                **run_summary,
                "error": "Could not find job ID for this run",
                "detail": "The job may still be queued or the run data is not available",
            }
        )

    # Step 3: Fetch the logs
    logs_result = await client.request(
        "GET", f"/repos/{owner}/{repo}/actions/jobs/{job_id}/logs"
    )
    if isinstance(logs_result, dict) and logs_result.get("error"):
        return json.dumps(
            {
                **run_summary,
                "error": "Failed to fetch logs",
                "detail": logs_result.get("message", "Unknown error"),
            }
        )

    # Extract log content (non-JSON responses arrive wrapped under "content").
    log_content = ""
    if isinstance(logs_result, dict):
        log_content = logs_result.get("content", "")
    elif isinstance(logs_result, str):
        log_content = logs_result

    return json.dumps(
        {
            **run_summary,
            "job_id": job_id,
            "logs": _format_log_excerpt(
                log_content, run_summary["status"], tail_lines
            ),
        }
    )
|
|
|
|
|
|
# =============================================================================
|
|
# API Pass-through Tool
|
|
# =============================================================================
|
|
|
|
|
|
def _decode_json_argument(value: Any) -> Any:
    """Parse *value* from JSON when it is a string; empty or falsy input becomes {}."""
    if isinstance(value, str):
        return (json.loads(value) if value else {}) or {}
    return value or {}


@mcp.tool()
async def gitea_api_call(
    endpoint: str,
    method: str = "GET",
    params: str | dict[str, Any] = "{}",
    body: str | dict[str, Any] = "{}",
) -> str:
    """Execute a raw API call to Gitea.

    Use this for any operation not covered by the other tools.
    Refer to the 'gitea://api-reference' resource for common endpoints,
    or see the full API docs on the Gitea instance under /api/swagger.

    Args:
        endpoint: API endpoint path (e.g., '/repos/owner/repo/releases')
        method: HTTP method (GET, POST, PUT, PATCH, DELETE)
        params: JSON string or dict of query parameters (optional)
        body: JSON string or dict of request body for POST/PUT/PATCH (optional)

    Returns:
        JSON string with the API response, or an object with "error": true
        when an argument is invalid or the request fails.

    Example:
        gitea_api_call('/repos/myorg/myrepo/releases', 'POST',
                       body='{"tag_name": "v1.0.0", "name": "Release 1.0"}')
    """
    # NOTE: the docstring above must stay a plain string literal. Appending
    # .format(...) turns it into an ordinary expression, leaving __doc__ as None.
    try:
        # Accept both JSON strings and already-decoded objects from MCP clients.
        params_dict = _decode_json_argument(params)
        body_dict = _decode_json_argument(body)
    except json.JSONDecodeError as e:
        return json.dumps({"error": True, "message": f"Invalid JSON: {e}"})

    # Query parameters must be a mapping; reject scalars/arrays with a clear
    # message instead of forwarding them to the HTTP client.
    if not isinstance(params_dict, dict):
        return json.dumps(
            {
                "error": True,
                "message": "Invalid params: expected a JSON object of query parameters",
            }
        )

    result = await client.request(
        method=method,
        endpoint=endpoint,
        params=params_dict if params_dict else None,
        json_body=body_dict if body_dict else None,
    )
    return json.dumps(result)
|
|
|
|
|
|
# =============================================================================
|
|
# Starlette Wrapper for Health Checks
|
|
# =============================================================================
|
|
|
|
|
|
async def health_check(request):
    """Liveness probe for Docker/Kubernetes.

    Only confirms that the server process is answering HTTP requests; it does
    not contact Gitea. Gitea connectivity is validated when tools are
    actually called.

    Returns:
        A JSON response carrying a fixed "ok" status and the configured Gitea URL.
    """
    payload = {"status": "ok", "gitea_url": GITEA_URL}
    return JSONResponse(payload)
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(app):
    """Manage client lifecycle.

    Nothing is set up on startup (the HTTP client is created lazily on first
    request); on shutdown the shared Gitea client is closed.
    """
    try:
        yield
    finally:
        # Runs even when shutdown is triggered by an exception, so the
        # connection pool is always released.
        await client.close()
|
|
|
|
|
|
def create_app() -> Starlette:
    """Create the Starlette application with health check and MCP.

    Returns:
        The MCP HTTP app with an extra ``GET /health`` route and a lifespan
        that also closes the shared Gitea client on shutdown.
    """
    mcp_app = mcp.http_app()

    # Add health check route directly to the MCP app
    mcp_app.add_route("/health", health_check, methods=["GET"])

    # The MCP app ships its own lifespan, which must keep running. Wrap it so
    # that `lifespan` (client cleanup) is entered around it instead of being
    # defined but never attached to any app.
    mcp_lifespan = mcp_app.router.lifespan_context

    @asynccontextmanager
    async def _combined_lifespan(app):
        async with lifespan(app):
            async with mcp_lifespan(app) as state:
                # Pass through whatever state the MCP lifespan yields.
                yield state

    mcp_app.router.lifespan_context = _combined_lifespan

    return mcp_app
|
|
|
|
|
|
# Module-level ASGI application, built once at import time.
app = create_app()

if __name__ == "__main__":
    # Imported here so uvicorn is only needed when running this file directly.
    import uvicorn

    # Listen on all interfaces; the port comes from PORT (default 8000).
    uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PORT", "8000")))
|