feat: implement EVE Forum Moderator CLI tool

- Add structured forum rules from EVE moderation policy
- Implement cookie-based Discourse authentication
- Add async Discourse API client with rate limiting
- Implement Claude-powered rule violation analyzer
- Add rich terminal and Discourse-compatible markdown reports
- Create Click CLI with review and rules commands
This commit is contained in:
Ben
2026-01-06 03:40:56 +00:00
parent b806ec8c83
commit 9d60f1a2fb
15 changed files with 2654 additions and 2 deletions

3
src/eve_mod/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
"""EVE Forum Moderator - AI-assisted forum moderation tool."""
__version__ = "0.1.0"

333
src/eve_mod/analyzer.py Normal file
View File

@@ -0,0 +1,333 @@
"""LLM-powered rule violation analyzer using Claude via llm-proxy."""
import json
import os
from dataclasses import dataclass, field
from typing import Any, Callable
from anthropic import Anthropic
from anthropic.types import TextBlock
from .discourse import UserPost
from .rules import ALL_RULES, RULES_BY_ID, Rule, Severity, format_rules_for_prompt
# Default model to use
DEFAULT_MODEL = "claude-sonnet-4-5-20250514"
# Batch size for analyzing posts
BATCH_SIZE = 5
@dataclass
class Violation:
"""A detected rule violation in a post."""
rule_id: str
rule_name: str
severity: str
confidence: float # 0.0-1.0
explanation: str
quote: str # Relevant excerpt from the post
@property
def rule(self) -> Rule | None:
"""Get the full Rule object."""
return RULES_BY_ID.get(self.rule_id)
@dataclass
class PostAnalysis:
"""Analysis result for a single post."""
post: UserPost
violations: list[Violation] = field(default_factory=list)
clean: bool = True
error: str | None = None
@property
def has_violations(self) -> bool:
"""Check if any violations were found."""
return len(self.violations) > 0
@property
def max_severity(self) -> str:
"""Get the highest severity among violations."""
if not self.violations:
return "none"
severity_order = {"low": 0, "medium": 1, "high": 2, "critical": 3}
max_sev = max(self.violations, key=lambda v: severity_order.get(v.severity, 0))
return max_sev.severity
class AnalyzerError(Exception):
"""Raised when analysis fails."""
pass
SYSTEM_PROMPT = """You are an EVE Online forum moderator assistant. Your role is to analyze forum posts and identify potential violations of the forum moderation policy.
{rules}
## Important Context
EVE Online is a competitive PvP MMO where trash talk between rival corporations and alliances is part of the culture. However, there are limits:
- In-game rivalry and competitive banter is generally acceptable
- Personal attacks that go beyond the game are not acceptable
- Discrimination, hate speech, and harassment are never acceptable regardless of "roleplay"
- Criticism of game mechanics or CCP decisions is allowed if constructive
## Your Task
For each post provided, analyze whether it violates any forum rules. Consider:
1. The context of EVE Online's competitive culture
2. Whether the post is directed at in-game entities (acceptable) vs. real people (less acceptable)
3. The severity of the violation if one exists
4. Your confidence level in the assessment
## Output Format
Respond with valid JSON in this exact format:
{{
"analyses": [
{{
"post_id": <number>,
"violations": [
{{
"rule_id": "<string>",
"severity": "low|medium|high|critical",
"confidence": <0.0-1.0>,
"explanation": "<brief explanation>",
"quote": "<relevant excerpt from the post>"
}}
],
"clean": <true if no violations, false otherwise>
}}
]
}}
If a post has no violations, return an empty violations array and clean=true.
Be conservative - only flag clear violations. Borderline cases should have lower confidence scores.
"""
class Analyzer:
"""Analyzes forum posts for rule violations using Claude."""
def __init__(
self,
api_key: str | None = None,
base_url: str | None = None,
model: str = DEFAULT_MODEL,
):
"""
Initialize the analyzer.
Args:
api_key: LLM proxy API key (defaults to LLM_PROXY_KEY env var)
base_url: LLM proxy base URL (defaults to https://llm-proxy.ext.ben.io/v1)
model: Model to use for analysis
"""
self.api_key = api_key or os.environ.get("LLM_PROXY_KEY")
if not self.api_key:
raise AnalyzerError(
"No API key provided. Set LLM_PROXY_KEY environment variable "
"or pass api_key parameter."
)
self.base_url = base_url or os.environ.get(
"LLM_PROXY_URL", "https://llm-proxy.ext.ben.io/v1"
)
self.model = model
# Initialize Anthropic client with custom base URL
self.client = Anthropic(
api_key=self.api_key,
base_url=self.base_url,
)
def _build_system_prompt(self) -> str:
"""Build the system prompt with rule definitions."""
rules = format_rules_for_prompt()
return SYSTEM_PROMPT.format(rules=rules)
def _format_posts_for_analysis(self, posts: list[UserPost]) -> str:
"""Format posts for the LLM prompt."""
formatted = []
for post in posts:
formatted.append(
f"""---
POST ID: {post.post_id}
TOPIC: {post.topic_title}
CATEGORY: {post.category_name or "Unknown"}
DATE: {post.created_at.strftime("%Y-%m-%d %H:%M UTC")}
URL: {post.url}
CONTENT:
{post.content_text}
---"""
)
return "\n\n".join(formatted)
def _parse_response(
self, response_text: str, posts: list[UserPost]
) -> list[PostAnalysis]:
"""Parse the LLM response into PostAnalysis objects."""
try:
# Try to extract JSON from the response
# Sometimes the model wraps it in markdown code blocks
json_match = response_text
if "```json" in response_text:
start = response_text.find("```json") + 7
end = response_text.find("```", start)
json_match = response_text[start:end].strip()
elif "```" in response_text:
start = response_text.find("```") + 3
end = response_text.find("```", start)
json_match = response_text[start:end].strip()
data = json.loads(json_match)
analyses = data.get("analyses", [])
# Create lookup for posts
post_lookup = {p.post_id: p for p in posts}
results: list[PostAnalysis] = []
for analysis in analyses:
post_id = analysis.get("post_id")
post = post_lookup.get(post_id)
if not post:
continue
violations = []
for v in analysis.get("violations", []):
rule_id = v.get("rule_id", "")
rule = RULES_BY_ID.get(rule_id)
violations.append(
Violation(
rule_id=rule_id,
rule_name=rule.name if rule else "Unknown",
severity=v.get("severity", "medium"),
confidence=v.get("confidence", 0.5),
explanation=v.get("explanation", ""),
quote=v.get("quote", ""),
)
)
results.append(
PostAnalysis(
post=post,
violations=violations,
clean=analysis.get("clean", len(violations) == 0),
)
)
# Add any posts that weren't in the response
analyzed_ids = {r.post.post_id for r in results}
for post in posts:
if post.post_id not in analyzed_ids:
results.append(
PostAnalysis(
post=post,
violations=[],
clean=True,
error="Not included in LLM response",
)
)
return results
except json.JSONDecodeError as e:
# If JSON parsing fails, return error results for all posts
return [
PostAnalysis(
post=post,
violations=[],
clean=True,
error=f"Failed to parse LLM response: {e}",
)
for post in posts
]
def analyze_batch(self, posts: list[UserPost]) -> list[PostAnalysis]:
"""
Analyze a batch of posts for rule violations.
Args:
posts: List of posts to analyze
Returns:
List of PostAnalysis results
"""
if not posts:
return []
system_prompt = self._build_system_prompt()
user_message = f"""Please analyze the following {len(posts)} forum posts for potential rule violations:
{self._format_posts_for_analysis(posts)}
Analyze each post and return your findings in the specified JSON format."""
try:
response = self.client.messages.create(
model=self.model,
max_tokens=4096,
system=system_prompt,
messages=[{"role": "user", "content": user_message}],
)
# Extract text from the response (handle different content block types)
text_block = next(
(block for block in response.content if isinstance(block, TextBlock)),
None,
)
if text_block is None:
raise AnalyzerError("No text content in LLM response")
response_text = text_block.text
return self._parse_response(response_text, posts)
except Exception as e:
# Return error results for all posts
return [
PostAnalysis(
post=post,
violations=[],
clean=True,
error=f"Analysis failed: {e}",
)
for post in posts
]
def analyze_all(
self,
posts: list[UserPost],
batch_size: int = BATCH_SIZE,
progress_callback: "Callable[[int, int], None] | None" = None,
) -> list[PostAnalysis]:
"""
Analyze all posts, processing in batches.
Args:
posts: All posts to analyze
batch_size: Number of posts per batch
progress_callback: Optional callback(current, total) for progress updates
Returns:
List of all PostAnalysis results
"""
all_results: list[PostAnalysis] = []
total = len(posts)
for i in range(0, total, batch_size):
batch = posts[i : i + batch_size]
results = self.analyze_batch(batch)
all_results.extend(results)
if progress_callback:
progress_callback(min(i + batch_size, total), total)
return all_results

133
src/eve_mod/auth.py Normal file
View File

@@ -0,0 +1,133 @@
"""Authentication utilities for EVE Forums (Discourse)."""
import re
from pathlib import Path
# Required cookies for Discourse authentication
REQUIRED_COOKIES = ("_forum_session", "_t")
# Optional but useful cookies
OPTIONAL_COOKIES = ("_cf_clearance", "cf_clearance")
class AuthError(Exception):
"""Raised when authentication fails or cookies are invalid."""
pass
def parse_cookies_env(content: str) -> dict[str, str]:
"""
Parse cookies from a cookies.env file.
Supports two formats:
1. Simple key=value (one per line):
_forum_session=abc123...
_t=xyz789...
2. Netscape/curl cookie format:
# Netscape HTTP Cookie File
.eveonline.com TRUE / TRUE 0 _forum_session abc123...
Args:
content: The raw file content
Returns:
Dictionary of cookie name -> value
"""
cookies: dict[str, str] = {}
for line in content.strip().splitlines():
line = line.strip()
# Skip empty lines and comments
if not line or line.startswith("#"):
continue
# Try Netscape format first (tab-separated, 7 fields)
if "\t" in line:
parts = line.split("\t")
if len(parts) >= 7:
# Netscape format: domain, flag, path, secure, expiry, name, value
name = parts[5]
value = parts[6]
cookies[name] = value
continue
# Try simple key=value format
if "=" in line:
# Handle values that might contain '='
key, _, value = line.partition("=")
key = key.strip()
value = value.strip()
# Remove quotes if present
if value.startswith('"') and value.endswith('"'):
value = value[1:-1]
elif value.startswith("'") and value.endswith("'"):
value = value[1:-1]
if key and value:
cookies[key] = value
return cookies
def load_cookies(cookie_path: Path | str) -> dict[str, str]:
"""
Load and validate cookies from a file.
Args:
cookie_path: Path to the cookies file
Returns:
Dictionary of cookie name -> value
Raises:
AuthError: If file not found or required cookies missing
"""
path = Path(cookie_path)
if not path.exists():
raise AuthError(
f"Cookie file not found: {path}\n"
f"Please export your browser cookies to {path}\n"
f"Required cookies: {', '.join(REQUIRED_COOKIES)}"
)
content = path.read_text()
cookies = parse_cookies_env(content)
# Check for required cookies
missing = [name for name in REQUIRED_COOKIES if name not in cookies]
if missing:
raise AuthError(
f"Missing required cookies: {', '.join(missing)}\n"
f"Found cookies: {', '.join(cookies.keys())}\n"
f"Please ensure you're logged into forums.eveonline.com before exporting cookies."
)
return cookies
def validate_session_cookie(session_value: str) -> bool:
"""
Basic validation of the session cookie format.
Args:
session_value: The _forum_session cookie value
Returns:
True if the cookie appears valid
"""
# Discourse session cookies are typically URL-encoded strings
# with a specific structure. Basic length check.
if not session_value or len(session_value) < 20:
return False
# Should contain URL-safe characters
if not re.match(r"^[A-Za-z0-9%_\-\.]+$", session_value):
return False
return True

227
src/eve_mod/cli.py Normal file
View File

@@ -0,0 +1,227 @@
"""CLI entrypoint for EVE Forum Moderator."""
import asyncio
from pathlib import Path
import click
from dotenv import load_dotenv
from rich.console import Console
from rich.progress import Progress, SpinnerColumn, TextColumn
from .analyzer import Analyzer, AnalyzerError
from .auth import AuthError, load_cookies
from .discourse import DiscourseClient, DiscourseError, UserNotFoundError
from .report import ReportGenerator
console = Console()
@click.group()
@click.version_option(version="0.1.0")
def cli() -> None:
"""EVE Forum Moderator - AI-assisted forum moderation tool."""
pass
@cli.command()
@click.argument("username")
@click.option(
"--days",
"-d",
default=30,
help="Number of days to analyze (default: 30)",
)
@click.option(
"--cookies",
"-c",
type=click.Path(exists=False),
default="cookies.env",
help="Path to cookies file (default: cookies.env)",
)
@click.option(
"--output",
"-o",
type=click.Path(),
default=None,
help="Custom output path for markdown report",
)
@click.option(
"--verbose",
"-v",
is_flag=True,
help="Show verbose output including detailed analysis",
)
@click.option(
"--max-posts",
default=200,
help="Maximum number of posts to fetch (default: 200)",
)
@click.option(
"--enrich/--no-enrich",
default=True,
help="Fetch full post content (slower but more accurate)",
)
def review(
username: str,
days: int,
cookies: str,
output: str | None,
verbose: bool,
max_posts: int,
enrich: bool,
) -> None:
"""
Review a user's forum posts for potential rule violations.
USERNAME is the EVE Forum username to review.
"""
# Load environment variables (for LLM_PROXY_KEY)
load_dotenv()
try:
# Load cookies
with console.status("[bold blue]Loading authentication cookies..."):
cookie_dict = load_cookies(Path(cookies))
console.print(f"[green]Loaded {len(cookie_dict)} cookies[/green]")
# Run async operations
asyncio.run(
_review_user(
username=username,
days=days,
cookies=cookie_dict,
output=output,
verbose=verbose,
max_posts=max_posts,
enrich=enrich,
)
)
except AuthError as e:
console.print(f"[red]Authentication Error:[/red] {e}")
raise SystemExit(1)
except AnalyzerError as e:
console.print(f"[red]Analysis Error:[/red] {e}")
raise SystemExit(1)
except Exception as e:
console.print(f"[red]Unexpected Error:[/red] {e}")
if verbose:
console.print_exception()
raise SystemExit(1)
async def _review_user(
username: str,
days: int,
cookies: dict[str, str],
output: str | None,
verbose: bool,
max_posts: int,
enrich: bool,
) -> None:
"""Async implementation of user review."""
try:
async with DiscourseClient(cookies) as client:
# Fetch user profile
with console.status(f"[bold blue]Fetching profile for {username}..."):
try:
user = await client.get_user(username)
console.print(
f"[green]Found user:[/green] {user.username} "
f"(Trust Level {user.trust_level}, {user.post_count} posts)"
)
except UserNotFoundError:
console.print(f"[red]User not found:[/red] {username}")
raise SystemExit(1)
# Fetch posts
with console.status(f"[bold blue]Fetching posts from last {days} days..."):
posts = await client.get_user_posts(
username=username,
days=days,
max_posts=max_posts,
)
console.print(f"[green]Found {len(posts)} posts[/green]")
if not posts:
console.print(
"[yellow]No posts found in the specified time period.[/yellow]"
)
return
# Enrich posts with full content
if enrich:
with console.status("[bold blue]Fetching full post content..."):
posts = await client.enrich_posts(posts)
console.print("[green]Enriched posts with full content[/green]")
except DiscourseError as e:
console.print(f"[red]Forum Error:[/red] {e}")
raise SystemExit(1)
# Analyze posts
console.print("[bold blue]Analyzing posts for rule violations...[/bold blue]")
try:
analyzer = Analyzer()
except AnalyzerError as e:
console.print(f"[red]Analyzer Error:[/red] {e}")
console.print(
"[dim]Hint: Set LLM_PROXY_KEY environment variable or add it to .env[/dim]"
)
raise SystemExit(1)
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
console=console,
) as progress:
task = progress.add_task("Analyzing...", total=len(posts))
def update_progress(current: int, total: int) -> None:
progress.update(task, completed=current)
analyses = analyzer.analyze_all(
posts=posts,
progress_callback=update_progress,
)
# Generate report
report_gen = ReportGenerator()
# Print to terminal
report_gen.print_summary(user, analyses, days)
if verbose:
report_gen.print_detailed(analyses)
# Save to file
if output:
filepath = Path(output)
filepath.parent.mkdir(parents=True, exist_ok=True)
filepath.write_text(report_gen.generate_markdown(user, analyses, days))
else:
filepath = report_gen.save_report(user, analyses, days)
console.print(f"\n[green]Report saved to:[/green] {filepath}")
@cli.command()
def rules() -> None:
"""Display all forum moderation rules."""
from .rules import ALL_RULES, RuleCategory
console.print("\n[bold]EVE Online Forum Moderation Rules[/bold]\n")
current_category = None
for rule in ALL_RULES:
if rule.category != current_category:
current_category = rule.category
console.print(f"\n[bold cyan]{current_category.value.upper()}[/bold cyan]")
console.print(f" [yellow]{rule.id}[/yellow] - [bold]{rule.name}[/bold]")
console.print(f" [dim]{rule.description[:80]}...[/dim]")
if __name__ == "__main__":
cli()

340
src/eve_mod/discourse.py Normal file
View File

@@ -0,0 +1,340 @@
"""Discourse API client for EVE Online Forums."""
import asyncio
import html
import re
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Any
import httpx
from dateutil.parser import parse as parse_date
# EVE Forums base URL
BASE_URL = "https://forums.eveonline.com"
# Rate limiting settings
REQUEST_DELAY = 0.5 # seconds between requests
MAX_RETRIES = 3
RETRY_BACKOFF = 2.0 # exponential backoff multiplier
@dataclass
class UserPost:
"""Represents a single forum post by a user."""
post_id: int
post_number: int
topic_id: int
topic_title: str
topic_slug: str
content_raw: str
content_cooked: str # HTML version
created_at: datetime
category_name: str | None = None
@property
def url(self) -> str:
"""Get the direct URL to this post."""
return f"{BASE_URL}/t/{self.topic_slug}/{self.topic_id}/{self.post_number}"
@property
def content_text(self) -> str:
"""Get plain text content (HTML stripped)."""
# Remove HTML tags
text = re.sub(r"<[^>]+>", " ", self.content_cooked)
# Decode HTML entities
text = html.unescape(text)
# Normalize whitespace
text = re.sub(r"\s+", " ", text).strip()
return text
@dataclass
class UserProfile:
"""Basic user profile information."""
username: str
name: str | None
created_at: datetime
trust_level: int
post_count: int
topics_entered: int
time_read: int # seconds
class DiscourseError(Exception):
"""Base exception for Discourse API errors."""
pass
class UserNotFoundError(DiscourseError):
"""Raised when a user is not found."""
pass
class AuthenticationError(DiscourseError):
"""Raised when authentication fails."""
pass
class RateLimitError(DiscourseError):
"""Raised when rate limited by the API."""
pass
class DiscourseClient:
"""Async client for the EVE Online Discourse forums."""
def __init__(self, cookies: dict[str, str], base_url: str = BASE_URL):
"""
Initialize the Discourse client.
Args:
cookies: Dictionary of authentication cookies
base_url: Forum base URL
"""
self.base_url = base_url.rstrip("/")
self.cookies = cookies
self._client: httpx.AsyncClient | None = None
self._last_request: float = 0
async def __aenter__(self) -> "DiscourseClient":
"""Enter async context."""
self._client = httpx.AsyncClient(
base_url=self.base_url,
cookies=self.cookies,
headers={
"Accept": "application/json",
"User-Agent": "EVE-Forum-Moderator/0.1.0",
},
timeout=30.0,
follow_redirects=True,
)
return self
async def __aexit__(self, *args: Any) -> None:
"""Exit async context."""
if self._client:
await self._client.aclose()
self._client = None
async def _rate_limit(self) -> None:
"""Enforce rate limiting between requests."""
now = asyncio.get_event_loop().time()
elapsed = now - self._last_request
if elapsed < REQUEST_DELAY:
await asyncio.sleep(REQUEST_DELAY - elapsed)
self._last_request = asyncio.get_event_loop().time()
async def _request(self, method: str, path: str, **kwargs: Any) -> dict[str, Any]:
"""
Make a rate-limited request with retry logic.
Args:
method: HTTP method
path: API path
**kwargs: Additional httpx request arguments
Returns:
Parsed JSON response
Raises:
DiscourseError: On API errors
"""
if not self._client:
raise DiscourseError("Client not initialized. Use 'async with' context.")
last_error: Exception | None = None
for attempt in range(MAX_RETRIES):
await self._rate_limit()
try:
response = await self._client.request(method, path, **kwargs)
if response.status_code == 404:
raise UserNotFoundError(f"Resource not found: {path}")
if response.status_code == 401:
raise AuthenticationError(
"Authentication failed. Your cookies may have expired."
)
if response.status_code == 403:
raise AuthenticationError(
"Access forbidden. You may not have permission to view this resource."
)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 60))
raise RateLimitError(
f"Rate limited. Retry after {retry_after} seconds."
)
response.raise_for_status()
return response.json()
except (httpx.TimeoutException, httpx.NetworkError) as e:
last_error = e
if attempt < MAX_RETRIES - 1:
wait = REQUEST_DELAY * (RETRY_BACKOFF**attempt)
await asyncio.sleep(wait)
continue
raise DiscourseError(
f"Request failed after {MAX_RETRIES} retries: {last_error}"
)
async def get_user(self, username: str) -> UserProfile:
"""
Get user profile information.
Args:
username: The forum username
Returns:
UserProfile object
"""
data = await self._request("GET", f"/u/{username}.json")
user = data.get("user", {})
return UserProfile(
username=user.get("username", username),
name=user.get("name"),
created_at=parse_date(user.get("created_at", "2000-01-01")),
trust_level=user.get("trust_level", 0),
post_count=user.get("post_count", 0),
topics_entered=user.get("topics_entered", 0),
time_read=user.get("time_read", 0),
)
async def get_user_posts(
self,
username: str,
days: int = 30,
max_posts: int = 200,
) -> list[UserPost]:
"""
Get a user's posts from the last N days.
Args:
username: The forum username
days: Number of days to look back
max_posts: Maximum number of posts to fetch
Returns:
List of UserPost objects, newest first
"""
posts: list[UserPost] = []
cutoff = datetime.now(timezone.utc) - timedelta(days=days)
offset = 0
while len(posts) < max_posts:
# Fetch user actions (includes posts)
data = await self._request(
"GET",
f"/user_actions.json",
params={
"username": username,
"filter": "5", # 5 = posts/replies
"offset": offset,
},
)
actions = data.get("user_actions", [])
if not actions:
break
for action in actions:
# Parse the created_at timestamp
created_str = action.get("created_at", "")
if not created_str:
continue
created_at = parse_date(created_str)
if created_at.tzinfo is None:
created_at = created_at.replace(tzinfo=timezone.utc)
# Stop if we've gone past the cutoff
if created_at < cutoff:
return posts
# Extract post data
post = UserPost(
post_id=action.get("post_id", 0),
post_number=action.get("post_number", 1),
topic_id=action.get("topic_id", 0),
topic_title=action.get("title", "Unknown Topic"),
topic_slug=action.get("slug", "topic"),
content_raw=action.get("excerpt", ""), # May be truncated
content_cooked=action.get("excerpt", ""),
created_at=created_at,
category_name=action.get("category_name"),
)
posts.append(post)
if len(posts) >= max_posts:
break
offset += len(actions)
# Safety check - if we got fewer than expected, we're at the end
if len(actions) < 30:
break
return posts
async def get_full_post(self, post_id: int) -> dict[str, Any]:
"""
Get the full content of a specific post.
Args:
post_id: The post ID
Returns:
Post data dictionary
"""
data = await self._request("GET", f"/posts/{post_id}.json")
return data
async def enrich_posts(self, posts: list[UserPost]) -> list[UserPost]:
"""
Enrich posts with full content (the activity feed only has excerpts).
Args:
posts: List of posts with potentially truncated content
Returns:
Posts with full content
"""
enriched: list[UserPost] = []
for post in posts:
try:
full_data = await self.get_full_post(post.post_id)
# Create new post with full content
enriched_post = UserPost(
post_id=post.post_id,
post_number=post.post_number,
topic_id=post.topic_id,
topic_title=post.topic_title,
topic_slug=post.topic_slug,
content_raw=full_data.get("raw", post.content_raw),
content_cooked=full_data.get("cooked", post.content_cooked),
created_at=post.created_at,
category_name=post.category_name,
)
enriched.append(enriched_post)
except DiscourseError:
# If we can't fetch full content, use what we have
enriched.append(post)
return enriched

302
src/eve_mod/report.py Normal file
View File

@@ -0,0 +1,302 @@
"""Report generator for forum moderation analysis."""
from datetime import datetime
from pathlib import Path
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich.text import Text
from .analyzer import PostAnalysis, Violation
from .discourse import UserProfile
class ReportGenerator:
"""Generates moderation reports in terminal and markdown formats."""
def __init__(self, output_dir: Path | str = "reports"):
"""
Initialize the report generator.
Args:
output_dir: Directory to save markdown reports
"""
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self.console = Console()
def _severity_color(self, severity: str) -> str:
"""Get color for severity level."""
colors = {
"low": "yellow",
"medium": "orange1",
"high": "red",
"critical": "bold red",
}
return colors.get(severity, "white")
def _severity_emoji(self, severity: str) -> str:
"""Get indicator for severity level (Discourse-friendly)."""
# Using text indicators instead of emoji for broader compatibility
indicators = {
"low": "[LOW]",
"medium": "[MED]",
"high": "[HIGH]",
"critical": "[CRITICAL]",
}
return indicators.get(severity, "[-]")
def print_summary(
self,
user: UserProfile,
analyses: list[PostAnalysis],
days: int,
) -> None:
"""
Print a summary to the terminal.
Args:
user: User profile information
analyses: List of post analyses
days: Number of days analyzed
"""
violations = [a for a in analyses if a.has_violations]
total_violations = sum(len(a.violations) for a in violations)
# Header
self.console.print()
self.console.print(
Panel(
f"[bold]User Review: {user.username}[/bold]\n"
f"Period: Last {days} days\n"
f"Posts Analyzed: {len(analyses)}\n"
f"Posts with Violations: {len(violations)}\n"
f"Total Violations Found: {total_violations}",
title="EVE Forum Moderation Report",
border_style="blue",
)
)
if not violations:
self.console.print(
"\n[green]No rule violations detected in the analyzed posts.[/green]\n"
)
return
# Summary table
table = Table(title="Violations Summary", show_header=True, header_style="bold")
table.add_column("Date", style="dim", width=12)
table.add_column("Topic", width=40, overflow="ellipsis")
table.add_column("Rule", width=25)
table.add_column("Severity", width=10, justify="center")
table.add_column("Confidence", width=10, justify="center")
for analysis in violations:
for v in analysis.violations:
table.add_row(
analysis.post.created_at.strftime("%Y-%m-%d"),
analysis.post.topic_title[:38] + "..."
if len(analysis.post.topic_title) > 40
else analysis.post.topic_title,
f"{v.rule_id}: {v.rule_name}",
Text(v.severity.upper(), style=self._severity_color(v.severity)),
f"{v.confidence:.0%}",
)
self.console.print(table)
self.console.print()
def print_detailed(self, analyses: list[PostAnalysis]) -> None:
"""
Print detailed violation information to the terminal.
Args:
analyses: List of post analyses with violations
"""
violations = [a for a in analyses if a.has_violations]
if not violations:
return
self.console.print("[bold]Detailed Analysis[/bold]\n")
for analysis in violations:
self.console.print(
Panel(
f"[bold]{analysis.post.topic_title}[/bold]\n"
f"[dim]{analysis.post.url}[/dim]\n"
f"Date: {analysis.post.created_at.strftime('%Y-%m-%d %H:%M UTC')}",
border_style="yellow",
)
)
for v in analysis.violations:
self.console.print(
f" [{self._severity_color(v.severity)}]"
f"{v.severity.upper()}[/] - "
f"[bold]Rule {v.rule_id}: {v.rule_name}[/bold] "
f"(confidence: {v.confidence:.0%})"
)
self.console.print(f" [dim]Explanation:[/dim] {v.explanation}")
if v.quote:
self.console.print(f' [dim]Quote:[/dim] "{v.quote}"')
self.console.print()
def generate_markdown(
self,
user: UserProfile,
analyses: list[PostAnalysis],
days: int,
) -> str:
"""
Generate a Discourse-compatible markdown report.
Args:
user: User profile information
analyses: List of post analyses
days: Number of days analyzed
Returns:
Markdown string
"""
violations = [a for a in analyses if a.has_violations]
total_violations = sum(len(a.violations) for a in violations)
lines = [
f"## User Review: {user.username}",
"",
f"**Period:** Last {days} days (ending {datetime.now().strftime('%Y-%m-%d')})",
f"**Posts Analyzed:** {len(analyses)}",
f"**Posts with Violations:** {len(violations)}",
f"**Total Violations Found:** {total_violations}",
"",
]
if not violations:
lines.extend(
[
"---",
"",
"*No rule violations detected in the analyzed posts.*",
"",
]
)
return "\n".join(lines)
# Summary table
lines.extend(
[
"---",
"",
"### Summary",
"",
"| Date | Topic | Rule | Severity | Confidence |",
"|------|-------|------|----------|------------|",
]
)
for analysis in violations:
for v in analysis.violations:
topic_link = f"[{self._escape_md(analysis.post.topic_title[:40])}]({analysis.post.url})"
lines.append(
f"| {analysis.post.created_at.strftime('%Y-%m-%d')} "
f"| {topic_link} "
f"| {v.rule_id}: {v.rule_name} "
f"| {self._severity_emoji(v.severity)} "
f"| {v.confidence:.0%} |"
)
# Detailed analysis
lines.extend(
[
"",
"---",
"",
"### Detailed Analysis",
"",
]
)
for i, analysis in enumerate(violations, 1):
lines.extend(
[
f"#### {i}. [{self._escape_md(analysis.post.topic_title)}]({analysis.post.url})",
f"**Date:** {analysis.post.created_at.strftime('%Y-%m-%d %H:%M UTC')} | "
f"**Category:** {analysis.post.category_name or 'Unknown'}",
"",
]
)
for v in analysis.violations:
lines.extend(
[
f"**{self._severity_emoji(v.severity)} Rule {v.rule_id} - {v.rule_name}** "
f"(Confidence: {v.confidence:.0%})",
"",
]
)
if v.quote:
lines.extend(
[
f"> {self._escape_md(v.quote)}",
"",
]
)
lines.extend(
[
f"*{v.explanation}*",
"",
]
)
lines.append("---")
lines.append("")
# Footer
lines.extend(
[
"*Report generated by EVE Forum Moderator Assistant*",
f"*Generated: {datetime.now().strftime('%Y-%m-%d %H:%M UTC')}*",
]
)
return "\n".join(lines)
def _escape_md(self, text: str) -> str:
"""Escape markdown special characters."""
# Escape characters that could break markdown formatting
for char in ["[", "]", "|", "*", "_", "`"]:
text = text.replace(char, f"\\{char}")
return text
def save_report(
self,
user: UserProfile,
analyses: list[PostAnalysis],
days: int,
filename: str | None = None,
) -> Path:
"""
Save a markdown report to file.
Args:
user: User profile information
analyses: List of post analyses
days: Number of days analyzed
filename: Optional custom filename
Returns:
Path to the saved report
"""
if filename is None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{user.username}_{timestamp}.md"
filepath = self.output_dir / filename
content = self.generate_markdown(user, analyses, days)
filepath.write_text(content)
return filepath

476
src/eve_mod/rules.py Normal file
View File

@@ -0,0 +1,476 @@
"""EVE Forum Moderation Policy - Structured rule definitions."""
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
class RuleCategory(str, Enum):
"""Categories of forum rules."""
CONDUCT = "conduct"
CONTENT = "content"
OTHER = "other"
class Severity(str, Enum):
"""Severity levels for rule violations."""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass(frozen=True)
class Rule:
"""A forum moderation rule."""
id: str
name: str
category: RuleCategory
description: str
default_severity: Severity
examples: tuple[str, ...] = ()
def __str__(self) -> str:
return f"{self.id}: {self.name}"
# Prohibited Conduct Rules (1.x)
CONDUCT_RULES = (
Rule(
id="1.1",
name="Trolling",
category=RuleCategory.CONDUCT,
description="Posting inflammatory, extraneous, or off-topic messages with the intent of provoking an emotional response or disrupting normal discussion.",
default_severity=Severity.MEDIUM,
examples=(
"Deliberately misrepresenting someone's position to provoke anger",
"Posting bait designed to start arguments",
),
),
Rule(
id="1.2",
name="Flaming",
category=RuleCategory.CONDUCT,
description="Hostile and insulting interaction between forum users. This includes direct insults, name-calling, and aggressive language directed at other players.",
default_severity=Severity.MEDIUM,
examples=(
"Calling another player an idiot or similar insult",
"Aggressive personal attacks in response to disagreement",
),
),
Rule(
id="1.3",
name="Ranting",
category=RuleCategory.CONDUCT,
description="Posting lengthy, emotional complaints that do not contribute to constructive discussion.",
default_severity=Severity.LOW,
examples=(
"Long posts consisting entirely of complaints without suggestions",
"Emotional outbursts without constructive content",
),
),
Rule(
id="1.4",
name="Personal Attacks",
category=RuleCategory.CONDUCT,
description="Attacking another player personally rather than addressing their arguments or ideas.",
default_severity=Severity.MEDIUM,
examples=(
"Attacking someone's character instead of their argument",
"Making personal accusations unrelated to the discussion",
),
),
Rule(
id="1.5",
name="Harassment",
category=RuleCategory.CONDUCT,
description="Repeated unwanted contact or attention directed at a specific player, including following them across threads to attack or belittle them.",
default_severity=Severity.HIGH,
examples=(
"Following a player to multiple threads to continue arguments",
"Repeatedly tagging or mentioning someone to provoke them",
),
),
Rule(
id="1.6",
name="Doxxing",
category=RuleCategory.CONDUCT,
description="Revealing or threatening to reveal real-life personal information about another player without their consent.",
default_severity=Severity.CRITICAL,
examples=(
"Posting someone's real name without consent",
"Threatening to reveal personal information",
),
),
Rule(
id="1.7",
name="Racism & Discrimination",
category=RuleCategory.CONDUCT,
description="Any form of discrimination based on race, ethnicity, national origin, or similar characteristics.",
default_severity=Severity.CRITICAL,
examples=(
"Racial slurs or epithets",
"Stereotyping based on ethnicity or national origin",
),
),
Rule(
id="1.8",
name="Hate Speech",
category=RuleCategory.CONDUCT,
description="Speech that attacks a person or group on the basis of protected attributes such as race, religion, ethnic origin, national origin, sex, disability, sexual orientation, or gender identity.",
default_severity=Severity.CRITICAL,
examples=(
"Slurs targeting protected groups",
"Advocating violence against protected groups",
),
),
Rule(
id="1.9",
name="Sexism",
category=RuleCategory.CONDUCT,
description="Discrimination or prejudice based on sex or gender.",
default_severity=Severity.HIGH,
examples=(
"Demeaning comments based on gender",
"Sexist stereotypes or assumptions",
),
),
Rule(
id="1.10",
name="Spamming",
category=RuleCategory.CONDUCT,
description="Posting the same or similar content repeatedly, or posting content with no meaningful contribution to discussion.",
default_severity=Severity.LOW,
examples=(
"Posting the same message in multiple threads",
"Repetitive one-word responses",
),
),
Rule(
id="1.11",
name="Bumping",
category=RuleCategory.CONDUCT,
description="Posting simply to move a thread to the top of the forum without adding meaningful content.",
default_severity=Severity.LOW,
examples=(
"Posts that just say 'bump'",
"Empty or meaningless posts to keep thread visible",
),
),
Rule(
id="1.12",
name="Off-Topic Posting",
category=RuleCategory.CONDUCT,
description="Posting content that is not relevant to the thread topic or forum section.",
default_severity=Severity.LOW,
examples=(
"Discussing unrelated games in EVE forums",
"Derailing discussions with unrelated content",
),
),
Rule(
id="1.13",
name="Pyramid Quoting",
category=RuleCategory.CONDUCT,
description="Excessive nested quoting that makes posts difficult to read and disrupts discussion flow.",
default_severity=Severity.LOW,
examples=("Quoting entire posts multiple levels deep",),
),
Rule(
id="1.14",
name="Rumor Mongering",
category=RuleCategory.CONDUCT,
description="Spreading unverified information or speculation as fact, particularly regarding CCP decisions, policies, or future plans.",
default_severity=Severity.MEDIUM,
examples=(
"Presenting speculation about CCP decisions as fact",
"Spreading unverified claims about game changes",
),
),
Rule(
id="1.15",
name="New Player Bashing",
category=RuleCategory.CONDUCT,
description="Hostile or dismissive behavior toward new players asking questions or learning the game.",
default_severity=Severity.MEDIUM,
examples=(
"Mocking new players for asking basic questions",
"Telling new players to quit instead of helping",
),
),
Rule(
id="1.16",
name="Impersonation",
category=RuleCategory.CONDUCT,
description="Pretending to be another player, CCP employee, or ISD volunteer.",
default_severity=Severity.HIGH,
examples=(
"Claiming to be a CCP developer",
"Using names similar to known players to deceive",
),
),
Rule(
id="1.17",
name="Advertising",
category=RuleCategory.CONDUCT,
description="Promoting products, services, or websites unrelated to EVE Online.",
default_severity=Severity.MEDIUM,
examples=(
"Promoting external products or services",
"Posting referral links for non-EVE services",
),
),
)
# Prohibited Content Rules (2.x)
CONTENT_RULES = (
Rule(
id="2.1",
name="Pornography",
category=RuleCategory.CONTENT,
description="Sexually explicit images or content.",
default_severity=Severity.CRITICAL,
examples=("Posting explicit images", "Linking to adult content"),
),
Rule(
id="2.2",
name="Profanity",
category=RuleCategory.CONTENT,
description="Excessive or extreme profanity, particularly when directed at others.",
default_severity=Severity.MEDIUM,
examples=(
"Excessive swearing in posts",
"Using profanity as personal attacks",
),
),
Rule(
id="2.3",
name="Real Money Trading (RMT)",
category=RuleCategory.CONTENT,
description="Discussion of, solicitation for, or promotion of real money trading of in-game items, ISK, or accounts.",
default_severity=Severity.HIGH,
examples=(
"Offering to sell ISK for real money",
"Advertising account sales",
),
),
Rule(
id="2.4",
name="Discussion of Warnings & Bans",
category=RuleCategory.CONTENT,
description="Public discussion of warnings, bans, or other disciplinary actions taken against any player.",
default_severity=Severity.MEDIUM,
examples=(
"Complaining about receiving a warning",
"Discussing why another player was banned",
),
),
Rule(
id="2.5",
name="Discussion of Moderation",
category=RuleCategory.CONTENT,
description="Publicly discussing or disputing moderation decisions. Appeals should be handled through proper support channels.",
default_severity=Severity.MEDIUM,
examples=(
"Complaining about thread locks",
"Disputing moderator actions publicly",
),
),
Rule(
id="2.6",
name="Private Communications with CCP",
category=RuleCategory.CONTENT,
description="Sharing private communications with CCP staff without permission.",
default_severity=Severity.MEDIUM,
examples=(
"Posting GM responses",
"Sharing private CCP correspondence",
),
),
Rule(
id="2.7",
name="In-Game Bugs & Exploits",
category=RuleCategory.CONTENT,
description="Public discussion of bugs or exploits that could be abused. These should be reported through proper bug reporting channels.",
default_severity=Severity.HIGH,
examples=(
"Describing how to reproduce an exploit",
"Sharing details of game bugs publicly",
),
),
Rule(
id="2.8",
name="Real World Religion",
category=RuleCategory.CONTENT,
description="Discussion of real-world religious beliefs or practices.",
default_severity=Severity.MEDIUM,
examples=(
"Discussing real-world religious topics",
"Religious debates unrelated to EVE lore",
),
),
Rule(
id="2.9",
name="Real World Politics",
category=RuleCategory.CONTENT,
description="Discussion of real-world political topics, parties, or figures.",
default_severity=Severity.MEDIUM,
examples=(
"Discussing real-world elections",
"Political debates unrelated to EVE",
),
),
Rule(
id="2.10",
name="Layout-Distorting Content",
category=RuleCategory.CONTENT,
description="Images, text, or formatting that disrupts the normal forum layout.",
default_severity=Severity.LOW,
examples=(
"Oversized images",
"Formatting that breaks page layout",
),
),
Rule(
id="2.11",
name="Advertising Other Games/Services",
category=RuleCategory.CONTENT,
description="Promoting other games or competing services.",
default_severity=Severity.MEDIUM,
examples=(
"Promoting competing MMOs",
"Advertising other gaming services",
),
),
Rule(
id="2.12",
name="Real Financial Transfer Requests",
category=RuleCategory.CONTENT,
description="Soliciting real money from other players.",
default_severity=Severity.HIGH,
examples=(
"Asking for PayPal donations",
"GoFundMe or similar solicitations",
),
),
)
# Other Rules (3-9)
OTHER_RULES = (
Rule(
id="3",
name="Non-Constructive Posting",
category=RuleCategory.OTHER,
description="Posts should contribute positively to discussion. Posts that exist solely to complain, mock, or derail without offering constructive feedback may be moderated.",
default_severity=Severity.LOW,
examples=(
"Posts that only mock without substance",
"Pure negativity without constructive criticism",
),
),
Rule(
id="4",
name="Abuse of Forum Tools",
category=RuleCategory.OTHER,
description="Misusing forum features such as flagging, voting, or reporting in bad faith.",
default_severity=Severity.MEDIUM,
examples=(
"Mass flagging posts you disagree with",
"Coordinated abuse of voting systems",
),
),
Rule(
id="5",
name="Re-Opening Locked Topics",
category=RuleCategory.OTHER,
description="Creating new threads to continue discussion from threads that have been locked by moderators.",
default_severity=Severity.MEDIUM,
examples=("Creating new thread to continue locked discussion",),
),
Rule(
id="6",
name="Language Requirements",
category=RuleCategory.OTHER,
description="Posts in English-language forum sections must be in English. Other language sections have their own requirements.",
default_severity=Severity.LOW,
examples=("Posting in non-English in English sections",),
),
Rule(
id="7",
name="Attacking CCP/ISD Staff",
category=RuleCategory.OTHER,
description="Personal attacks or harassment directed at CCP staff or ISD volunteers is strictly prohibited.",
default_severity=Severity.CRITICAL,
examples=(
"Insulting CCP developers personally",
"Harassing ISD volunteers",
),
),
Rule(
id="8",
name="Section-Specific Rules",
category=RuleCategory.OTHER,
description="Individual forum sections may have additional rules that must be followed.",
default_severity=Severity.LOW,
examples=("Violating Marketplace section rules",),
),
Rule(
id="9",
name="Killmail/Chatlog Abuse",
category=RuleCategory.OTHER,
description="Using killmails or chat logs primarily to troll, flame, or harass other players rather than for legitimate discussion.",
default_severity=Severity.MEDIUM,
examples=(
"Posting killmails solely to mock someone",
"Sharing chat logs to harass",
),
),
)
# All rules combined
ALL_RULES: tuple[Rule, ...] = CONDUCT_RULES + CONTENT_RULES + OTHER_RULES
# Quick lookup by ID
RULES_BY_ID: dict[str, Rule] = {rule.id: rule for rule in ALL_RULES}
def get_rule(rule_id: str) -> Rule | None:
"""Get a rule by its ID."""
return RULES_BY_ID.get(rule_id)
def format_rules_for_prompt() -> str:
"""Format all rules as a string suitable for LLM system prompts."""
lines = ["# EVE Online Forum Moderation Rules\n"]
lines.append("## Prohibited Conduct (Rules 1.x)\n")
for rule in CONDUCT_RULES:
lines.append(f"### {rule.id} - {rule.name}")
lines.append(f"{rule.description}")
lines.append(f"Default severity: {rule.default_severity.value.upper()}\n")
lines.append("## Prohibited Content (Rules 2.x)\n")
for rule in CONTENT_RULES:
lines.append(f"### {rule.id} - {rule.name}")
lines.append(f"{rule.description}")
lines.append(f"Default severity: {rule.default_severity.value.upper()}\n")
lines.append("## Other Rules (Rules 3-9)\n")
for rule in OTHER_RULES:
lines.append(f"### {rule.id} - {rule.name}")
lines.append(f"{rule.description}")
lines.append(f"Default severity: {rule.default_severity.value.upper()}\n")
return "\n".join(lines)
def load_policy_markdown() -> str:
"""Load the full policy markdown file."""
policy_path = Path(__file__).parent.parent.parent / "rules" / "forum_policy.md"
if policy_path.exists():
return policy_path.read_text()
# Fallback to generated format
return format_rules_for_prompt()