"""LangGraph workflow that harvests, validates and publishes wiki pages.

Pipeline: harvester -> esi_collector -> validation -> (git_sync -> wikijs_api)
The publish branch is only taken when the validation step leaves
``state.is_approved`` truthy; otherwise the graph ends after validation.
"""

import asyncio
import os
from typing import Any, Dict

import git
from langgraph.graph import StateGraph, END

from src.agents.esi_collector import esi_collector_node
from src.agents.harvester import harvester_node
from src.agents.validation import validation_agent_node
from src.clients.wikijs import WikiJSClient
from src.schema.state import WikiState


# --- Nodes ---

async def git_sync_node(state: WikiState) -> Dict[str, Any]:
    """Infrastructure: Sync content to Git SSOT.

    Writes the current page to ``content/<path>.md`` (relative to the working
    directory) with a minimal frontmatter header, then stages, commits and
    pushes that file through the repository found at the working directory.

    Args:
        state: Workflow state. Reads ``current_page`` and, from it, ``path``,
            ``title``, ``metadata.source`` and ``content_markdown``.

    Returns:
        Always an empty dict, i.e. no state update. Failures while staging,
        committing or pushing are printed and otherwise ignored (best-effort).
    """
    print("Node: Git Sync")
    page = state.current_page
    if not page:
        return {}

    # NOTE(review): constructed outside the try block below, so a working
    # directory that is not a Git repository raises out of this node.
    repo = git.Repo(os.getcwd())

    # NOTE(review): page.path is interpolated unchecked; confirm upstream
    # nodes never produce ".." segments or absolute paths.
    file_path = f"content/{page.path}.md"
    os.makedirs(os.path.dirname(file_path), exist_ok=True)

    # Pin UTF-8 explicitly: the default text encoding is platform/locale
    # dependent and page content may contain non-ASCII characters.
    with open(file_path, "w", encoding="utf-8") as f:
        # Frontmatter (simplified), followed by the page body.
        f.write("---\n")
        f.write(f"title: {page.title}\n")
        f.write(f"source: {page.metadata.source}\n")
        f.write("---\n\n")
        f.write(page.content_markdown)

    # NOTE(review): these GitPython calls are synchronous and are invoked
    # directly from a coroutine, so they run on the event loop thread.
    try:
        repo.index.add([file_path])
        repo.index.commit(f"[AGENT] Update: {page.path}")
        repo.remotes.origin.push()
        print(f"Successfully pushed {file_path} to Git.")
    except Exception as e:
        # Deliberately best-effort: report and let the graph continue.
        print(f"Git Sync Failed: {e}")

    return {}


async def wikijs_node(state: WikiState) -> Dict[str, Any]:
    """Agent API: Publish to Wiki.js UI.

    Upserts the current page through ``WikiJSClient`` and always closes the
    client afterwards.

    Args:
        state: Workflow state. Reads ``current_page`` and, from it, ``path``,
            ``title``, ``content_markdown``, ``metadata.source`` and
            ``metadata.categories`` (passed as the page tags).

    Returns:
        Always an empty dict, i.e. no state update. A failed upsert is printed
        and otherwise ignored (best-effort).
    """
    print("Node: Wiki.js API")
    page = state.current_page
    if not page:
        return {}

    wiki = WikiJSClient()
    try:
        await wiki.upsert_page(
            path=page.path,
            title=page.title,
            content=page.content_markdown,
            description=f"Automated sync from {page.metadata.source}",
            tags=page.metadata.categories,
        )
        print(f"Successfully published to Wiki.js: {page.path}")
    except Exception as e:
        print(f"Wiki.js Publication Failed: {e}")
    finally:
        # Release the client whether or not the upsert succeeded.
        await wiki.close()

    return {}


# --- Routing ---

def should_publish(state: WikiState) -> str:
    """Route to Git/Wiki or Halt based on approval.

    Returns:
        ``"publish"`` when ``state.is_approved`` is truthy, else ``"halt"``.
        These keys are mapped to graph targets in ``create_wiki_graph``.
    """
    return "publish" if state.is_approved else "halt"


# --- Graph Construction ---

def create_wiki_graph():
    """Build and compile the wiki automation graph.

    Returns:
        The compiled ``StateGraph`` over ``WikiState``.
    """
    workflow = StateGraph(WikiState)

    # Add Nodes
    workflow.add_node("harvester", harvester_node)
    workflow.add_node("esi_collector", esi_collector_node)
    workflow.add_node("validation", validation_agent_node)
    workflow.add_node("git_sync", git_sync_node)
    workflow.add_node("wikijs_api", wikijs_node)

    # Define Edges
    workflow.set_entry_point("harvester")
    workflow.add_edge("harvester", "esi_collector")
    workflow.add_edge("esi_collector", "validation")

    # Conditional Routing: publish only approved pages, otherwise stop.
    workflow.add_conditional_edges(
        "validation",
        should_publish,
        {
            "publish": "git_sync",
            "halt": END,
        },
    )

    workflow.add_edge("git_sync", "wikijs_api")
    workflow.add_edge("wikijs_api", END)

    return workflow.compile()


if __name__ == "__main__":

    async def run_test():
        """Stream one run of the graph and print each node's reported error."""
        graph = create_wiki_graph()

        print("--- Starting Wiki Automation Test (Condor) ---")
        initial_state = {
            "thread_id": "test-run-1",
            "retry_count": 0,
            "is_approved": False,
        }

        async for event in graph.astream(initial_state):
            for node_name, output in event.items():
                print(f"\n[Node: {node_name}]")
                if isinstance(output, dict) and "error" in output:
                    print(f"Error: {output['error']}")

        print("\n--- Test Run Complete ---")

    asyncio.run(run_test())