All checks were successful
Build and Push Docker Image / build (push) Successful in 1m32s
- 3 specific tools: ping, list_domains, list_dns_records - 1 pass-through tool: porkbun_api for full API access - Safety system with READ/WRITE/INFRA access levels - Embedded API documentation as MCP resource - Starlette wrapper with /health endpoint - Gitea Actions CI workflow for Docker build
419 lines
11 KiB
Python
419 lines
11 KiB
Python
"""Porkbun DNS MCP Light Server.
|
|
|
|
A lightweight MCP server for managing DNS records via the Porkbun API.
|
|
Implements a safety system with READ, WRITE, and INFRA access levels.
|
|
"""
|
|
|
|
import asyncio
import json
import os
from typing import Any
from urllib.parse import quote, unquote

import httpx
from dotenv import load_dotenv
from fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.responses import JSONResponse
from starlette.routing import Mount, Route
|
|
|
|
# Pull variables from a local .env file (if any) before reading them below.
load_dotenv()

# Credentials and API root; the credentials default to empty strings.
PORKBUN_API_KEY = os.environ.get("PORKBUN_API_KEY", "")
PORKBUN_SECRET_KEY = os.environ.get("PORKBUN_SECRET_KEY", "")
PORKBUN_API_BASE = os.environ.get(
    "PORKBUN_API_BASE",
    "https://api.porkbun.com/api/json/v3",
)

# Safety switches: only the literal value "true" (any letter case) enables a
# level, so an unset or misspelled variable leaves the server read-only.
ALLOW_WRITES = os.environ.get("PORKBUN_ALLOW_WRITES", "false").lower() == "true"
ALLOW_INFRA = os.environ.get("PORKBUN_ALLOW_INFRA", "false").lower() == "true"
|
|
|
|
# Endpoint classifications for the safety system.
#
# Each entry is a path prefix (with trailing slash) compared against the
# requested endpoint. Anything matching neither list is treated as READ.
# NOTE(review): this is a deny-list, so a mutating endpoint that is missing
# here is allowed in read-only mode; keep both lists in sync with the API.

# Nameserver and glue-record changes.
INFRA_ENDPOINTS: list[str] = [
    "/domain/updateNs/",
    "/domain/createGlue/",
    "/domain/updateGlue/",
    "/domain/deleteGlue/",
]

# DNS record, DNSSEC and URL-forward modifications.
WRITE_ENDPOINTS: list[str] = [
    "/dns/create/",
    "/dns/edit/",
    "/dns/editByNameType/",
    "/dns/delete/",
    "/dns/deleteByNameType/",
    "/dns/createDnssecRecord/",
    "/dns/deleteDnssecRecord/",
    "/domain/addUrlForward/",
    "/domain/deleteUrlForward/",
]
|
|
|
|
# API documentation for agent self-service. This text is returned verbatim by
# the "porkbun://api-reference" resource, so it must contain only the
# reference itself (no stray separator lines).
API_DOCS = """
# Porkbun API Reference

Base URL: https://api.porkbun.com/api/json/v3
All endpoints use POST method. Authentication is handled automatically.

## Safety Levels
- READ: Default, all retrieve/list operations
- WRITE: Requires PORKBUN_ALLOW_WRITES=true - DNS record modifications
- INFRA: Requires PORKBUN_ALLOW_INFRA=true - Nameserver and glue record changes

## DNS Record Management

### Create DNS Record
Endpoint: /dns/create/{DOMAIN}
Body: {"name": "subdomain", "type": "A", "content": "1.2.3.4", "ttl": "600", "prio": "0"}
- name: Subdomain (blank for root, * for wildcard)
- type: A, AAAA, MX, CNAME, ALIAS, TXT, NS, SRV, TLSA, CAA, HTTPS, SVCB, SSHFP
- content: Record value
- ttl: Time to live (minimum 600)
- prio: Priority (for MX, SRV)

### Edit DNS Record by ID
Endpoint: /dns/edit/{DOMAIN}/{RECORD_ID}
Body: {"name": "subdomain", "type": "A", "content": "1.2.3.5", "ttl": "600"}

### Edit DNS Record by Name/Type
Endpoint: /dns/editByNameType/{DOMAIN}/{TYPE}/{SUBDOMAIN}
Body: {"content": "1.2.3.5", "ttl": "600"}

### Delete DNS Record by ID
Endpoint: /dns/delete/{DOMAIN}/{RECORD_ID}
Body: {}

### Delete DNS Record by Name/Type
Endpoint: /dns/deleteByNameType/{DOMAIN}/{TYPE}/{SUBDOMAIN}
Body: {}

### Retrieve All DNS Records
Endpoint: /dns/retrieve/{DOMAIN}
Body: {}

### Retrieve DNS Record by ID
Endpoint: /dns/retrieve/{DOMAIN}/{RECORD_ID}
Body: {}

### Retrieve DNS Records by Name/Type
Endpoint: /dns/retrieveByNameType/{DOMAIN}/{TYPE}/{SUBDOMAIN}
Body: {}

## DNSSEC Management

### Create DNSSEC Record
Endpoint: /dns/createDnssecRecord/{DOMAIN}
Body: {"keyTag": "64087", "alg": "13", "digestType": "2", "digest": "..."}

### Get DNSSEC Records
Endpoint: /dns/getDnssecRecords/{DOMAIN}
Body: {}

### Delete DNSSEC Record
Endpoint: /dns/deleteDnssecRecord/{DOMAIN}/{KEYTAG}
Body: {}

## Domain Management

### List All Domains
Endpoint: /domain/listAll
Body: {"start": "0", "includeLabels": "yes"}

### Get Nameservers
Endpoint: /domain/getNs/{DOMAIN}
Body: {}

### Update Nameservers (INFRA level)
Endpoint: /domain/updateNs/{DOMAIN}
Body: {"ns": ["ns1.example.com", "ns2.example.com"]}

### Check Domain Availability
Endpoint: /domain/checkDomain/{DOMAIN}
Body: {}

## URL Forwarding

### Add URL Forward
Endpoint: /domain/addUrlForward/{DOMAIN}
Body: {"subdomain": "", "location": "https://example.com", "type": "temporary", "includePath": "no", "wildcard": "yes"}

### Get URL Forwarding
Endpoint: /domain/getUrlForwarding/{DOMAIN}
Body: {}

### Delete URL Forward
Endpoint: /domain/deleteUrlForward/{DOMAIN}/{RECORD_ID}
Body: {}

## Glue Records (INFRA level)

### Create Glue Record
Endpoint: /domain/createGlue/{DOMAIN}/{SUBDOMAIN}
Body: {"ips": ["192.168.1.1", "2001:db8::1"]}

### Update Glue Record
Endpoint: /domain/updateGlue/{DOMAIN}/{SUBDOMAIN}
Body: {"ips": ["192.168.1.2"]}

### Delete Glue Record
Endpoint: /domain/deleteGlue/{DOMAIN}/{SUBDOMAIN}
Body: {}

### Get Glue Records
Endpoint: /domain/getGlue/{DOMAIN}
Body: {}

## SSL Certificates

### Retrieve SSL Bundle
Endpoint: /ssl/retrieve/{DOMAIN}
Body: {}
Returns: certificatechain, privatekey, publickey

## Utility

### Ping (Test Auth)
Endpoint: /ping
Body: {}
Returns: Your public IP address
"""
|
|
|
|
|
|
class PorkbunClient:
|
|
"""HTTP client for Porkbun API with safety controls."""
|
|
|
|
def __init__(self, api_key: str, secret_key: str, base_url: str):
|
|
self.api_key = api_key
|
|
self.secret_key = secret_key
|
|
self.base_url = base_url.rstrip("/")
|
|
self.client = httpx.Client(timeout=30.0)
|
|
|
|
def _get_auth_body(self) -> dict[str, str]:
|
|
"""Return authentication payload."""
|
|
return {
|
|
"apikey": self.api_key,
|
|
"secretapikey": self.secret_key,
|
|
}
|
|
|
|
def _classify_endpoint(self, endpoint: str) -> str:
|
|
"""Classify endpoint by safety level."""
|
|
if any(endpoint.startswith(e) for e in INFRA_ENDPOINTS):
|
|
return "INFRA"
|
|
if any(endpoint.startswith(e) for e in WRITE_ENDPOINTS):
|
|
return "WRITE"
|
|
return "READ"
|
|
|
|
def _validate_safety(self, endpoint: str) -> tuple[bool, str | None]:
|
|
"""Check if endpoint is allowed under current safety settings."""
|
|
level = self._classify_endpoint(endpoint)
|
|
|
|
if level == "INFRA":
|
|
if not ALLOW_INFRA:
|
|
return False, (
|
|
"INFRA operation blocked. This endpoint modifies domain infrastructure "
|
|
"(nameservers/glue records). Set PORKBUN_ALLOW_INFRA=true to enable."
|
|
)
|
|
if not ALLOW_WRITES:
|
|
return False, (
|
|
"INFRA operations require PORKBUN_ALLOW_WRITES=true in addition to "
|
|
"PORKBUN_ALLOW_INFRA=true."
|
|
)
|
|
|
|
if level == "WRITE":
|
|
if not ALLOW_WRITES:
|
|
return False, (
|
|
"WRITE operation blocked. This endpoint modifies DNS records. "
|
|
"Set PORKBUN_ALLOW_WRITES=true to enable."
|
|
)
|
|
|
|
return True, None
|
|
|
|
def request(
|
|
self, endpoint: str, body: dict[str, Any] | None = None
|
|
) -> dict[str, Any]:
|
|
"""Execute API request with safety checks."""
|
|
# Normalize endpoint
|
|
if not endpoint.startswith("/"):
|
|
endpoint = f"/{endpoint}"
|
|
|
|
# Safety validation
|
|
allowed, error_msg = self._validate_safety(endpoint)
|
|
if not allowed:
|
|
return {
|
|
"status": "BLOCKED",
|
|
"reason": error_msg,
|
|
"endpoint": endpoint,
|
|
"category": self._classify_endpoint(endpoint),
|
|
}
|
|
|
|
# Build request body with auth
|
|
request_body = self._get_auth_body()
|
|
if body:
|
|
request_body.update(body)
|
|
|
|
try:
|
|
response = self.client.post(
|
|
f"{self.base_url}{endpoint}",
|
|
json=request_body,
|
|
)
|
|
return response.json()
|
|
except httpx.HTTPError as e:
|
|
return {"status": "ERROR", "message": f"HTTP error: {e}"}
|
|
except json.JSONDecodeError:
|
|
return {"status": "ERROR", "message": "Invalid JSON response from API"}
|
|
|
|
def ping(self) -> dict[str, Any]:
|
|
"""Test API connectivity."""
|
|
return self.request("/ping")
|
|
|
|
def close(self):
|
|
"""Close HTTP client."""
|
|
self.client.close()
|
|
|
|
|
|
# Module-wide singletons: one Porkbun client shared by every tool and the
# health check, and the MCP server the tools/resources register on.
client = PorkbunClient(
    api_key=PORKBUN_API_KEY,
    secret_key=PORKBUN_SECRET_KEY,
    base_url=PORKBUN_API_BASE,
)
mcp = FastMCP("porkbun-dns")
|
|
|
|
|
|
# --- MCP Resources ---
|
|
|
|
|
|
@mcp.resource("porkbun://api-reference")
def get_api_docs() -> str:
    """Serve the embedded Porkbun API reference.

    Returns:
        The ``API_DOCS`` text, intended as the endpoint guide for the
        ``porkbun_api`` tool.
    """
    return API_DOCS
|
|
|
|
|
|
@mcp.resource("porkbun://safety-status")
def get_safety_status() -> str:
    """Return the current safety configuration as a JSON string.

    Returns:
        JSON object with ``allow_writes``, ``allow_infra`` and ``mode``.
        ``mode`` is the highest level requests are actually permitted at.
    """
    # The client refuses INFRA calls unless writes are enabled too, so with
    # only ALLOW_INFRA set nothing beyond READ is permitted. Report the
    # effective level rather than echoing the INFRA flag.
    if ALLOW_WRITES:
        mode = "INFRA" if ALLOW_INFRA else "WRITE"
    else:
        mode = "READ"

    return json.dumps(
        {
            "allow_writes": ALLOW_WRITES,
            "allow_infra": ALLOW_INFRA,
            "mode": mode,
        },
        indent=2,
    )
|
|
|
|
|
|
# --- MCP Tools ---
|
|
|
|
|
|
@mcp.tool()
def ping() -> str:
    """Test API connectivity and return your public IP address.

    Useful for verifying credentials and for dynamic DNS scenarios.

    Returns:
        The client's ping result serialized as indented JSON.
    """
    return json.dumps(client.ping(), indent=2)
|
|
|
|
|
|
@mcp.tool()
def list_domains(start: int = 0, include_labels: bool = False) -> str:
    """List all domains in your Porkbun account.

    Args:
        start: Pagination offset (domains returned in chunks of 1000)
        include_labels: Include domain labels in response

    Returns:
        JSON string with domain list and details
    """
    # The request sends the offset as a string and labels as an opt-in "yes".
    label_option = {"includeLabels": "yes"} if include_labels else {}
    payload = {"start": str(start), **label_option}

    response = client.request("/domain/listAll", payload)
    return json.dumps(response, indent=2)
|
|
|
|
|
|
@mcp.tool()
def list_dns_records(domain: str, record_id: str | None = None) -> str:
    """Retrieve DNS records for a domain.

    Args:
        domain: The domain name (e.g., 'example.com')
        record_id: Optional specific record ID to retrieve

    Returns:
        JSON string with DNS records, or an ERROR object when a path
        parameter is not usable as a single path segment.
    """
    path_params = [domain] if not record_id else [domain, str(record_id)]

    # Each parameter must stay exactly one path segment. Without this, a
    # value such as "../create/example.com" would address a different
    # endpoint while the request is still built as a read-only retrieve call.
    if any(param in ("", ".", "..") for param in path_params):
        return json.dumps(
            {"status": "ERROR", "message": "Invalid domain or record_id"},
            indent=2,
        )

    # safe="" also percent-encodes "/" so it cannot introduce extra segments.
    encoded = "/".join(quote(param, safe="") for param in path_params)
    endpoint = f"/dns/retrieve/{encoded}"

    result = client.request(endpoint)
    return json.dumps(result, indent=2)
|
|
|
|
|
|
@mcp.tool()
def porkbun_api(endpoint: str, body: str = "{}") -> str:
    """Execute a raw Porkbun API call.

    Use the 'porkbun://api-reference' resource for endpoint documentation.
    Safety controls apply: WRITE and INFRA operations may be blocked.

    Args:
        endpoint: API path (e.g., '/dns/create/example.com')
        body: JSON string of additional parameters (auth is added automatically)

    Returns:
        JSON string with API response, or an ERROR object when ``body`` is
        not valid JSON or is not a JSON object.
    """
    try:
        parsed_body = json.loads(body) if body else {}
    except json.JSONDecodeError as e:
        return json.dumps({"status": "ERROR", "message": f"Invalid JSON body: {e}"})

    # Empty values ("null", "[]", '""', ...) mean "no extra parameters".
    if not parsed_body:
        parsed_body = {}

    # The client merges the body into a dict; a list, number or string here
    # would raise instead of producing a usable error for the caller.
    if not isinstance(parsed_body, dict):
        return json.dumps(
            {
                "status": "ERROR",
                "message": "Invalid JSON body: expected a JSON object",
            }
        )

    result = client.request(endpoint, parsed_body)
    return json.dumps(result, indent=2)
|
|
|
|
|
|
# --- Health Check & App ---
|
|
|
|
|
|
async def health(request):
    """Health check endpoint for Docker/orchestration.

    Args:
        request: Incoming request (unused).

    Returns:
        200 with ``status: healthy`` and the effective safety ``mode`` when
        the API ping succeeds; 503 with ``status: unhealthy`` when
        credentials are missing or the ping does not report SUCCESS.
    """
    if not (PORKBUN_API_KEY and PORKBUN_SECRET_KEY):
        return JSONResponse(
            {"status": "unhealthy", "reason": "Missing API credentials"},
            status_code=503,
        )

    # client.ping() performs a blocking HTTP request; run it in a worker
    # thread so a slow upstream does not stall the event loop.
    result = await asyncio.to_thread(client.ping)
    if not isinstance(result, dict):
        # A JSON response that is not an object carries no status/message.
        result = {}

    if result.get("status") == "SUCCESS":
        # INFRA calls are refused unless writes are enabled too, so report
        # INFRA only when both switches are on.
        if ALLOW_WRITES:
            mode = "INFRA" if ALLOW_INFRA else "WRITE"
        else:
            mode = "READ"
        return JSONResponse({"status": "healthy", "mode": mode})

    return JSONResponse(
        {"status": "unhealthy", "reason": result.get("message", "API ping failed")},
        status_code=503,
    )
|
|
|
|
|
|
def create_app() -> Starlette:
    """Build the ASGI application.

    Returns:
        A Starlette app that answers ``/health`` itself, mounts the MCP HTTP
        app at ``/`` and reuses that app's lifespan.
    """
    mcp_asgi = mcp.http_app()
    # /health is listed before the catch-all mount.
    routes = [
        Route("/health", health),
        Mount("/", app=mcp_asgi),
    ]
    return Starlette(routes=routes, lifespan=mcp_asgi.lifespan)
|
|
|
|
|
|
if __name__ == "__main__":
    import uvicorn

    # Listen on all interfaces; the port comes from PORT (default 8000).
    listen_port = int(os.environ.get("PORT", "8000"))
    uvicorn.run(create_app(), host="0.0.0.0", port=listen_port)
|