"""Porkbun DNS MCP Light Server. A lightweight MCP server for managing DNS records via the Porkbun API. Implements a safety system with READ, WRITE, and INFRA access levels. """ import json import os from typing import Any import httpx from dotenv import load_dotenv from fastmcp import FastMCP from starlette.applications import Starlette from starlette.routing import Mount, Route from starlette.responses import JSONResponse load_dotenv() # Configuration PORKBUN_API_KEY = os.getenv("PORKBUN_API_KEY", "") PORKBUN_SECRET_KEY = os.getenv("PORKBUN_SECRET_KEY", "") PORKBUN_API_BASE = os.getenv("PORKBUN_API_BASE", "https://api.porkbun.com/api/json/v3") # Safety controls (defaults to read-only) ALLOW_WRITES = os.getenv("PORKBUN_ALLOW_WRITES", "false").lower() == "true" ALLOW_INFRA = os.getenv("PORKBUN_ALLOW_INFRA", "false").lower() == "true" # Endpoint classifications for safety system INFRA_ENDPOINTS = [ "/domain/updateNs/", "/domain/createGlue/", "/domain/updateGlue/", "/domain/deleteGlue/", ] WRITE_ENDPOINTS = [ "/dns/create/", "/dns/edit/", "/dns/editByNameType/", "/dns/delete/", "/dns/deleteByNameType/", "/dns/createDnssecRecord/", "/dns/deleteDnssecRecord/", "/domain/addUrlForward/", "/domain/deleteUrlForward/", ] # API Documentation for agent self-service API_DOCS = """ # Porkbun API Reference Base URL: https://api.porkbun.com/api/json/v3 All endpoints use POST method. Authentication is handled automatically. ## Safety Levels - READ: Default, all retrieve/list operations - WRITE: Requires PORKBUN_ALLOW_WRITES=true - DNS record modifications - INFRA: Requires PORKBUN_ALLOW_INFRA=true - Nameserver and glue record changes ## DNS Record Management ### Create DNS Record Endpoint: /dns/create/{DOMAIN} Body: {"name": "subdomain", "type": "A", "content": "1.2.3.4", "ttl": "600", "prio": "0"} - name: Subdomain (blank for root, * for wildcard) - type: A, AAAA, MX, CNAME, ALIAS, TXT, NS, SRV, TLSA, CAA, HTTPS, SVCB, SSHFP - content: Record value - ttl: Time to live (minimum 600) - prio: Priority (for MX, SRV) ### Edit DNS Record by ID Endpoint: /dns/edit/{DOMAIN}/{RECORD_ID} Body: {"name": "subdomain", "type": "A", "content": "1.2.3.5", "ttl": "600"} ### Edit DNS Record by Name/Type Endpoint: /dns/editByNameType/{DOMAIN}/{TYPE}/{SUBDOMAIN} Body: {"content": "1.2.3.5", "ttl": "600"} ### Delete DNS Record by ID Endpoint: /dns/delete/{DOMAIN}/{RECORD_ID} Body: {} ### Delete DNS Record by Name/Type Endpoint: /dns/deleteByNameType/{DOMAIN}/{TYPE}/{SUBDOMAIN} Body: {} ### Retrieve All DNS Records Endpoint: /dns/retrieve/{DOMAIN} Body: {} ### Retrieve DNS Record by ID Endpoint: /dns/retrieve/{DOMAIN}/{RECORD_ID} Body: {} ### Retrieve DNS Records by Name/Type Endpoint: /dns/retrieveByNameType/{DOMAIN}/{TYPE}/{SUBDOMAIN} Body: {} ## DNSSEC Management ### Create DNSSEC Record Endpoint: /dns/createDnssecRecord/{DOMAIN} Body: {"keyTag": "64087", "alg": "13", "digestType": "2", "digest": "..."} ### Get DNSSEC Records Endpoint: /dns/getDnssecRecords/{DOMAIN} Body: {} ### Delete DNSSEC Record Endpoint: /dns/deleteDnssecRecord/{DOMAIN}/{KEYTAG} Body: {} ## Domain Management ### List All Domains Endpoint: /domain/listAll Body: {"start": "0", "includeLabels": "yes"} ### Get Nameservers Endpoint: /domain/getNs/{DOMAIN} Body: {} ### Update Nameservers (INFRA level) Endpoint: /domain/updateNs/{DOMAIN} Body: {"ns": ["ns1.example.com", "ns2.example.com"]} ### Check Domain Availability Endpoint: /domain/checkDomain/{DOMAIN} Body: {} ## URL Forwarding ### Add URL Forward Endpoint: /domain/addUrlForward/{DOMAIN} Body: {"subdomain": "", "location": "https://example.com", "type": "temporary", "includePath": "no", "wildcard": "yes"} ### Get URL Forwarding Endpoint: /domain/getUrlForwarding/{DOMAIN} Body: {} ### Delete URL Forward Endpoint: /domain/deleteUrlForward/{DOMAIN}/{RECORD_ID} Body: {} ## Glue Records (INFRA level) ### Create Glue Record Endpoint: /domain/createGlue/{DOMAIN}/{SUBDOMAIN} Body: {"ips": ["192.168.1.1", "2001:db8::1"]} ### Update Glue Record Endpoint: /domain/updateGlue/{DOMAIN}/{SUBDOMAIN} Body: {"ips": ["192.168.1.2"]} ### Delete Glue Record Endpoint: /domain/deleteGlue/{DOMAIN}/{SUBDOMAIN} Body: {} ### Get Glue Records Endpoint: /domain/getGlue/{DOMAIN} Body: {} ## SSL Certificates ### Retrieve SSL Bundle Endpoint: /ssl/retrieve/{DOMAIN} Body: {} Returns: certificatechain, privatekey, publickey ## Utility ### Ping (Test Auth) Endpoint: /ping Body: {} Returns: Your public IP address """ class PorkbunClient: """HTTP client for Porkbun API with safety controls.""" def __init__(self, api_key: str, secret_key: str, base_url: str): self.api_key = api_key self.secret_key = secret_key self.base_url = base_url.rstrip("/") self.client = httpx.Client(timeout=30.0) def _get_auth_body(self) -> dict[str, str]: """Return authentication payload.""" return { "apikey": self.api_key, "secretapikey": self.secret_key, } def _classify_endpoint(self, endpoint: str) -> str: """Classify endpoint by safety level.""" if any(endpoint.startswith(e) for e in INFRA_ENDPOINTS): return "INFRA" if any(endpoint.startswith(e) for e in WRITE_ENDPOINTS): return "WRITE" return "READ" def _validate_safety(self, endpoint: str) -> tuple[bool, str | None]: """Check if endpoint is allowed under current safety settings.""" level = self._classify_endpoint(endpoint) if level == "INFRA": if not ALLOW_INFRA: return False, ( "INFRA operation blocked. This endpoint modifies domain infrastructure " "(nameservers/glue records). Set PORKBUN_ALLOW_INFRA=true to enable." ) if not ALLOW_WRITES: return False, ( "INFRA operations require PORKBUN_ALLOW_WRITES=true in addition to " "PORKBUN_ALLOW_INFRA=true." ) if level == "WRITE": if not ALLOW_WRITES: return False, ( "WRITE operation blocked. This endpoint modifies DNS records. " "Set PORKBUN_ALLOW_WRITES=true to enable." ) return True, None def request( self, endpoint: str, body: dict[str, Any] | None = None ) -> dict[str, Any]: """Execute API request with safety checks.""" # Normalize endpoint if not endpoint.startswith("/"): endpoint = f"/{endpoint}" # Safety validation allowed, error_msg = self._validate_safety(endpoint) if not allowed: return { "status": "BLOCKED", "reason": error_msg, "endpoint": endpoint, "category": self._classify_endpoint(endpoint), } # Build request body with auth request_body = self._get_auth_body() if body: request_body.update(body) try: response = self.client.post( f"{self.base_url}{endpoint}", json=request_body, ) return response.json() except httpx.HTTPError as e: return {"status": "ERROR", "message": f"HTTP error: {e}"} except json.JSONDecodeError: return {"status": "ERROR", "message": "Invalid JSON response from API"} def ping(self) -> dict[str, Any]: """Test API connectivity.""" return self.request("/ping") def close(self): """Close HTTP client.""" self.client.close() # Initialize client and MCP client = PorkbunClient(PORKBUN_API_KEY, PORKBUN_SECRET_KEY, PORKBUN_API_BASE) mcp = FastMCP("porkbun-dns") # --- MCP Resources --- @mcp.resource("porkbun://api-reference") def get_api_docs() -> str: """Returns the Porkbun API documentation for using the porkbun_api tool.""" return API_DOCS @mcp.resource("porkbun://safety-status") def get_safety_status() -> str: """Returns current safety configuration.""" return json.dumps( { "allow_writes": ALLOW_WRITES, "allow_infra": ALLOW_INFRA, "mode": "INFRA" if ALLOW_INFRA else ("WRITE" if ALLOW_WRITES else "READ"), }, indent=2, ) # --- MCP Tools --- @mcp.tool() def ping() -> str: """Test API connectivity and return your public IP address. Useful for verifying credentials and for dynamic DNS scenarios. """ result = client.ping() return json.dumps(result, indent=2) @mcp.tool() def list_domains(start: int = 0, include_labels: bool = False) -> str: """List all domains in your Porkbun account. Args: start: Pagination offset (domains returned in chunks of 1000) include_labels: Include domain labels in response Returns: JSON string with domain list and details """ body = {"start": str(start)} if include_labels: body["includeLabels"] = "yes" result = client.request("/domain/listAll", body) return json.dumps(result, indent=2) @mcp.tool() def list_dns_records(domain: str, record_id: str | None = None) -> str: """Retrieve DNS records for a domain. Args: domain: The domain name (e.g., 'example.com') record_id: Optional specific record ID to retrieve Returns: JSON string with DNS records """ endpoint = f"/dns/retrieve/{domain}" if record_id: endpoint = f"{endpoint}/{record_id}" result = client.request(endpoint) return json.dumps(result, indent=2) @mcp.tool() def porkbun_api(endpoint: str, body: str = "{}") -> str: """Execute a raw Porkbun API call. Use the 'porkbun://api-reference' resource for endpoint documentation. Safety controls apply: WRITE and INFRA operations may be blocked. Args: endpoint: API path (e.g., '/dns/create/example.com') body: JSON string of additional parameters (auth is added automatically) Returns: JSON string with API response """ try: parsed_body = json.loads(body) if body else {} except json.JSONDecodeError as e: return json.dumps({"status": "ERROR", "message": f"Invalid JSON body: {e}"}) result = client.request(endpoint, parsed_body) return json.dumps(result, indent=2) # --- Health Check & App --- async def health(request): """Health check endpoint for Docker/orchestration.""" if not PORKBUN_API_KEY or not PORKBUN_SECRET_KEY: return JSONResponse( {"status": "unhealthy", "reason": "Missing API credentials"}, status_code=503, ) # Test API connectivity result = client.ping() if result.get("status") == "SUCCESS": return JSONResponse( { "status": "healthy", "mode": "INFRA" if ALLOW_INFRA else ("WRITE" if ALLOW_WRITES else "READ"), } ) return JSONResponse( {"status": "unhealthy", "reason": result.get("message", "API ping failed")}, status_code=503, ) def create_app() -> Starlette: """Create the ASGI application.""" mcp_app = mcp.http_app() return Starlette( routes=[ Route("/health", health), Mount("/", app=mcp_app), ], lifespan=mcp_app.lifespan, ) if __name__ == "__main__": import uvicorn port = int(os.getenv("PORT", "8000")) uvicorn.run(create_app(), host="0.0.0.0", port=port)