Files
porkbun-dns-mcp/server.py
Ben 882cde3104
All checks were successful
Build and Push Docker Image / build (push) Successful in 1m32s
feat: Initial Porkbun DNS MCP Light server
- 3 specific tools: ping, list_domains, list_dns_records
- 1 pass-through tool: porkbun_api for full API access
- Safety system with READ/WRITE/INFRA access levels
- Embedded API documentation as MCP resource
- Starlette wrapper with /health endpoint
- Gitea Actions CI workflow for Docker build
2026-01-05 15:04:00 +00:00

419 lines
11 KiB
Python

"""Porkbun DNS MCP Light Server.
A lightweight MCP server for managing DNS records via the Porkbun API.
Implements a safety system with READ, WRITE, and INFRA access levels.
"""
import json
import os
from typing import Any
import httpx
from dotenv import load_dotenv
from fastmcp import FastMCP
from starlette.applications import Starlette
from starlette.routing import Mount, Route
from starlette.responses import JSONResponse
load_dotenv()
# Configuration
PORKBUN_API_KEY = os.getenv("PORKBUN_API_KEY", "")
PORKBUN_SECRET_KEY = os.getenv("PORKBUN_SECRET_KEY", "")
PORKBUN_API_BASE = os.getenv("PORKBUN_API_BASE", "https://api.porkbun.com/api/json/v3")
# Safety controls (defaults to read-only)
ALLOW_WRITES = os.getenv("PORKBUN_ALLOW_WRITES", "false").lower() == "true"
ALLOW_INFRA = os.getenv("PORKBUN_ALLOW_INFRA", "false").lower() == "true"
# Endpoint classifications for safety system
INFRA_ENDPOINTS = [
"/domain/updateNs/",
"/domain/createGlue/",
"/domain/updateGlue/",
"/domain/deleteGlue/",
]
WRITE_ENDPOINTS = [
"/dns/create/",
"/dns/edit/",
"/dns/editByNameType/",
"/dns/delete/",
"/dns/deleteByNameType/",
"/dns/createDnssecRecord/",
"/dns/deleteDnssecRecord/",
"/domain/addUrlForward/",
"/domain/deleteUrlForward/",
]
# API Documentation for agent self-service
API_DOCS = """
# Porkbun API Reference
Base URL: https://api.porkbun.com/api/json/v3
All endpoints use POST method. Authentication is handled automatically.
## Safety Levels
- READ: Default, all retrieve/list operations
- WRITE: Requires PORKBUN_ALLOW_WRITES=true - DNS record modifications
- INFRA: Requires PORKBUN_ALLOW_INFRA=true - Nameserver and glue record changes
## DNS Record Management
### Create DNS Record
Endpoint: /dns/create/{DOMAIN}
Body: {"name": "subdomain", "type": "A", "content": "1.2.3.4", "ttl": "600", "prio": "0"}
- name: Subdomain (blank for root, * for wildcard)
- type: A, AAAA, MX, CNAME, ALIAS, TXT, NS, SRV, TLSA, CAA, HTTPS, SVCB, SSHFP
- content: Record value
- ttl: Time to live (minimum 600)
- prio: Priority (for MX, SRV)
### Edit DNS Record by ID
Endpoint: /dns/edit/{DOMAIN}/{RECORD_ID}
Body: {"name": "subdomain", "type": "A", "content": "1.2.3.5", "ttl": "600"}
### Edit DNS Record by Name/Type
Endpoint: /dns/editByNameType/{DOMAIN}/{TYPE}/{SUBDOMAIN}
Body: {"content": "1.2.3.5", "ttl": "600"}
### Delete DNS Record by ID
Endpoint: /dns/delete/{DOMAIN}/{RECORD_ID}
Body: {}
### Delete DNS Record by Name/Type
Endpoint: /dns/deleteByNameType/{DOMAIN}/{TYPE}/{SUBDOMAIN}
Body: {}
### Retrieve All DNS Records
Endpoint: /dns/retrieve/{DOMAIN}
Body: {}
### Retrieve DNS Record by ID
Endpoint: /dns/retrieve/{DOMAIN}/{RECORD_ID}
Body: {}
### Retrieve DNS Records by Name/Type
Endpoint: /dns/retrieveByNameType/{DOMAIN}/{TYPE}/{SUBDOMAIN}
Body: {}
## DNSSEC Management
### Create DNSSEC Record
Endpoint: /dns/createDnssecRecord/{DOMAIN}
Body: {"keyTag": "64087", "alg": "13", "digestType": "2", "digest": "..."}
### Get DNSSEC Records
Endpoint: /dns/getDnssecRecords/{DOMAIN}
Body: {}
### Delete DNSSEC Record
Endpoint: /dns/deleteDnssecRecord/{DOMAIN}/{KEYTAG}
Body: {}
## Domain Management
### List All Domains
Endpoint: /domain/listAll
Body: {"start": "0", "includeLabels": "yes"}
### Get Nameservers
Endpoint: /domain/getNs/{DOMAIN}
Body: {}
### Update Nameservers (INFRA level)
Endpoint: /domain/updateNs/{DOMAIN}
Body: {"ns": ["ns1.example.com", "ns2.example.com"]}
### Check Domain Availability
Endpoint: /domain/checkDomain/{DOMAIN}
Body: {}
## URL Forwarding
### Add URL Forward
Endpoint: /domain/addUrlForward/{DOMAIN}
Body: {"subdomain": "", "location": "https://example.com", "type": "temporary", "includePath": "no", "wildcard": "yes"}
### Get URL Forwarding
Endpoint: /domain/getUrlForwarding/{DOMAIN}
Body: {}
### Delete URL Forward
Endpoint: /domain/deleteUrlForward/{DOMAIN}/{RECORD_ID}
Body: {}
## Glue Records (INFRA level)
### Create Glue Record
Endpoint: /domain/createGlue/{DOMAIN}/{SUBDOMAIN}
Body: {"ips": ["192.168.1.1", "2001:db8::1"]}
### Update Glue Record
Endpoint: /domain/updateGlue/{DOMAIN}/{SUBDOMAIN}
Body: {"ips": ["192.168.1.2"]}
### Delete Glue Record
Endpoint: /domain/deleteGlue/{DOMAIN}/{SUBDOMAIN}
Body: {}
### Get Glue Records
Endpoint: /domain/getGlue/{DOMAIN}
Body: {}
## SSL Certificates
### Retrieve SSL Bundle
Endpoint: /ssl/retrieve/{DOMAIN}
Body: {}
Returns: certificatechain, privatekey, publickey
## Utility
### Ping (Test Auth)
Endpoint: /ping
Body: {}
Returns: Your public IP address
"""
class PorkbunClient:
"""HTTP client for Porkbun API with safety controls."""
def __init__(self, api_key: str, secret_key: str, base_url: str):
self.api_key = api_key
self.secret_key = secret_key
self.base_url = base_url.rstrip("/")
self.client = httpx.Client(timeout=30.0)
def _get_auth_body(self) -> dict[str, str]:
"""Return authentication payload."""
return {
"apikey": self.api_key,
"secretapikey": self.secret_key,
}
def _classify_endpoint(self, endpoint: str) -> str:
"""Classify endpoint by safety level."""
if any(endpoint.startswith(e) for e in INFRA_ENDPOINTS):
return "INFRA"
if any(endpoint.startswith(e) for e in WRITE_ENDPOINTS):
return "WRITE"
return "READ"
def _validate_safety(self, endpoint: str) -> tuple[bool, str | None]:
"""Check if endpoint is allowed under current safety settings."""
level = self._classify_endpoint(endpoint)
if level == "INFRA":
if not ALLOW_INFRA:
return False, (
"INFRA operation blocked. This endpoint modifies domain infrastructure "
"(nameservers/glue records). Set PORKBUN_ALLOW_INFRA=true to enable."
)
if not ALLOW_WRITES:
return False, (
"INFRA operations require PORKBUN_ALLOW_WRITES=true in addition to "
"PORKBUN_ALLOW_INFRA=true."
)
if level == "WRITE":
if not ALLOW_WRITES:
return False, (
"WRITE operation blocked. This endpoint modifies DNS records. "
"Set PORKBUN_ALLOW_WRITES=true to enable."
)
return True, None
def request(
self, endpoint: str, body: dict[str, Any] | None = None
) -> dict[str, Any]:
"""Execute API request with safety checks."""
# Normalize endpoint
if not endpoint.startswith("/"):
endpoint = f"/{endpoint}"
# Safety validation
allowed, error_msg = self._validate_safety(endpoint)
if not allowed:
return {
"status": "BLOCKED",
"reason": error_msg,
"endpoint": endpoint,
"category": self._classify_endpoint(endpoint),
}
# Build request body with auth
request_body = self._get_auth_body()
if body:
request_body.update(body)
try:
response = self.client.post(
f"{self.base_url}{endpoint}",
json=request_body,
)
return response.json()
except httpx.HTTPError as e:
return {"status": "ERROR", "message": f"HTTP error: {e}"}
except json.JSONDecodeError:
return {"status": "ERROR", "message": "Invalid JSON response from API"}
def ping(self) -> dict[str, Any]:
"""Test API connectivity."""
return self.request("/ping")
def close(self):
"""Close HTTP client."""
self.client.close()
# Initialize client and MCP
client = PorkbunClient(PORKBUN_API_KEY, PORKBUN_SECRET_KEY, PORKBUN_API_BASE)
mcp = FastMCP("porkbun-dns")
# --- MCP Resources ---
@mcp.resource("porkbun://api-reference")
def get_api_docs() -> str:
"""Returns the Porkbun API documentation for using the porkbun_api tool."""
return API_DOCS
@mcp.resource("porkbun://safety-status")
def get_safety_status() -> str:
"""Returns current safety configuration."""
return json.dumps(
{
"allow_writes": ALLOW_WRITES,
"allow_infra": ALLOW_INFRA,
"mode": "INFRA" if ALLOW_INFRA else ("WRITE" if ALLOW_WRITES else "READ"),
},
indent=2,
)
# --- MCP Tools ---
@mcp.tool()
def ping() -> str:
"""Test API connectivity and return your public IP address.
Useful for verifying credentials and for dynamic DNS scenarios.
"""
result = client.ping()
return json.dumps(result, indent=2)
@mcp.tool()
def list_domains(start: int = 0, include_labels: bool = False) -> str:
"""List all domains in your Porkbun account.
Args:
start: Pagination offset (domains returned in chunks of 1000)
include_labels: Include domain labels in response
Returns:
JSON string with domain list and details
"""
body = {"start": str(start)}
if include_labels:
body["includeLabels"] = "yes"
result = client.request("/domain/listAll", body)
return json.dumps(result, indent=2)
@mcp.tool()
def list_dns_records(domain: str, record_id: str | None = None) -> str:
"""Retrieve DNS records for a domain.
Args:
domain: The domain name (e.g., 'example.com')
record_id: Optional specific record ID to retrieve
Returns:
JSON string with DNS records
"""
endpoint = f"/dns/retrieve/{domain}"
if record_id:
endpoint = f"{endpoint}/{record_id}"
result = client.request(endpoint)
return json.dumps(result, indent=2)
@mcp.tool()
def porkbun_api(endpoint: str, body: str = "{}") -> str:
"""Execute a raw Porkbun API call.
Use the 'porkbun://api-reference' resource for endpoint documentation.
Safety controls apply: WRITE and INFRA operations may be blocked.
Args:
endpoint: API path (e.g., '/dns/create/example.com')
body: JSON string of additional parameters (auth is added automatically)
Returns:
JSON string with API response
"""
try:
parsed_body = json.loads(body) if body else {}
except json.JSONDecodeError as e:
return json.dumps({"status": "ERROR", "message": f"Invalid JSON body: {e}"})
result = client.request(endpoint, parsed_body)
return json.dumps(result, indent=2)
# --- Health Check & App ---
async def health(request):
"""Health check endpoint for Docker/orchestration."""
if not PORKBUN_API_KEY or not PORKBUN_SECRET_KEY:
return JSONResponse(
{"status": "unhealthy", "reason": "Missing API credentials"},
status_code=503,
)
# Test API connectivity
result = client.ping()
if result.get("status") == "SUCCESS":
return JSONResponse(
{
"status": "healthy",
"mode": "INFRA"
if ALLOW_INFRA
else ("WRITE" if ALLOW_WRITES else "READ"),
}
)
return JSONResponse(
{"status": "unhealthy", "reason": result.get("message", "API ping failed")},
status_code=503,
)
def create_app() -> Starlette:
"""Create the ASGI application."""
mcp_app = mcp.http_app()
return Starlette(
routes=[
Route("/health", health),
Mount("/", app=mcp_app),
],
lifespan=mcp_app.lifespan,
)
if __name__ == "__main__":
import uvicorn
port = int(os.getenv("PORT", "8000"))
uvicorn.run(create_app(), host="0.0.0.0", port=port)