"""FastAPI application wiring for the graph visualisation backend.

Creates the SPARQL engine and snapshot service at startup, optionally builds
the rdflib-backed node/edge store, and exposes the ``/api/*`` HTTP routes.

Route handlers are documented with ``#`` comments rather than docstrings,
because FastAPI publishes handler docstrings as OpenAPI descriptions and that
would change the generated schema.
"""
from __future__ import annotations

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware

from .models import EdgesResponse, GraphResponse, NodesResponse, SparqlQueryRequest, StatsResponse
from .pipelines.snapshot_service import GraphSnapshotService
from .rdf_store import RDFStore
from .sparql_engine import RdflibEngine, SparqlEngine, create_sparql_engine
from .settings import Settings

settings = Settings()

# Default snapshot size served by GET /api/graph. GET /api/stats requests the
# same snapshot, so both must use these values to stay in agreement.
_DEFAULT_NODE_LIMIT = 50_000
_DEFAULT_EDGE_LIMIT = 100_000


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    """Start the SPARQL engine, populate ``app.state``, and shut down on exit.

    Sets ``app.state.sparql`` and ``app.state.snapshot_service`` always, and
    ``app.state.store`` only when ``settings.graph_backend == "rdflib"``.

    Raises:
        RuntimeError: in rdflib mode, if the created engine is not an
            ``RdflibEngine`` or its graph is ``None``.
    """
    sparql: SparqlEngine = create_sparql_engine(settings)
    await sparql.startup()
    # Once startup() has succeeded the engine must always be shut down, even
    # if the remaining initialisation below raises.
    try:
        app.state.sparql = sparql
        app.state.snapshot_service = GraphSnapshotService(sparql=sparql, settings=settings)

        # Only build node/edge tables when running in rdflib mode.
        if settings.graph_backend == "rdflib":
            # Explicit check instead of `assert`, which is removed under `python -O`.
            if not isinstance(sparql, RdflibEngine):
                raise RuntimeError(
                    f"GRAPH_BACKEND=rdflib requires an RdflibEngine, got {type(sparql).__name__}"
                )
            if sparql.graph is None:
                raise RuntimeError("rdflib graph failed to load")
            store = RDFStore(
                ttl_path=settings.ttl_path,
                include_bnodes=settings.include_bnodes,
                max_triples=settings.max_triples,
            )
            store.load(sparql.graph)
            app.state.store = store

        yield
    finally:
        await sparql.shutdown()


app = FastAPI(title="visualizador_instanciados backend", lifespan=lifespan)

cors_origins = settings.cors_origin_list()
app.add_middleware(
    CORSMiddleware,
    allow_origins=cors_origins,
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)


def _rdflib_store(route: str) -> RDFStore:
    """Return the rdflib-backed store, or raise HTTP 501 for other backends.

    Args:
        route: request path used in the error detail, e.g. ``"/api/nodes"``.

    Raises:
        HTTPException: status 501 when ``settings.graph_backend`` is not ``"rdflib"``.
    """
    if settings.graph_backend != "rdflib":
        raise HTTPException(
            status_code=501,
            detail=f"GET {route} is only supported in GRAPH_BACKEND=rdflib mode",
        )
    return app.state.store


# Liveness probe: always answers with a constant payload.
@app.get("/api/health")
def health() -> dict[str, str]:
    return {"status": "ok"}


# Summary counts for the default graph snapshot.
@app.get("/api/stats", response_model=StatsResponse)
async def stats() -> StatsResponse:
    # Stats reflect exactly what we send to the frontend (/api/graph), not global graph size.
    svc: GraphSnapshotService = app.state.snapshot_service
    snap = await svc.get(node_limit=_DEFAULT_NODE_LIMIT, edge_limit=_DEFAULT_EDGE_LIMIT)
    meta = snap.meta
    return StatsResponse(
        # Fall back to engine/settings values when the snapshot carries no metadata.
        backend=meta.backend if meta else app.state.sparql.name,
        ttl_path=meta.ttl_path if meta and meta.ttl_path else settings.ttl_path,
        sparql_endpoint=meta.sparql_endpoint if meta else None,
        # Reported as the number of edges in the snapshot, same as `edges`.
        parsed_triples=len(snap.edges),
        nodes=len(snap.nodes),
        edges=len(snap.edges),
    )


# Forwards the request's query string to the engine and returns its JSON result.
@app.post("/api/sparql")
async def sparql_query(req: SparqlQueryRequest) -> dict:
    sparql: SparqlEngine = app.state.sparql
    return await sparql.query_json(req.query)


# Paginated node listing; available only with the rdflib backend (501 otherwise).
@app.get("/api/nodes", response_model=NodesResponse)
def nodes(
    limit: int = Query(default=10_000, ge=1, le=200_000),
    offset: int = Query(default=0, ge=0),
) -> NodesResponse:
    store = _rdflib_store("/api/nodes")
    return NodesResponse(total=store.node_count, nodes=store.node_slice(offset=offset, limit=limit))


# Paginated edge listing; available only with the rdflib backend (501 otherwise).
@app.get("/api/edges", response_model=EdgesResponse)
def edges(
    limit: int = Query(default=50_000, ge=1, le=500_000),
    offset: int = Query(default=0, ge=0),
) -> EdgesResponse:
    store = _rdflib_store("/api/edges")
    return EdgesResponse(total=store.edge_count, edges=store.edge_slice(offset=offset, limit=limit))


# Graph snapshot bounded by the requested node and edge limits.
@app.get("/api/graph", response_model=GraphResponse)
async def graph(
    node_limit: int = Query(default=_DEFAULT_NODE_LIMIT, ge=1, le=200_000),
    edge_limit: int = Query(default=_DEFAULT_EDGE_LIMIT, ge=1, le=500_000),
) -> GraphResponse:
    svc: GraphSnapshotService = app.state.snapshot_service
    return await svc.get(node_limit=node_limit, edge_limit=edge_limit)