diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml deleted file mode 100644 index 9a732e9..0000000 --- a/docker-compose.dev.yml +++ /dev/null @@ -1,17 +0,0 @@ -# Development compose file - builds from local source -# Usage: docker compose -f docker-compose.dev.yml up --build -services: - gitea-mcp: - build: . - container_name: gitea-mcp-dev - restart: "no" - ports: - - "8000:8000" - env_file: - - .env - healthcheck: - test: ["CMD", "curl", "-f", "http://localhost:8000/health"] - interval: 30s - timeout: 10s - retries: 3 - start_period: 5s diff --git a/server.py b/server.py index 983be97..f1585b5 100644 --- a/server.py +++ b/server.py @@ -156,8 +156,10 @@ For complete API documentation, see: {base_url}/api/swagger - POST `/repos/{{owner}}/{{repo}}/tags` - Create tag ### Actions/CI (Gitea Actions) -- GET `/repos/{{owner}}/{{repo}}/actions/runs` - List workflow runs -- GET `/repos/{{owner}}/{{repo}}/actions/jobs?status={{status}}` - List jobs +- GET `/repos/{{owner}}/{{repo}}/actions/tasks` - List all workflow runs +- GET `/repos/{{owner}}/{{repo}}/actions/runs/{{run_id}}/jobs` - List jobs for a run +- GET `/repos/{{owner}}/{{repo}}/actions/jobs/{{job_id}}/logs` - Get job logs +- Use `get_workflow_run_logs` tool for easy log retrieval by run number ### User & Organizations - GET `/user` - Get authenticated user @@ -295,6 +297,118 @@ async def list_repo_commits( return json.dumps(result) +@mcp.tool() +async def get_workflow_run_logs( + owner: str, + repo: str, + run_number: int | None = None, + tail_lines: int = 50, +) -> str: + """Get the logs for a specific workflow run by run number. + + This tool fetches the logs from Gitea Actions workflow runs. + Since logs can be very large, it returns only the last N lines by default. + + Args: + owner: Repository owner (username or organization) + repo: Repository name + run_number: The workflow run number (visible in UI, e.g., #21). If not specified, uses the most recent run. + tail_lines: Number of lines to return from the end of the log (default: 50, use 0 for all) + + Returns: + The workflow run status and log content (last N lines) + """ + # Step 1: List workflow runs to find the run and job ID + tasks_result = await client.request("GET", f"/repos/{owner}/{repo}/actions/tasks") + + if isinstance(tasks_result, dict) and tasks_result.get("error"): + return json.dumps(tasks_result) + + # Find the run matching the run_number, or use the most recent + workflow_runs = tasks_result.get("workflow_runs", []) if isinstance(tasks_result, dict) else [] + + if not workflow_runs: + return json.dumps({ + "error": True, + "message": f"No workflow runs found in repository {owner}/{repo}" + }) + + target_run = None + if run_number is None: + # Use the most recent run (first in the list) + target_run = workflow_runs[0] + run_number = target_run.get("run_number") + else: + for run in workflow_runs: + if run.get("run_number") == run_number: + target_run = run + break + + if not target_run: + return json.dumps({ + "error": True, + "message": f"Workflow run #{run_number} not found in repository {owner}/{repo}", + "available_runs": [r.get("run_number") for r in workflow_runs[:10]] + }) + + run_id = target_run.get("id") + run_status = target_run.get("status") + run_title = target_run.get("display_title", "") + + # Step 2: Get jobs for this run to find the job ID + # Note: Gitea's API uses the internal run ID, not run_number for jobs endpoint + jobs_result = await client.request("GET", f"/repos/{owner}/{repo}/actions/runs/{run_id}/jobs") + + job_id = None + if isinstance(jobs_result, dict): + jobs = jobs_result.get("jobs", []) + if jobs: + job_id = jobs[0].get("id") + + # If jobs endpoint didn't work, try to use run_id as job_id (they're sometimes the same) + if not job_id: + job_id = run_id + + # Step 3: Fetch the logs + logs_result = await client.request("GET", f"/repos/{owner}/{repo}/actions/jobs/{job_id}/logs") + + if isinstance(logs_result, dict) and logs_result.get("error"): + return json.dumps({ + "run_number": run_number, + "run_id": run_id, + "status": run_status, + "title": run_title, + "error": "Failed to fetch logs", + "detail": logs_result.get("message", "Unknown error") + }) + + # Extract log content + log_content = "" + if isinstance(logs_result, dict): + log_content = logs_result.get("content", "") + elif isinstance(logs_result, str): + log_content = logs_result + + # Apply tail if requested + if tail_lines > 0 and log_content: + lines = log_content.splitlines() + total_lines = len(lines) + if total_lines > tail_lines: + lines = lines[-tail_lines:] + log_content = f"... (showing last {tail_lines} of {total_lines} lines) ...\n" + "\n".join(lines) + else: + log_content = "\n".join(lines) + + return json.dumps({ + "run_number": run_number, + "run_id": run_id, + "status": run_status, + "title": run_title, + "job_id": job_id, + "logs": log_content + }) + + # ============================================================================= # API Pass-through Tool # =============================================================================