Compare commits

..

4 Commits

Author SHA1 Message Date
Oxy8
5badcd8d6f Visualizando todo grafo com anzograph 2026-03-10 17:21:47 -03:00
Oxy8
a0c5bec19f 32bit Node ID 2026-03-06 16:10:52 -03:00
Oxy8
3c487d088b Add filter, add READMES 2026-03-06 15:35:04 -03:00
Oxy8
b44867abfa midpoint - go 2026-03-05 15:39:47 -03:00
63 changed files with 2942 additions and 1722 deletions

44
.env.example Normal file
View File

@@ -0,0 +1,44 @@
# TTL path used as a fallback by the owl-imports combiner when COMBINE_ENTRY_LOCATION is not set.
TTL_PATH=/data/merged_ontologies_pruned_patched.ttl
# Backend behavior
INCLUDE_BNODES=false
# Graph snapshot limits (used by Go backend defaults/validation)
DEFAULT_NODE_LIMIT=800000
DEFAULT_EDGE_LIMIT=2000000
MAX_NODE_LIMIT=10000000
MAX_EDGE_LIMIT=20000000
# SPARQL paging: number of triples per batch (LIMIT/OFFSET)
EDGE_BATCH_SIZE=100000
# Owl imports combiner service (docker-compose `owl_imports_combiner`)
COMBINE_OWL_IMPORTS_ON_START=true
COMBINE_ENTRY_LOCATION=/data/vkg.ttl
COMBINE_OUTPUT_LOCATION=/data/vkg_full.ttl
# COMBINE_OUTPUT_NAME=combined_ontology.ttl # Only used if COMBINE_OUTPUT_LOCATION is not set.
COMBINE_FORCE=true
# AnzoGraph / SPARQL endpoint settings
SPARQL_HOST=http://anzograph:8080
# SPARQL_ENDPOINT=http://anzograph:8080/sparql
SPARQL_USER=admin
SPARQL_PASS=Passw0rd1
# File URI as seen by the AnzoGraph container (used by SPARQL `LOAD`) # Currently not used.
SPARQL_DATA_FILE=file:///opt/shared-files/o3po.ttl # Currently not used.
# SPARQL_GRAPH_IRI=http://example.org/graph
# Startup behavior for AnzoGraph mode
SPARQL_LOAD_ON_START=false
SPARQL_CLEAR_ON_START=false
SPARQL_READY_TIMEOUT_S=10
# Dev UX
CORS_ORIGINS=http://localhost:5173
VITE_BACKEND_URL=http://backend:8000
# Debugging
LOG_SNAPSHOT_TIMINGS=false
FREE_OS_MEMORY_AFTER_SNAPSHOT=false

108
README.md Normal file
View File

@@ -0,0 +1,108 @@
# Visualizador Instanciados
This repo is a Docker Compose stack for visualizing large RDF/OWL graphs stored in **AnzoGraph**. It includes:
- A **Go backend** that queries AnzoGraph via SPARQL and serves a cached graph snapshot + selection queries.
- A **React/Vite frontend** that renders nodes/edges with WebGL2 and supports “selection query” + “graph query” modes.
- A **Python one-shot service** to combine `owl:imports` into a single Turtle file.
- An **AnzoGraph** container (SPARQL endpoint).
## Quick start (Docker Compose)
1) Put your TTL file(s) in `./data/` (this folder is volume-mounted into AnzoGraph as `/opt/shared-files`).
2) Optionally configure `.env` (see `.env.example`).
3) Start the stack:
```bash
docker compose up --build
```
Then open the frontend:
- `http://localhost:5173`
Stop everything:
```bash
docker compose down
```
## Services
Defined in `docker-compose.yml`:
- `anzograph` (image `cambridgesemantics/anzograph:latest`)
- Ports: `8080`, `8443`
- Shared files: `./data → /opt/shared-files`
- `backend` (`./backend_go`)
- Port: `8000` (API under `/api/*`)
- Talks to AnzoGraph at `SPARQL_HOST` / `SPARQL_ENDPOINT`
- `frontend` (`./frontend`)
- Port: `5173`
- Proxies `/api/*` to `VITE_BACKEND_URL`
- `owl_imports_combiner` (`./python_services/owl_imports_combiner`)
- One-shot: optionally produces a combined TTL by following `owl:imports`
Service READMEs:
- `backend_go/README.md`
- `frontend/README.md`
- `python_services/owl_imports_combiner/README.md`
- `anzograph/README.md`
## Repo layout
- `backend_go/` Go API service (SPARQL → snapshot + selection queries)
- `frontend/` React/Vite WebGL renderer
- `python_services/owl_imports_combiner/` Python one-shot OWL imports combiner
- `data/` local shared volume for TTL inputs/outputs (gitignored)
- `docker-compose.yml` service wiring
- `flake.nix` optional Nix dev shell
## Configuration
This repo expects a local `.env` file (not committed). Start from `.env.example`.
Common knobs:
- Backend snapshot size: `DEFAULT_NODE_LIMIT`, `DEFAULT_EDGE_LIMIT`, `MAX_NODE_LIMIT`, `MAX_EDGE_LIMIT`
- SPARQL connectivity: `SPARQL_HOST` or `SPARQL_ENDPOINT`, plus `SPARQL_USER` / `SPARQL_PASS`
- Load data on backend startup: `SPARQL_LOAD_ON_START=true` with `SPARQL_DATA_FILE=file:///opt/shared-files/<file>.ttl`
- Frontend → backend proxy: `VITE_BACKEND_URL`
## API (backend)
Base URL: `http://localhost:8000`
- `GET /api/health` liveness
- `GET /api/stats` snapshot stats (uses default limits)
- `GET /api/graph` graph snapshot
- Query params: `node_limit`, `edge_limit`, `graph_query_id`
- `GET /api/graph_queries` available graph snapshot modes (`graph_query_id` values)
- `GET /api/selection_queries` available selection-highlight modes (`query_id` values)
- `POST /api/selection_query` run a selection query for highlighted neighbors
- Body: `{"query_id":"neighbors","selected_ids":[...],"node_limit":...,"edge_limit":...,"graph_query_id":"default"}`
- `POST /api/sparql` raw SPARQL passthrough (debug/advanced)
- `POST /api/neighbors` legacy alias (same behavior as `query_id="neighbors"`)
## Frontend UI
- Mouse:
- Drag: pan
- Scroll: zoom
- Click: select nodes
- **Top-right buttons:** “selection query” mode (how neighbors/highlights are computed for the current selection)
- **Bottom-right buttons:** “graph query” mode (which SPARQL edge set is used to build the graph snapshot; switching reloads the graph)
## Notes on performance/limits
- The backend caches snapshots in memory; tune `DEFAULT_*_LIMIT` if memory is too high.
- The frontend renders a sampled subset when zoomed out, and only draws edges when fewer than ~20k nodes are visible.
## Nix dev shell (optional)
If you use Nix, `flake.nix` provides a minimal `devShell`:
```bash
nix develop
```

0
Requisitos.md Normal file
View File

38
anzograph/README.md Normal file
View File

@@ -0,0 +1,38 @@
# AnzoGraph (Docker Compose service)
This repo runs AnzoGraph as an external container image:
- Image: `cambridgesemantics/anzograph:latest`
- Ports: `8080` (HTTP), `8443` (HTTPS)
- Volume: `./data → /opt/shared-files`
The backend connects to AnzoGraph via:
- `SPARQL_HOST` (default `http://anzograph:8080`) and the `/sparql` path, or
- an explicit `SPARQL_ENDPOINT`
## Persistence
The `docker-compose.yml` config mounts named volumes into the AnzoGraph container so its state survives
container recreation (e.g. `docker compose up --force-recreate`):
- `anzograph_app_home → /opt/anzograph/app-home` (machine-id / user home)
- `anzograph_persistence → /opt/anzograph/persistence` (database persistence dir)
- `anzograph_config → /opt/anzograph/config` (settings + activation markers)
- `anzograph_internal → /opt/anzograph/internal` (internal state, including EULA acceptance marker)
To fully reset AnzoGraph state, remove volumes with `docker compose down -v`.
## Loading data
The backend can optionally load a TTL file on startup (after AnzoGraph is ready):
- `SPARQL_LOAD_ON_START=true`
- `SPARQL_DATA_FILE=file:///opt/shared-files/<file>.ttl`
Because `./data` is mounted at `/opt/shared-files`, anything placed in `./data` is accessible via a `file:///opt/shared-files/...` URI.
## Notes
- Authentication defaults are configured via the backend env (`SPARQL_USER` / `SPARQL_PASS`).
- The AnzoGraph container in this repo is not customized; consult the upstream image documentation for persistence, licensing, and advanced configuration.

View File

@@ -1,197 +0,0 @@
# Backend App (`backend/app`)
This folder contains the FastAPI backend for `visualizador_instanciados`.
The backend can execute SPARQL queries in two interchangeable ways:
1. **`GRAPH_BACKEND=rdflib`**: parse a Turtle file into an in-memory RDFLib `Graph` and run SPARQL queries locally.
2. **`GRAPH_BACKEND=anzograph`**: run SPARQL queries against an AnzoGraph SPARQL endpoint over HTTP (optionally `LOAD` a TTL on startup).
Callers (frontend or other clients) interact with a single API surface (`/api/*`) and do not need to know which backend is configured.
## Files
- `main.py`
- FastAPI app setup, startup/shutdown (`lifespan`), and HTTP endpoints.
- `settings.py`
- Env-driven configuration (`pydantic-settings`).
- `sparql_engine.py`
- Backend-agnostic SPARQL execution layer:
- `RdflibEngine`: `Graph.query(...)` + SPARQL JSON serialization.
- `AnzoGraphEngine`: HTTP POST to `/sparql` with Basic auth + readiness gate.
- `create_sparql_engine(settings)` chooses the engine based on `GRAPH_BACKEND`.
- `graph_export.py`
- Shared helpers to:
- build the snapshot SPARQL query used for edge retrieval
- map SPARQL JSON bindings to `{nodes, edges}`.
- `models.py`
- Pydantic response/request models:
- `Node`, `Edge`, `GraphResponse`, `StatsResponse`, etc.
- `rdf_store.py`
- A local parsed representation (dense IDs + neighbor-ish data) built only in `GRAPH_BACKEND=rdflib`.
- Used by `/api/nodes`, `/api/edges`, and `rdflib`-mode `/api/stats`.
- `pipelines/graph_snapshot.py`
- Pipeline used by `/api/graph` to return a `{nodes, edges}` snapshot via SPARQL (works for both RDFLib and AnzoGraph).
- `pipelines/layout_dag_radial.py`
- DAG layout helpers used by `pipelines/graph_snapshot.py`:
- cycle detection
- level-synchronous Kahn layering
- radial (ring-per-layer) positioning.
- `pipelines/snapshot_service.py`
- Snapshot cache layer used by `/api/graph` and `/api/stats` so the backend doesn't run expensive SPARQL twice.
- `pipelines/subclass_labels.py`
- Pipeline to extract `rdfs:subClassOf` entities and aligned `rdfs:label` list.
## Runtime Flow
On startup (FastAPI lifespan):
1. `create_sparql_engine(settings)` selects and starts a SPARQL engine.
2. The engine is stored at `app.state.sparql`.
3. If `GRAPH_BACKEND=rdflib`, `RDFStore` is also built from the already-loaded RDFLib graph and stored at `app.state.store`.
On shutdown:
- `app.state.sparql.shutdown()` is called to close the HTTP client (AnzoGraph mode) or no-op (RDFLib mode).
## Environment Variables
Most configuration is intended to be provided via container environment variables (see repo root `.env` and `docker-compose.yml`).
Core:
- `GRAPH_BACKEND`: `rdflib` or `anzograph`
- `INCLUDE_BNODES`: `true`/`false`
- `CORS_ORIGINS`: comma-separated list or `*`
RDFLib mode:
- `TTL_PATH`: path inside the backend container to a `.ttl` file (example: `/data/o3po.ttl`)
- `MAX_TRIPLES`: optional int; if set, stops parsing after this many triples
Optional import-combining step (runs before the SPARQL engine starts):
- `COMBINE_OWL_IMPORTS_ON_START`: `true` to recursively load `TTL_PATH` (or `COMBINE_ENTRY_LOCATION`) plus `owl:imports` and write a combined TTL file.
- `COMBINE_ENTRY_LOCATION`: optional override for the entry file/URL to load (defaults to `TTL_PATH`)
- `COMBINE_OUTPUT_LOCATION`: optional explicit output path (defaults to `${dirname(entry)}/${COMBINE_OUTPUT_NAME}`)
- `COMBINE_OUTPUT_NAME`: output filename when `COMBINE_OUTPUT_LOCATION` is not set (default: `combined_ontology.ttl`)
- `COMBINE_FORCE`: `true` to rebuild even if the output file already exists
AnzoGraph mode:
- `SPARQL_HOST`: base host (example: `http://anzograph:8080`)
- `SPARQL_ENDPOINT`: optional full endpoint; if set, overrides `${SPARQL_HOST}/sparql`
- `SPARQL_USER`, `SPARQL_PASS`: Basic auth credentials
- `SPARQL_DATA_FILE`: file URI as seen by the **AnzoGraph container** (example: `file:///opt/shared-files/o3po.ttl`)
- `SPARQL_GRAPH_IRI`: optional graph IRI for `LOAD ... INTO GRAPH <...>`
- `SPARQL_LOAD_ON_START`: `true` to execute `LOAD <SPARQL_DATA_FILE>` during startup
- `SPARQL_CLEAR_ON_START`: `true` to execute `CLEAR ALL` during startup (dangerous)
- `SPARQL_TIMEOUT_S`: request timeout for normal SPARQL requests
- `SPARQL_READY_RETRIES`, `SPARQL_READY_DELAY_S`, `SPARQL_READY_TIMEOUT_S`: readiness gate parameters
## AnzoGraph Readiness Gate
`AnzoGraphEngine` does not assume "container started" means "SPARQL works".
It waits for a smoke-test POST:
- Method: `POST ${SPARQL_ENDPOINT}`
- Headers:
- `Content-Type: application/x-www-form-urlencoded`
- `Accept: application/sparql-results+json`
- `Authorization: Basic ...` (if configured)
- Body: `query=ASK WHERE { ?s ?p ?o }`
- Success condition: HTTP 2xx and response parses as JSON
This matches the behavior described in `docs/anzograph-readiness-julia.md`.
## API Endpoints
- `GET /api/health`
- Returns `{ "status": "ok" }`.
- `GET /api/stats`
- Returns counts for the same snapshot used by `/api/graph` (via the snapshot cache).
- `POST /api/sparql`
- Body: `{ "query": "<SPARQL SELECT/ASK>" }`
- Returns SPARQL JSON results as-is.
- Notes:
- This endpoint is intended for **SELECT/ASK returning SPARQL-JSON**.
- SPARQL UPDATE is not exposed here (AnzoGraph `LOAD`/`CLEAR` are handled internally during startup).
- `GET /api/graph?node_limit=...&edge_limit=...`
- Returns a graph snapshot as `{ nodes: [...], edges: [...] }`.
- Implemented as a SPARQL edge query + mapping in `pipelines/graph_snapshot.py`.
- `GET /api/nodes`, `GET /api/edges`
- Only available in `GRAPH_BACKEND=rdflib` (these use `RDFStore`'s dense ID tables).
## Data Contract
### Node
Returned in `nodes[]` (dense IDs; suitable for indexing in typed arrays):
```json
{
"id": 0,
"termType": "uri",
"iri": "http://example.org/Thing",
"label": null,
"x": 0.0,
"y": 0.0
}
```
- `id`: integer dense node ID used in edges
- `termType`: `"uri"` or `"bnode"`
- `iri`: URI string; blank nodes are normalized to `_:<id>`
- `label`: `rdfs:label` when available (best-effort; prefers English)
- `x`/`y`: world-space coordinates for rendering (currently a radial layered layout derived from `rdfs:subClassOf`)
### Edge
Returned in `edges[]`:
```json
{
"source": 0,
"target": 12,
"predicate": "http://www.w3.org/2000/01/rdf-schema#subClassOf"
}
```
- `source`/`target`: dense node IDs (indexes into `nodes[]`)
- `predicate`: predicate IRI string
## Snapshot Query (`/api/graph`)
`/api/graph` currently uses a SPARQL query that returns only `rdfs:subClassOf` edges:
- selects bindings as `?s ?p ?o` (with `?p` bound to `rdfs:subClassOf`)
- excludes literal objects (`FILTER(!isLiteral(?o))`) for safety
- optionally excludes blank nodes (unless `INCLUDE_BNODES=true`)
- applies `LIMIT edge_limit`
The result bindings are mapped to dense node IDs (first-seen order) and returned to the caller.
`/api/graph` also returns `meta` with snapshot counts and engine info so the frontend doesn't need to call `/api/stats`.
If a cycle is detected in the returned `rdfs:subClassOf` snapshot, `/api/graph` returns HTTP 422 (layout requires a DAG).
## Pipelines
### `pipelines/graph_snapshot.py`
`fetch_graph_snapshot(...)` is the main "export graph" pipeline used by `/api/graph`.
### `pipelines/subclass_labels.py`
`extract_subclass_entities_and_labels(...)`:
1. Queries all `rdfs:subClassOf` triples.
2. Builds a unique set of subjects+objects, then converts it to a deterministic list.
3. Queries `rdfs:label` for those entities and returns aligned lists:
- `entities[i]` corresponds to `labels[i]`.
## Notes / Tradeoffs
- `/api/graph` returns only nodes that appear in the returned edge result set. Nodes not referenced by those edges will not be present.
- RDFLib and AnzoGraph may differ in supported SPARQL features (vendor extensions, inference, performance), but the API surface is the same.
- `rdf_store.py` is currently only needed for `/api/nodes`, `/api/edges`, and rdflib-mode `/api/stats`. If you don't use those endpoints, it can be removed later.

View File

@@ -1 +0,0 @@

View File

@@ -1,102 +0,0 @@
from __future__ import annotations
from typing import Any
def edge_retrieval_query(*, edge_limit: int, include_bnodes: bool) -> str:
bnode_filter = "" if include_bnodes else "FILTER(!isBlank(?s) && !isBlank(?o))"
return f"""
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX owl: <http://www.w3.org/2002/07/owl#>
SELECT ?s ?p ?o
WHERE {{
{{
VALUES ?p {{ rdf:type }}
?s ?p ?o .
?o rdf:type owl:Class .
}}
UNION
{{
VALUES ?p {{ rdfs:subClassOf }}
?s ?p ?o .
}}
FILTER(!isLiteral(?o))
{bnode_filter}
}}
LIMIT {edge_limit}
"""
def graph_from_sparql_bindings(
bindings: list[dict[str, Any]],
*,
node_limit: int,
include_bnodes: bool,
) -> tuple[list[dict[str, object]], list[dict[str, object]]]:
"""
Convert SPARQL JSON results bindings into:
nodes: [{id, termType, iri, label}]
edges: [{source, target, predicate}]
IDs are assigned densely (0..N-1) based on first occurrence in bindings.
"""
node_id_by_key: dict[tuple[str, str], int] = {}
node_meta: list[tuple[str, str]] = [] # (termType, iri)
out_edges: list[dict[str, object]] = []
def term_to_key_and_iri(term: dict[str, Any]) -> tuple[tuple[str, str], tuple[str, str]] | None:
t = term.get("type")
v = term.get("value")
if not t or v is None:
return None
if t == "literal":
return None
if t == "bnode":
if not include_bnodes:
return None
# SPARQL JSON uses bnode identifiers without the "_:" prefix; we normalize to "_:id".
return (("bnode", str(v)), ("bnode", f"_:{v}"))
# Default to "uri".
return (("uri", str(v)), ("uri", str(v)))
def get_or_add(term: dict[str, Any]) -> int | None:
out = term_to_key_and_iri(term)
if out is None:
return None
key, meta = out
existing = node_id_by_key.get(key)
if existing is not None:
return existing
if len(node_meta) >= node_limit:
return None
nid = len(node_meta)
node_id_by_key[key] = nid
node_meta.append(meta)
return nid
for b in bindings:
s_term = b.get("s") or {}
o_term = b.get("o") or {}
p_term = b.get("p") or {}
sid = get_or_add(s_term)
oid = get_or_add(o_term)
if sid is None or oid is None:
continue
pred = p_term.get("value")
if not pred:
continue
out_edges.append({"source": sid, "target": oid, "predicate": str(pred)})
out_nodes = [
{"id": i, "termType": term_type, "iri": iri, "label": None}
for i, (term_type, iri) in enumerate(node_meta)
]
return out_nodes, out_edges

View File

@@ -1,172 +0,0 @@
from __future__ import annotations
from contextlib import asynccontextmanager
import logging
import asyncio
from fastapi import FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from .models import (
EdgesResponse,
GraphResponse,
NeighborsRequest,
NeighborsResponse,
NodesResponse,
SparqlQueryRequest,
StatsResponse,
)
from .pipelines.layout_dag_radial import CycleError
from .pipelines.owl_imports_combiner import (
build_combined_graph,
output_location_to_path,
resolve_output_location,
serialize_graph_to_ttl,
)
from .pipelines.selection_neighbors import fetch_neighbor_ids_for_selection
from .pipelines.snapshot_service import GraphSnapshotService
from .rdf_store import RDFStore
from .sparql_engine import RdflibEngine, SparqlEngine, create_sparql_engine
from .settings import Settings
settings = Settings()
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
rdflib_preloaded_graph = None
if settings.combine_owl_imports_on_start:
entry_location = settings.combine_entry_location or settings.ttl_path
output_location = resolve_output_location(
entry_location,
output_location=settings.combine_output_location,
output_name=settings.combine_output_name,
)
output_path = output_location_to_path(output_location)
if output_path.exists() and not settings.combine_force:
logger.info("Skipping combine step (output exists): %s", output_location)
else:
rdflib_preloaded_graph = await asyncio.to_thread(build_combined_graph, entry_location)
logger.info("Finished combining imports; serializing to: %s", output_location)
await asyncio.to_thread(serialize_graph_to_ttl, rdflib_preloaded_graph, output_location)
if settings.graph_backend == "rdflib":
settings.ttl_path = str(output_path)
sparql: SparqlEngine = create_sparql_engine(settings, rdflib_graph=rdflib_preloaded_graph)
await sparql.startup()
app.state.sparql = sparql
app.state.snapshot_service = GraphSnapshotService(sparql=sparql, settings=settings)
# Only build node/edge tables when running in rdflib mode.
if settings.graph_backend == "rdflib":
assert isinstance(sparql, RdflibEngine)
if sparql.graph is None:
raise RuntimeError("rdflib graph failed to load")
store = RDFStore(
ttl_path=settings.ttl_path,
include_bnodes=settings.include_bnodes,
max_triples=settings.max_triples,
)
store.load(sparql.graph)
app.state.store = store
yield
await sparql.shutdown()
app = FastAPI(title="visualizador_instanciados backend", lifespan=lifespan)
cors_origins = settings.cors_origin_list()
app.add_middleware(
CORSMiddleware,
allow_origins=cors_origins,
allow_credentials=False,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/api/health")
def health() -> dict[str, str]:
return {"status": "ok"}
@app.get("/api/stats", response_model=StatsResponse)
async def stats() -> StatsResponse:
# Stats reflect exactly what we send to the frontend (/api/graph), not global graph size.
svc: GraphSnapshotService = app.state.snapshot_service
try:
snap = await svc.get(node_limit=50_000, edge_limit=100_000)
except CycleError as e:
raise HTTPException(status_code=422, detail=str(e)) from None
meta = snap.meta
return StatsResponse(
backend=meta.backend if meta else app.state.sparql.name,
ttl_path=meta.ttl_path if meta and meta.ttl_path else settings.ttl_path,
sparql_endpoint=meta.sparql_endpoint if meta else None,
parsed_triples=len(snap.edges),
nodes=len(snap.nodes),
edges=len(snap.edges),
)
@app.post("/api/sparql")
async def sparql_query(req: SparqlQueryRequest) -> dict:
sparql: SparqlEngine = app.state.sparql
data = await sparql.query_json(req.query)
return data
@app.post("/api/neighbors", response_model=NeighborsResponse)
async def neighbors(req: NeighborsRequest) -> NeighborsResponse:
svc: GraphSnapshotService = app.state.snapshot_service
snap = await svc.get(node_limit=req.node_limit, edge_limit=req.edge_limit)
sparql: SparqlEngine = app.state.sparql
neighbor_ids = await fetch_neighbor_ids_for_selection(
sparql,
snapshot=snap,
selected_ids=req.selected_ids,
include_bnodes=settings.include_bnodes,
)
return NeighborsResponse(selected_ids=req.selected_ids, neighbor_ids=neighbor_ids)
@app.get("/api/nodes", response_model=NodesResponse)
def nodes(
limit: int = Query(default=10_000, ge=1, le=200_000),
offset: int = Query(default=0, ge=0),
) -> NodesResponse:
if settings.graph_backend != "rdflib":
raise HTTPException(status_code=501, detail="GET /api/nodes is only supported in GRAPH_BACKEND=rdflib mode")
store: RDFStore = app.state.store
return NodesResponse(total=store.node_count, nodes=store.node_slice(offset=offset, limit=limit))
@app.get("/api/edges", response_model=EdgesResponse)
def edges(
limit: int = Query(default=50_000, ge=1, le=500_000),
offset: int = Query(default=0, ge=0),
) -> EdgesResponse:
if settings.graph_backend != "rdflib":
raise HTTPException(status_code=501, detail="GET /api/edges is only supported in GRAPH_BACKEND=rdflib mode")
store: RDFStore = app.state.store
return EdgesResponse(total=store.edge_count, edges=store.edge_slice(offset=offset, limit=limit))
@app.get("/api/graph", response_model=GraphResponse)
async def graph(
node_limit: int = Query(default=50_000, ge=1, le=200_000),
edge_limit: int = Query(default=100_000, ge=1, le=500_000),
) -> GraphResponse:
svc: GraphSnapshotService = app.state.snapshot_service
try:
return await svc.get(node_limit=node_limit, edge_limit=edge_limit)
except CycleError as e:
raise HTTPException(status_code=422, detail=str(e)) from None

View File

@@ -1,69 +0,0 @@
from __future__ import annotations
from pydantic import BaseModel
class Node(BaseModel):
id: int
termType: str # "uri" | "bnode"
iri: str
label: str | None = None
# Optional because /api/nodes (RDFStore) doesn't currently provide positions.
x: float | None = None
y: float | None = None
class Edge(BaseModel):
source: int
target: int
predicate: str
class StatsResponse(BaseModel):
backend: str
ttl_path: str
sparql_endpoint: str | None = None
parsed_triples: int
nodes: int
edges: int
class NodesResponse(BaseModel):
total: int
nodes: list[Node]
class EdgesResponse(BaseModel):
total: int
edges: list[Edge]
class GraphResponse(BaseModel):
class Meta(BaseModel):
backend: str
ttl_path: str | None = None
sparql_endpoint: str | None = None
include_bnodes: bool
node_limit: int
edge_limit: int
nodes: int
edges: int
nodes: list[Node]
edges: list[Edge]
meta: Meta | None = None
class SparqlQueryRequest(BaseModel):
query: str
class NeighborsRequest(BaseModel):
selected_ids: list[int]
node_limit: int = 50_000
edge_limit: int = 100_000
class NeighborsResponse(BaseModel):
selected_ids: list[int]
neighbor_ids: list[int]

View File

@@ -1 +0,0 @@

View File

@@ -1,148 +0,0 @@
from __future__ import annotations
from typing import Any
from ..graph_export import edge_retrieval_query, graph_from_sparql_bindings
from ..models import GraphResponse
from ..sparql_engine import SparqlEngine
from ..settings import Settings
from .layout_dag_radial import CycleError, level_synchronous_kahn_layers, radial_positions_from_layers
RDFS_LABEL = "http://www.w3.org/2000/01/rdf-schema#label"
def _bindings(res: dict[str, Any]) -> list[dict[str, Any]]:
return (((res.get("results") or {}).get("bindings")) or [])
def _label_score(label_binding: dict[str, Any]) -> int:
# Prefer English, then no-language, then anything else.
lang = (label_binding.get("xml:lang") or "").lower()
if lang == "en":
return 3
if lang == "":
return 2
return 1
async def _fetch_rdfs_labels_for_iris(
sparql: SparqlEngine,
iris: list[str],
*,
batch_size: int = 500,
) -> dict[str, str]:
best: dict[str, tuple[int, str]] = {}
for i in range(0, len(iris), batch_size):
batch = iris[i : i + batch_size]
values = " ".join(f"<{u}>" for u in batch)
q = f"""
SELECT ?s ?label
WHERE {{
VALUES ?s {{ {values} }}
?s <{RDFS_LABEL}> ?label .
}}
"""
res = await sparql.query_json(q)
for b in _bindings(res):
s = (b.get("s") or {}).get("value")
label_term = b.get("label") or {}
if not s or label_term.get("type") != "literal":
continue
label_value = label_term.get("value")
if label_value is None:
continue
score = _label_score(label_term)
prev = best.get(s)
if prev is None or score > prev[0]:
best[s] = (score, str(label_value))
return {iri: lbl for iri, (_, lbl) in best.items()}
async def fetch_graph_snapshot(
sparql: SparqlEngine,
*,
settings: Settings,
node_limit: int,
edge_limit: int,
) -> GraphResponse:
"""
Fetch a graph snapshot (nodes + edges) via SPARQL, independent of whether the
underlying engine is RDFLib or AnzoGraph.
"""
edges_q = edge_retrieval_query(edge_limit=edge_limit, include_bnodes=settings.include_bnodes)
res = await sparql.query_json(edges_q)
bindings = (((res.get("results") or {}).get("bindings")) or [])
nodes, edges = graph_from_sparql_bindings(
bindings,
node_limit=node_limit,
include_bnodes=settings.include_bnodes,
)
# Add positions so the frontend doesn't need to run a layout.
#
# We are exporting only rdfs:subClassOf triples. In the exported edges:
# source = subclass, target = superclass
# For hierarchical layout we invert edges to:
# superclass -> subclass
hier_edges: list[tuple[int, int]] = []
for e in edges:
s = e.get("source")
t = e.get("target")
try:
sid = int(s) # subclass
tid = int(t) # superclass
except Exception:
continue
hier_edges.append((tid, sid))
try:
layers = level_synchronous_kahn_layers(node_count=len(nodes), edges=hier_edges)
except CycleError as e:
# Add a small URI sample to aid debugging.
sample: list[str] = []
for nid in e.remaining_node_ids[:20]:
try:
sample.append(str(nodes[nid].get("iri")))
except Exception:
continue
raise CycleError(
processed=e.processed,
total=e.total,
remaining_node_ids=e.remaining_node_ids,
remaining_iri_sample=sample or None,
) from None
# Deterministic order within each ring/layer for stable layouts.
id_to_iri = [str(n.get("iri", "")) for n in nodes]
for layer in layers:
layer.sort(key=lambda nid: id_to_iri[nid])
xs, ys = radial_positions_from_layers(node_count=len(nodes), layers=layers)
for i, node in enumerate(nodes):
node["x"] = float(xs[i])
node["y"] = float(ys[i])
# Attach labels for URI nodes (blank nodes remain label-less).
uri_nodes = [n for n in nodes if n.get("termType") == "uri"]
if uri_nodes:
iris = [str(n["iri"]) for n in uri_nodes if isinstance(n.get("iri"), str)]
label_by_iri = await _fetch_rdfs_labels_for_iris(sparql, iris)
for n in uri_nodes:
iri = n.get("iri")
if isinstance(iri, str) and iri in label_by_iri:
n["label"] = label_by_iri[iri]
meta = GraphResponse.Meta(
backend=sparql.name,
ttl_path=settings.ttl_path if settings.graph_backend == "rdflib" else None,
sparql_endpoint=settings.effective_sparql_endpoint() if settings.graph_backend == "anzograph" else None,
include_bnodes=settings.include_bnodes,
node_limit=node_limit,
edge_limit=edge_limit,
nodes=len(nodes),
edges=len(edges),
)
return GraphResponse(nodes=nodes, edges=edges, meta=meta)

View File

@@ -1,141 +0,0 @@
from __future__ import annotations
import math
from collections import deque
from typing import Iterable, Sequence
class CycleError(RuntimeError):
"""
Raised when the requested layout requires a DAG, but a cycle is detected.
`remaining_node_ids` are the node ids that still had indegree > 0 after Kahn.
"""
def __init__(
self,
*,
processed: int,
total: int,
remaining_node_ids: list[int],
remaining_iri_sample: list[str] | None = None,
) -> None:
self.processed = int(processed)
self.total = int(total)
self.remaining_node_ids = remaining_node_ids
self.remaining_iri_sample = remaining_iri_sample
msg = f"Cycle detected in subClassOf graph (processed {self.processed}/{self.total} nodes)."
if remaining_iri_sample:
msg += f" Example nodes: {', '.join(remaining_iri_sample)}"
super().__init__(msg)
def level_synchronous_kahn_layers(
*,
node_count: int,
edges: Iterable[tuple[int, int]],
) -> list[list[int]]:
"""
Level-synchronous Kahn's algorithm:
- process the entire current queue as one batch (one layer)
- only then enqueue newly-unlocked nodes for the next batch
`edges` are directed (u -> v).
"""
n = int(node_count)
if n <= 0:
return []
adj: list[list[int]] = [[] for _ in range(n)]
indeg = [0] * n
for u, v in edges:
if u == v:
# Self-loops don't help layout and would trivially violate DAG-ness.
continue
if not (0 <= u < n and 0 <= v < n):
continue
adj[u].append(v)
indeg[v] += 1
q: deque[int] = deque(i for i, d in enumerate(indeg) if d == 0)
layers: list[list[int]] = []
processed = 0
while q:
# Consume the full current queue as a single layer.
layer = list(q)
q.clear()
layers.append(layer)
for u in layer:
processed += 1
for v in adj[u]:
indeg[v] -= 1
if indeg[v] == 0:
q.append(v)
if processed != n:
remaining = [i for i, d in enumerate(indeg) if d > 0]
raise CycleError(processed=processed, total=n, remaining_node_ids=remaining)
return layers
def radial_positions_from_layers(
*,
node_count: int,
layers: Sequence[Sequence[int]],
max_r: float = 5000.0,
) -> tuple[list[float], list[float]]:
"""
Assign node positions in concentric rings (one ring per layer).
- radius increases with layer index
- nodes within a layer are placed evenly by angle
- each ring gets a "golden-angle" rotation to reduce spoke artifacts
"""
n = int(node_count)
if n <= 0:
return ([], [])
xs = [0.0] * n
ys = [0.0] * n
if not layers:
return (xs, ys)
two_pi = 2.0 * math.pi
golden = math.pi * (3.0 - math.sqrt(5.0))
layer_count = len(layers)
denom = float(layer_count + 1)
for li, layer in enumerate(layers):
m = len(layer)
if m <= 0:
continue
# Keep everything within ~[-max_r, max_r] like the previous spiral layout.
r = ((li + 1) / denom) * max_r
# Rotate each layer deterministically to avoid radial spokes aligning.
offset = (li * golden) % two_pi
if m == 1:
nid = int(layer[0])
if 0 <= nid < n:
xs[nid] = r * math.cos(offset)
ys[nid] = r * math.sin(offset)
continue
step = two_pi / float(m)
for j, raw_id in enumerate(layer):
nid = int(raw_id)
if not (0 <= nid < n):
continue
t = offset + step * float(j)
xs[nid] = r * math.cos(t)
ys[nid] = r * math.sin(t)
return (xs, ys)

View File

@@ -1,30 +0,0 @@
from __future__ import annotations
import math
def spiral_positions(n: int, *, max_r: float = 5000.0) -> tuple[list[float], list[float]]:
"""
Deterministic "sunflower" (golden-angle) spiral layout.
This is intentionally simple and stable across runs:
- angle increments by the golden angle to avoid radial spokes
- radius grows with sqrt(i) to keep density roughly uniform over area
"""
if n <= 0:
return ([], [])
xs = [0.0] * n
ys = [0.0] * n
golden = math.pi * (3.0 - math.sqrt(5.0))
denom = float(max(1, n - 1))
for i in range(n):
t = i * golden
r = math.sqrt(i / denom) * max_r
xs[i] = r * math.cos(t)
ys[i] = r * math.sin(t)
return xs, ys

View File

@@ -1,137 +0,0 @@
from __future__ import annotations
from typing import Any, Iterable
from ..models import GraphResponse, Node
from ..sparql_engine import SparqlEngine
def _values_term(node: Node) -> str | None:
iri = node.iri
if node.termType == "uri":
return f"<{iri}>"
if node.termType == "bnode":
if iri.startswith("_:"):
return iri
return f"_:{iri}"
return None
def selection_neighbors_query(*, selected_nodes: Iterable[Node], include_bnodes: bool) -> str:
values_terms: list[str] = []
for n in selected_nodes:
t = _values_term(n)
if t is None:
continue
values_terms.append(t)
if not values_terms:
# Caller should avoid running this query when selection is empty, but keep this safe.
return "SELECT ?nbr WHERE { FILTER(false) }"
bnode_filter = "" if include_bnodes else "FILTER(!isBlank(?nbr))"
values = " ".join(values_terms)
# Neighbors are defined as any node directly connected by rdf:type (to owl:Class)
# or rdfs:subClassOf, in either direction (treating edges as undirected).
return f"""
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX owl: <http://www.w3.org/2002/07/owl#>
SELECT DISTINCT ?nbr
WHERE {{
VALUES ?sel {{ {values} }}
{{
?sel rdf:type ?o .
?o rdf:type owl:Class .
BIND(?o AS ?nbr)
}}
UNION
{{
?s rdf:type ?sel .
?sel rdf:type owl:Class .
BIND(?s AS ?nbr)
}}
UNION
{{
?sel rdfs:subClassOf ?o .
BIND(?o AS ?nbr)
}}
UNION
{{
?s rdfs:subClassOf ?sel .
BIND(?s AS ?nbr)
}}
FILTER(!isLiteral(?nbr))
FILTER(?nbr != ?sel)
{bnode_filter}
}}
"""
def _bindings(res: dict[str, Any]) -> list[dict[str, Any]]:
return (((res.get("results") or {}).get("bindings")) or [])
def _term_key(term: dict[str, Any], *, include_bnodes: bool) -> tuple[str, str] | None:
t = term.get("type")
v = term.get("value")
if not t or v is None:
return None
if t == "literal":
return None
if t == "bnode":
if not include_bnodes:
return None
return ("bnode", f"_:{v}")
return ("uri", str(v))
async def fetch_neighbor_ids_for_selection(
sparql: SparqlEngine,
*,
snapshot: GraphResponse,
selected_ids: list[int],
include_bnodes: bool,
) -> list[int]:
id_to_node: dict[int, Node] = {n.id: n for n in snapshot.nodes}
selected_nodes: list[Node] = []
selected_id_set: set[int] = set()
for nid in selected_ids:
if not isinstance(nid, int):
continue
n = id_to_node.get(nid)
if n is None:
continue
if n.termType == "bnode" and not include_bnodes:
continue
selected_nodes.append(n)
selected_id_set.add(nid)
if not selected_nodes:
return []
key_to_id: dict[tuple[str, str], int] = {}
for n in snapshot.nodes:
key_to_id[(n.termType, n.iri)] = n.id
q = selection_neighbors_query(selected_nodes=selected_nodes, include_bnodes=include_bnodes)
res = await sparql.query_json(q)
neighbor_ids: set[int] = set()
for b in _bindings(res):
nbr_term = b.get("nbr") or {}
key = _term_key(nbr_term, include_bnodes=include_bnodes)
if key is None:
continue
nid = key_to_id.get(key)
if nid is None:
continue
if nid in selected_id_set:
continue
neighbor_ids.add(nid)
# Stable ordering for consistent frontend behavior.
return sorted(neighbor_ids)

View File

@@ -1,63 +0,0 @@
from __future__ import annotations
import asyncio
from dataclasses import dataclass
from ..models import GraphResponse
from ..sparql_engine import SparqlEngine
from ..settings import Settings
from .graph_snapshot import fetch_graph_snapshot
@dataclass(frozen=True)
class SnapshotKey:
node_limit: int
edge_limit: int
include_bnodes: bool
class GraphSnapshotService:
"""
Caches graph snapshots so the backend doesn't re-run expensive SPARQL for stats/graph.
"""
def __init__(self, *, sparql: SparqlEngine, settings: Settings):
self._sparql = sparql
self._settings = settings
self._cache: dict[SnapshotKey, GraphResponse] = {}
self._locks: dict[SnapshotKey, asyncio.Lock] = {}
self._global_lock = asyncio.Lock()
async def get(self, *, node_limit: int, edge_limit: int) -> GraphResponse:
key = SnapshotKey(
node_limit=node_limit,
edge_limit=edge_limit,
include_bnodes=self._settings.include_bnodes,
)
cached = self._cache.get(key)
if cached is not None:
return cached
# Create/get a per-key lock under a global lock to avoid races.
async with self._global_lock:
lock = self._locks.get(key)
if lock is None:
lock = asyncio.Lock()
self._locks[key] = lock
async with lock:
cached2 = self._cache.get(key)
if cached2 is not None:
return cached2
snapshot = await fetch_graph_snapshot(
self._sparql,
settings=self._settings,
node_limit=node_limit,
edge_limit=edge_limit,
)
self._cache[key] = snapshot
return snapshot

View File

@@ -1,153 +0,0 @@
from __future__ import annotations
from typing import Any
from ..sparql_engine import SparqlEngine
RDFS_SUBCLASS_OF = "http://www.w3.org/2000/01/rdf-schema#subClassOf"
RDFS_LABEL = "http://www.w3.org/2000/01/rdf-schema#label"
def _bindings(res: dict[str, Any]) -> list[dict[str, Any]]:
return (((res.get("results") or {}).get("bindings")) or [])
def _term_key(term: dict[str, Any]) -> tuple[str, str] | None:
t = term.get("type")
v = term.get("value")
if not t or v is None:
return None
if t == "literal":
return None
if t == "bnode":
return ("bnode", str(v))
return ("uri", str(v))
def _key_to_entity_string(key: tuple[str, str]) -> str:
t, v = key
if t == "bnode":
return f"_:{v}"
return v
def _label_score(binding: dict[str, Any]) -> int:
"""
Higher is better.
Prefer English, then no-language, then anything else.
"""
lang = (binding.get("xml:lang") or "").lower()
if lang == "en":
return 3
if lang == "":
return 2
return 1
async def extract_subclass_entities_and_labels(
sparql: SparqlEngine,
*,
include_bnodes: bool,
label_batch_size: int = 500,
) -> tuple[list[str], list[str | None]]:
"""
Pipeline:
1) Query all rdfs:subClassOf triples.
2) Build a unique set of entity terms from subjects+objects, convert to list.
3) Fetch rdfs:label for those entities and return an aligned labels list.
Returns:
entities: list[str] (IRI or "_:bnodeId")
labels: list[str|None], aligned with entities
"""
subclass_q = f"""
SELECT ?s ?o
WHERE {{
?s <{RDFS_SUBCLASS_OF}> ?o .
FILTER(!isLiteral(?o))
{"FILTER(!isBlank(?s) && !isBlank(?o))" if not include_bnodes else ""}
}}
"""
res = await sparql.query_json(subclass_q)
entity_keys: set[tuple[str, str]] = set()
for b in _bindings(res):
sk = _term_key(b.get("s") or {})
ok = _term_key(b.get("o") or {})
if sk is not None and (include_bnodes or sk[0] != "bnode"):
entity_keys.add(sk)
if ok is not None and (include_bnodes or ok[0] != "bnode"):
entity_keys.add(ok)
# Deterministic ordering.
entity_key_list = sorted(entity_keys, key=lambda k: (k[0], k[1]))
entities = [_key_to_entity_string(k) for k in entity_key_list]
# Build label map keyed by term key.
best_label_by_key: dict[tuple[str, str], tuple[int, str]] = {}
# URIs can be batch-queried via VALUES.
uri_values = [v for (t, v) in entity_key_list if t == "uri"]
for i in range(0, len(uri_values), label_batch_size):
batch = uri_values[i : i + label_batch_size]
values = " ".join(f"<{u}>" for u in batch)
labels_q = f"""
SELECT ?s ?label
WHERE {{
VALUES ?s {{ {values} }}
?s <{RDFS_LABEL}> ?label .
}}
"""
lres = await sparql.query_json(labels_q)
for b in _bindings(lres):
sk = _term_key(b.get("s") or {})
if sk is None or sk[0] != "uri":
continue
label_term = b.get("label") or {}
if label_term.get("type") != "literal":
continue
label_value = label_term.get("value")
if label_value is None:
continue
score = _label_score(label_term)
prev = best_label_by_key.get(sk)
if prev is None or score > prev[0]:
best_label_by_key[sk] = (score, str(label_value))
# Blank nodes can't reliably be addressed by ID across queries, but if enabled we can still
# fetch all bnode labels and filter locally.
if include_bnodes:
bnode_keys = {k for k in entity_key_list if k[0] == "bnode"}
if bnode_keys:
bnode_labels_q = f"""
SELECT ?s ?label
WHERE {{
?s <{RDFS_LABEL}> ?label .
FILTER(isBlank(?s))
}}
"""
blres = await sparql.query_json(bnode_labels_q)
for b in _bindings(blres):
sk = _term_key(b.get("s") or {})
if sk is None or sk not in bnode_keys:
continue
label_term = b.get("label") or {}
if label_term.get("type") != "literal":
continue
label_value = label_term.get("value")
if label_value is None:
continue
score = _label_score(label_term)
prev = best_label_by_key.get(sk)
if prev is None or score > prev[0]:
best_label_by_key[sk] = (score, str(label_value))
labels: list[str | None] = []
for k in entity_key_list:
item = best_label_by_key.get(k)
labels.append(item[1] if item else None)
return entities, labels

View File

@@ -1,150 +0,0 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from rdflib import BNode, Graph, Literal, URIRef
from rdflib.namespace import RDFS, SKOS
LABEL_PREDICATES = {RDFS.label, SKOS.prefLabel, SKOS.altLabel}
@dataclass(frozen=True)
class EdgeRow:
source: int
target: int
predicate: str
class RDFStore:
def __init__(self, *, ttl_path: str, include_bnodes: bool, max_triples: int | None):
self.ttl_path = ttl_path
self.include_bnodes = include_bnodes
self.max_triples = max_triples
self.graph: Graph | None = None
self._id_by_term: dict[Any, int] = {}
self._term_by_id: list[Any] = []
self._labels_by_id: dict[int, str] = {}
self._edges: list[EdgeRow] = []
self._parsed_triples = 0
def _term_allowed(self, term: Any) -> bool:
if isinstance(term, Literal):
return False
if isinstance(term, BNode) and not self.include_bnodes:
return False
return isinstance(term, (URIRef, BNode))
def _get_id(self, term: Any) -> int | None:
if not self._term_allowed(term):
return None
existing = self._id_by_term.get(term)
if existing is not None:
return existing
nid = len(self._term_by_id)
self._id_by_term[term] = nid
self._term_by_id.append(term)
return nid
def _term_type(self, term: Any) -> str:
if isinstance(term, BNode):
return "bnode"
return "uri"
def _term_iri(self, term: Any) -> str:
if isinstance(term, BNode):
return f"_:{term}"
return str(term)
def load(self, graph: Graph | None = None) -> None:
g = graph or Graph()
if graph is None:
g.parse(self.ttl_path, format="turtle")
self.graph = g
self._id_by_term.clear()
self._term_by_id.clear()
self._labels_by_id.clear()
self._edges.clear()
parsed = 0
for (s, p, o) in g:
parsed += 1
if self.max_triples is not None and parsed > self.max_triples:
break
# Capture labels but do not emit them as edges.
if p in LABEL_PREDICATES and isinstance(o, Literal):
sid = self._get_id(s)
if sid is not None and sid not in self._labels_by_id:
self._labels_by_id[sid] = str(o)
continue
sid = self._get_id(s)
oid = self._get_id(o)
if sid is None or oid is None:
continue
self._edges.append(EdgeRow(source=sid, target=oid, predicate=str(p)))
self._parsed_triples = parsed
@property
def parsed_triples(self) -> int:
return self._parsed_triples
@property
def node_count(self) -> int:
return len(self._term_by_id)
@property
def edge_count(self) -> int:
return len(self._edges)
def node_slice(self, *, offset: int, limit: int) -> list[dict[str, Any]]:
end = min(self.node_count, offset + limit)
out: list[dict[str, Any]] = []
for nid in range(offset, end):
term = self._term_by_id[nid]
out.append(
{
"id": nid,
"termType": self._term_type(term),
"iri": self._term_iri(term),
"label": self._labels_by_id.get(nid),
}
)
return out
def edge_slice(self, *, offset: int, limit: int) -> list[dict[str, Any]]:
end = min(self.edge_count, offset + limit)
out: list[dict[str, Any]] = []
for row in self._edges[offset:end]:
out.append(
{
"source": row.source,
"target": row.target,
"predicate": row.predicate,
}
)
return out
def edges_within_nodes(self, *, max_node_id_exclusive: int, limit: int) -> list[dict[str, Any]]:
out: list[dict[str, Any]] = []
for row in self._edges:
if row.source >= max_node_id_exclusive or row.target >= max_node_id_exclusive:
continue
out.append(
{
"source": row.source,
"target": row.target,
"predicate": row.predicate,
}
)
if len(out) >= limit:
break
return out

View File

@@ -1,58 +0,0 @@
from __future__ import annotations
from typing import Literal
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
# Which graph engine executes SPARQL queries.
# - rdflib: parse TTL locally and query in-memory
# - anzograph: query a remote AnzoGraph SPARQL endpoint (optionally LOAD on startup)
graph_backend: Literal["rdflib", "anzograph"] = Field(default="rdflib", alias="GRAPH_BACKEND")
ttl_path: str = Field(default="/data/o3po.ttl", alias="TTL_PATH")
include_bnodes: bool = Field(default=False, alias="INCLUDE_BNODES")
max_triples: int | None = Field(default=None, alias="MAX_TRIPLES")
# Optional: Combine owl:imports into a single TTL file on backend startup.
combine_owl_imports_on_start: bool = Field(default=False, alias="COMBINE_OWL_IMPORTS_ON_START")
combine_entry_location: str | None = Field(default=None, alias="COMBINE_ENTRY_LOCATION")
combine_output_location: str | None = Field(default=None, alias="COMBINE_OUTPUT_LOCATION")
combine_output_name: str = Field(default="combined_ontology.ttl", alias="COMBINE_OUTPUT_NAME")
combine_force: bool = Field(default=False, alias="COMBINE_FORCE")
# AnzoGraph / SPARQL endpoint configuration
sparql_host: str = Field(default="http://anzograph:8080", alias="SPARQL_HOST")
# If not set, the backend uses `${SPARQL_HOST}/sparql`.
sparql_endpoint: str | None = Field(default=None, alias="SPARQL_ENDPOINT")
sparql_user: str | None = Field(default=None, alias="SPARQL_USER")
sparql_pass: str | None = Field(default=None, alias="SPARQL_PASS")
# File URI as seen by the AnzoGraph container (used with SPARQL `LOAD`).
# Example: file:///opt/shared-files/o3po.ttl
sparql_data_file: str | None = Field(default=None, alias="SPARQL_DATA_FILE")
sparql_graph_iri: str | None = Field(default=None, alias="SPARQL_GRAPH_IRI")
sparql_load_on_start: bool = Field(default=False, alias="SPARQL_LOAD_ON_START")
sparql_clear_on_start: bool = Field(default=False, alias="SPARQL_CLEAR_ON_START")
sparql_timeout_s: float = Field(default=300.0, alias="SPARQL_TIMEOUT_S")
sparql_ready_retries: int = Field(default=30, alias="SPARQL_READY_RETRIES")
sparql_ready_delay_s: float = Field(default=4.0, alias="SPARQL_READY_DELAY_S")
sparql_ready_timeout_s: float = Field(default=10.0, alias="SPARQL_READY_TIMEOUT_S")
# Comma-separated, or "*" (default).
cors_origins: str = Field(default="*", alias="CORS_ORIGINS")
model_config = SettingsConfigDict(env_file=".env", extra="ignore")
def cors_origin_list(self) -> list[str]:
if self.cors_origins.strip() == "*":
return ["*"]
return [o.strip() for o in self.cors_origins.split(",") if o.strip()]
def effective_sparql_endpoint(self) -> str:
if self.sparql_endpoint and self.sparql_endpoint.strip():
return self.sparql_endpoint.strip()
return self.sparql_host.rstrip("/") + "/sparql"

View File

@@ -1,177 +0,0 @@
from __future__ import annotations
import asyncio
import base64
import json
from typing import Any, Protocol
import httpx
from rdflib import Graph
from .settings import Settings
class SparqlEngine(Protocol):
name: str
async def startup(self) -> None: ...
async def shutdown(self) -> None: ...
async def query_json(self, query: str) -> dict[str, Any]: ...
class RdflibEngine:
name = "rdflib"
def __init__(self, *, ttl_path: str, graph: Graph | None = None):
self.ttl_path = ttl_path
self.graph: Graph | None = graph
async def startup(self) -> None:
if self.graph is not None:
return
g = Graph()
g.parse(self.ttl_path, format="turtle")
self.graph = g
async def shutdown(self) -> None:
# Nothing to close for in-memory rdflib graph.
return None
async def query_json(self, query: str) -> dict[str, Any]:
if self.graph is None:
raise RuntimeError("RdflibEngine not started")
result = self.graph.query(query)
payload = result.serialize(format="json")
if isinstance(payload, bytes):
payload = payload.decode("utf-8")
return json.loads(payload)
class AnzoGraphEngine:
name = "anzograph"
def __init__(self, *, settings: Settings):
self.endpoint = settings.effective_sparql_endpoint()
self.timeout_s = settings.sparql_timeout_s
self.ready_retries = settings.sparql_ready_retries
self.ready_delay_s = settings.sparql_ready_delay_s
self.ready_timeout_s = settings.sparql_ready_timeout_s
self.user = settings.sparql_user
self.password = settings.sparql_pass
self.data_file = settings.sparql_data_file
self.graph_iri = settings.sparql_graph_iri
self.load_on_start = settings.sparql_load_on_start
self.clear_on_start = settings.sparql_clear_on_start
self._client: httpx.AsyncClient | None = None
self._auth_header = self._build_auth_header(self.user, self.password)
@staticmethod
def _build_auth_header(user: str | None, password: str | None) -> str | None:
if not user or not password:
return None
token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
return f"Basic {token}"
async def startup(self) -> None:
self._client = httpx.AsyncClient(timeout=self.timeout_s)
await self._wait_ready()
if self.clear_on_start:
await self._update("CLEAR ALL")
await self._wait_ready()
if self.load_on_start:
if not self.data_file:
raise RuntimeError("SPARQL_LOAD_ON_START=true but SPARQL_DATA_FILE is not set")
if self.graph_iri:
await self._update(f"LOAD <{self.data_file}> INTO GRAPH <{self.graph_iri}>")
else:
await self._update(f"LOAD <{self.data_file}>")
# AnzoGraph may still be indexing after LOAD.
await self._wait_ready()
async def shutdown(self) -> None:
if self._client is not None:
await self._client.aclose()
self._client = None
async def query_json(self, query: str) -> dict[str, Any]:
if self._client is None:
raise RuntimeError("AnzoGraphEngine not started")
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/sparql-results+json",
}
if self._auth_header:
headers["Authorization"] = self._auth_header
# AnzoGraph expects x-www-form-urlencoded with `query=...`.
resp = await self._client.post(
self.endpoint,
headers=headers,
data={"query": query},
)
resp.raise_for_status()
return resp.json()
async def _update(self, update: str) -> None:
if self._client is None:
raise RuntimeError("AnzoGraphEngine not started")
headers = {
"Content-Type": "application/sparql-update",
"Accept": "application/json",
}
if self._auth_header:
headers["Authorization"] = self._auth_header
resp = await self._client.post(self.endpoint, headers=headers, content=update)
resp.raise_for_status()
async def _wait_ready(self) -> None:
if self._client is None:
raise RuntimeError("AnzoGraphEngine not started")
# Match the repo's Julia readiness gate: real SPARQL POST + valid JSON parse.
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/sparql-results+json",
}
if self._auth_header:
headers["Authorization"] = self._auth_header
last_err: Exception | None = None
for _ in range(self.ready_retries):
try:
resp = await self._client.post(
self.endpoint,
headers=headers,
data={"query": "ASK WHERE { ?s ?p ?o }"},
timeout=self.ready_timeout_s,
)
resp.raise_for_status()
# Ensure it's JSON, not HTML/text during boot.
resp.json()
return
except Exception as e:
last_err = e
await asyncio.sleep(self.ready_delay_s)
raise RuntimeError(f"AnzoGraph not ready at {self.endpoint}") from last_err
def create_sparql_engine(settings: Settings, *, rdflib_graph: Graph | None = None) -> SparqlEngine:
if settings.graph_backend == "rdflib":
return RdflibEngine(ttl_path=settings.ttl_path, graph=rdflib_graph)
if settings.graph_backend == "anzograph":
return AnzoGraphEngine(settings=settings)
raise RuntimeError(f"Unsupported GRAPH_BACKEND={settings.graph_backend!r}")

View File

@@ -1,5 +0,0 @@
fastapi
uvicorn[standard]
rdflib
pydantic-settings
httpx

24
backend_go/Dockerfile Normal file
View File

@@ -0,0 +1,24 @@
ARG GO_VERSION=1.24
FROM golang:${GO_VERSION}-alpine AS builder
WORKDIR /src
COPY go.mod /src/go.mod
RUN go mod download
COPY . /src
RUN CGO_ENABLED=0 GOOS=linux go build -trimpath -ldflags="-s -w" -o /out/backend ./
FROM alpine:3.20
RUN apk add --no-cache ca-certificates curl
WORKDIR /app
COPY --from=builder /out/backend /app/backend
EXPOSE 8000
CMD ["/app/backend"]

98
backend_go/README.md Normal file
View File

@@ -0,0 +1,98 @@
# Backend (Go) Graph + Selection API
This service exposes a small HTTP API for:
- Building and caching a “graph snapshot” from AnzoGraph via SPARQL (`/api/graph`)
- Returning available “graph query” and “selection query” modes
- Running selection queries for the currently selected node IDs
- (Optionally) issuing raw SPARQL passthrough for debugging
## Run
Via Docker Compose (recommended):
```bash
docker compose up --build backend
```
The backend listens on `:8000` (configurable via `LISTEN_ADDR`).
## Configuration (env)
See `backend_go/config.go` for the full set.
Important variables:
- Snapshot limits:
- `DEFAULT_NODE_LIMIT`, `DEFAULT_EDGE_LIMIT`
- `MAX_NODE_LIMIT`, `MAX_EDGE_LIMIT`
- SPARQL connectivity:
- `SPARQL_HOST` (default `http://anzograph:8080`) or `SPARQL_ENDPOINT`
- `SPARQL_USER`, `SPARQL_PASS`
- Startup behavior:
- `SPARQL_LOAD_ON_START`, `SPARQL_CLEAR_ON_START`
- `SPARQL_DATA_FILE` (typically `file:///opt/shared-files/<file>.ttl`)
- Other:
- `INCLUDE_BNODES` (include blank nodes in snapshots)
- `CORS_ORIGINS`
## Endpoints
- `GET /api/health`
- `GET /api/stats`
- `GET /api/graph?node_limit=&edge_limit=&graph_query_id=`
- `GET /api/graph_queries`
- `GET /api/selection_queries`
- `POST /api/selection_query`
- Body: `{"query_id":"neighbors","selected_ids":[1,2,3],"node_limit":...,"edge_limit":...,"graph_query_id":"default"}`
- `POST /api/sparql` (raw passthrough)
- `POST /api/neighbors` (legacy alias of `query_id="neighbors"`)
## Graph snapshots
Snapshots are built by:
1) Running a SPARQL edge query (controlled by `graph_query_id`)
2) Converting SPARQL bindings into dense integer node IDs + edge list
3) Computing a layout and fetching optional `rdfs:label`
Snapshots are cached in-memory keyed by:
- `node_limit`, `edge_limit`, `INCLUDE_BNODES`, `graph_query_id`
## Query registries
### Graph query modes (`graph_query_id`)
Stored under `backend_go/graph_queries/` and listed by `GET /api/graph_queries`.
Built-in modes:
- `default` `rdf:type` (to `owl:Class`) + `rdfs:subClassOf`
- `hierarchy` `rdfs:subClassOf` only
- `types` `rdf:type` (to `owl:Class`) only
To add a new mode:
1) Add a new file under `backend_go/graph_queries/` that returns a SPARQL query selecting `?s ?p ?o`.
2) Register it in `backend_go/graph_queries/registry.go`.
### Selection query modes (`query_id`)
Stored under `backend_go/selection_queries/` and listed by `GET /api/selection_queries`.
Built-in modes:
- `neighbors` type + subclass neighbors (both directions)
- `superclasses` `?sel rdfs:subClassOf ?nbr`
- `subclasses` `?nbr rdfs:subClassOf ?sel`
To add a new mode:
1) Add a new file under `backend_go/selection_queries/` that returns neighbor node IDs.
2) Register it in `backend_go/selection_queries/registry.go`.
## Performance notes
- Memory usage is dominated by the cached snapshot (`[]Node`, `[]Edge`) and the temporary SPARQL JSON unmarshalling step.
- Tune `DEFAULT_NODE_LIMIT`/`DEFAULT_EDGE_LIMIT` first if memory is too high.

190
backend_go/config.go Normal file
View File

@@ -0,0 +1,190 @@
package main
import (
"fmt"
"os"
"strconv"
"strings"
"time"
)
type Config struct {
IncludeBNodes bool
CorsOrigins string
DefaultNodeLimit int
DefaultEdgeLimit int
MaxNodeLimit int
MaxEdgeLimit int
EdgeBatchSize int
FreeOSMemoryAfterSnapshot bool
LogSnapshotTimings bool
SparqlHost string
SparqlEndpoint string
SparqlUser string
SparqlPass string
SparqlInsecureTLS bool
SparqlDataFile string
SparqlGraphIRI string
SparqlLoadOnStart bool
SparqlClearOnStart bool
SparqlTimeout time.Duration
SparqlReadyRetries int
SparqlReadyDelay time.Duration
SparqlReadyTimeout time.Duration
ListenAddr string
}
func LoadConfig() (Config, error) {
cfg := Config{
IncludeBNodes: envBool("INCLUDE_BNODES", false),
CorsOrigins: envString("CORS_ORIGINS", "*"),
DefaultNodeLimit: envInt("DEFAULT_NODE_LIMIT", 800_000),
DefaultEdgeLimit: envInt("DEFAULT_EDGE_LIMIT", 2_000_000),
MaxNodeLimit: envInt("MAX_NODE_LIMIT", 10_000_000),
MaxEdgeLimit: envInt("MAX_EDGE_LIMIT", 20_000_000),
EdgeBatchSize: envInt("EDGE_BATCH_SIZE", 100_000),
FreeOSMemoryAfterSnapshot: envBool("FREE_OS_MEMORY_AFTER_SNAPSHOT", false),
LogSnapshotTimings: envBool("LOG_SNAPSHOT_TIMINGS", false),
SparqlHost: envString("SPARQL_HOST", "http://anzograph:8080"),
SparqlEndpoint: envString("SPARQL_ENDPOINT", ""),
SparqlUser: envString("SPARQL_USER", ""),
SparqlPass: envString("SPARQL_PASS", ""),
SparqlInsecureTLS: envBool("SPARQL_INSECURE_TLS", false),
SparqlDataFile: envString("SPARQL_DATA_FILE", ""),
SparqlGraphIRI: envString("SPARQL_GRAPH_IRI", ""),
SparqlLoadOnStart: envBool("SPARQL_LOAD_ON_START", false),
SparqlClearOnStart: envBool("SPARQL_CLEAR_ON_START", false),
SparqlReadyRetries: envInt("SPARQL_READY_RETRIES", 30),
ListenAddr: envString("LISTEN_ADDR", ":8000"),
}
var err error
cfg.SparqlTimeout, err = envSeconds("SPARQL_TIMEOUT_S", 300)
if err != nil {
return Config{}, err
}
cfg.SparqlReadyDelay, err = envSeconds("SPARQL_READY_DELAY_S", 4)
if err != nil {
return Config{}, err
}
cfg.SparqlReadyTimeout, err = envSeconds("SPARQL_READY_TIMEOUT_S", 10)
if err != nil {
return Config{}, err
}
if cfg.SparqlLoadOnStart && strings.TrimSpace(cfg.SparqlDataFile) == "" {
return Config{}, fmt.Errorf("SPARQL_LOAD_ON_START=true but SPARQL_DATA_FILE is not set")
}
if cfg.DefaultNodeLimit < 1 {
return Config{}, fmt.Errorf("DEFAULT_NODE_LIMIT must be >= 1")
}
if cfg.DefaultEdgeLimit < 1 {
return Config{}, fmt.Errorf("DEFAULT_EDGE_LIMIT must be >= 1")
}
if cfg.MaxNodeLimit < 1 {
return Config{}, fmt.Errorf("MAX_NODE_LIMIT must be >= 1")
}
if cfg.MaxEdgeLimit < 1 {
return Config{}, fmt.Errorf("MAX_EDGE_LIMIT must be >= 1")
}
if cfg.DefaultNodeLimit > cfg.MaxNodeLimit {
return Config{}, fmt.Errorf("DEFAULT_NODE_LIMIT must be <= MAX_NODE_LIMIT")
}
if cfg.DefaultEdgeLimit > cfg.MaxEdgeLimit {
return Config{}, fmt.Errorf("DEFAULT_EDGE_LIMIT must be <= MAX_EDGE_LIMIT")
}
if cfg.EdgeBatchSize < 1 {
return Config{}, fmt.Errorf("EDGE_BATCH_SIZE must be >= 1")
}
if cfg.EdgeBatchSize > cfg.MaxEdgeLimit {
return Config{}, fmt.Errorf("EDGE_BATCH_SIZE must be <= MAX_EDGE_LIMIT")
}
return cfg, nil
}
func (c Config) EffectiveSparqlEndpoint() string {
if strings.TrimSpace(c.SparqlEndpoint) != "" {
return strings.TrimSpace(c.SparqlEndpoint)
}
return strings.TrimRight(c.SparqlHost, "/") + "/sparql"
}
func (c Config) corsOriginList() []string {
raw := strings.TrimSpace(c.CorsOrigins)
if raw == "" || raw == "*" {
return []string{"*"}
}
parts := strings.Split(raw, ",")
out := make([]string, 0, len(parts))
for _, p := range parts {
p = strings.TrimSpace(p)
if p == "" {
continue
}
out = append(out, p)
}
if len(out) == 0 {
return []string{"*"}
}
return out
}
func envString(name, def string) string {
v := os.Getenv(name)
if strings.TrimSpace(v) == "" {
return def
}
return v
}
func envBool(name string, def bool) bool {
v := strings.TrimSpace(os.Getenv(name))
if v == "" {
return def
}
switch strings.ToLower(v) {
case "1", "true", "yes", "y", "on":
return true
case "0", "false", "no", "n", "off":
return false
default:
return def
}
}
func envInt(name string, def int) int {
v := strings.TrimSpace(os.Getenv(name))
if v == "" {
return def
}
v = strings.ReplaceAll(v, "_", "")
n, err := strconv.Atoi(v)
if err != nil {
return def
}
return n
}
func envSeconds(name string, def float64) (time.Duration, error) {
v := strings.TrimSpace(os.Getenv(name))
if v == "" {
return time.Duration(def * float64(time.Second)), nil
}
f, err := strconv.ParseFloat(v, 64)
if err != nil {
return 0, fmt.Errorf("%s must be a number (seconds): %w", name, err)
}
return time.Duration(f * float64(time.Second)), nil
}

3
backend_go/go.mod Normal file
View File

@@ -0,0 +1,3 @@
module visualizador_instanciados/backend_go
go 1.22

View File

@@ -0,0 +1,96 @@
package main
type termKey struct {
termType string
key string
}
type graphAccumulator struct {
includeBNodes bool
nodeLimit int
nodeIDByKey map[termKey]uint32
nodes []Node
edges []Edge
preds *PredicateDict
}
func newGraphAccumulator(nodeLimit int, includeBNodes bool, edgeCapHint int, preds *PredicateDict) *graphAccumulator {
if preds == nil {
preds = NewPredicateDict(nil)
}
return &graphAccumulator{
includeBNodes: includeBNodes,
nodeLimit: nodeLimit,
nodeIDByKey: make(map[termKey]uint32),
nodes: make([]Node, 0, min(nodeLimit, 4096)),
edges: make([]Edge, 0, min(edgeCapHint, 4096)),
preds: preds,
}
}
func (g *graphAccumulator) getOrAddNode(term sparqlTerm) (uint32, bool) {
if term.Type == "" || term.Value == "" {
return 0, false
}
if term.Type == "literal" {
return 0, false
}
var key termKey
var node Node
if term.Type == "bnode" {
if !g.includeBNodes {
return 0, false
}
key = termKey{termType: "bnode", key: term.Value}
node = Node{ID: 0, TermType: "bnode", IRI: "_:" + term.Value, Label: nil, X: 0, Y: 0}
} else {
key = termKey{termType: "uri", key: term.Value}
node = Node{ID: 0, TermType: "uri", IRI: term.Value, Label: nil, X: 0, Y: 0}
}
if existing, ok := g.nodeIDByKey[key]; ok {
return existing, true
}
if len(g.nodes) >= g.nodeLimit {
return 0, false
}
nid := uint32(len(g.nodes))
g.nodeIDByKey[key] = nid
node.ID = nid
g.nodes = append(g.nodes, node)
return nid, true
}
func (g *graphAccumulator) addBindings(bindings []map[string]sparqlTerm) {
for _, b := range bindings {
sTerm := b["s"]
oTerm := b["o"]
pTerm := b["p"]
sid, okS := g.getOrAddNode(sTerm)
oid, okO := g.getOrAddNode(oTerm)
if !okS || !okO {
continue
}
predID, ok := g.preds.GetOrAdd(pTerm.Value)
if !ok {
continue
}
g.edges = append(g.edges, Edge{
Source: sid,
Target: oid,
PredicateID: predID,
})
}
}
func min(a, b int) int {
if a < b {
return a
}
return b
}

View File

@@ -0,0 +1,65 @@
package graph_queries
import "fmt"
func defaultEdgeQuery(limit int, offset int, includeBNodes bool) string {
bnodeFilter := ""
if !includeBNodes {
bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))"
}
return fmt.Sprintf(`
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX owl: <http://www.w3.org/2002/07/owl#>
SELECT ?s ?p ?o
WHERE {
{
VALUES ?p { rdf:type }
?s ?p ?o .
?o rdf:type owl:Class .
}
UNION
{
VALUES ?p { rdfs:subClassOf }
?s ?p ?o .
}
FILTER(!isLiteral(?o))
%s
}
ORDER BY ?s ?p ?o
LIMIT %d
OFFSET %d
`, bnodeFilter, limit, offset)
}
func defaultPredicateQuery(includeBNodes bool) string {
bnodeFilter := ""
if !includeBNodes {
bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))"
}
return fmt.Sprintf(`
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX owl: <http://www.w3.org/2002/07/owl#>
SELECT DISTINCT ?p
WHERE {
{
VALUES ?p { rdf:type }
?s ?p ?o .
?o rdf:type owl:Class .
}
UNION
{
VALUES ?p { rdfs:subClassOf }
?s ?p ?o .
}
FILTER(!isLiteral(?o))
%s
}
ORDER BY ?p
`, bnodeFilter)
}

View File

@@ -0,0 +1,45 @@
package graph_queries
import "fmt"
func hierarchyEdgeQuery(limit int, offset int, includeBNodes bool) string {
bnodeFilter := ""
if !includeBNodes {
bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))"
}
return fmt.Sprintf(`
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
SELECT ?s ?p ?o
WHERE {
VALUES ?p { rdfs:subClassOf }
?s ?p ?o .
FILTER(!isLiteral(?o))
%s
}
ORDER BY ?s ?p ?o
LIMIT %d
OFFSET %d
`, bnodeFilter, limit, offset)
}
func hierarchyPredicateQuery(includeBNodes bool) string {
bnodeFilter := ""
if !includeBNodes {
bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))"
}
return fmt.Sprintf(`
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
SELECT DISTINCT ?p
WHERE {
VALUES ?p { rdfs:subClassOf }
?s ?p ?o .
FILTER(!isLiteral(?o))
%s
}
ORDER BY ?p
`, bnodeFilter)
}

View File

@@ -0,0 +1,26 @@
package graph_queries
const DefaultID = "default"
var definitions = []Definition{
{Meta: Meta{ID: DefaultID, Label: "Default"}, EdgeQuery: defaultEdgeQuery, PredicateQuery: defaultPredicateQuery},
{Meta: Meta{ID: "hierarchy", Label: "Hierarchy"}, EdgeQuery: hierarchyEdgeQuery, PredicateQuery: hierarchyPredicateQuery},
{Meta: Meta{ID: "types", Label: "Types"}, EdgeQuery: typesOnlyEdgeQuery, PredicateQuery: typesOnlyPredicateQuery},
}
func List() []Meta {
out := make([]Meta, 0, len(definitions))
for _, d := range definitions {
out = append(out, d.Meta)
}
return out
}
func Get(id string) (Definition, bool) {
for _, d := range definitions {
if d.Meta.ID == id {
return d, true
}
}
return Definition{}, false
}

View File

@@ -0,0 +1,12 @@
package graph_queries
type Meta struct {
ID string `json:"id"`
Label string `json:"label"`
}
type Definition struct {
Meta Meta
EdgeQuery func(limit int, offset int, includeBNodes bool) string
PredicateQuery func(includeBNodes bool) string
}

View File

@@ -0,0 +1,49 @@
package graph_queries
import "fmt"
func typesOnlyEdgeQuery(limit int, offset int, includeBNodes bool) string {
bnodeFilter := ""
if !includeBNodes {
bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))"
}
return fmt.Sprintf(`
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX owl: <http://www.w3.org/2002/07/owl#>
SELECT ?s ?p ?o
WHERE {
VALUES ?p { rdf:type }
?s ?p ?o .
?o rdf:type owl:Class .
FILTER(!isLiteral(?o))
%s
}
ORDER BY ?s ?p ?o
LIMIT %d
OFFSET %d
`, bnodeFilter, limit, offset)
}
func typesOnlyPredicateQuery(includeBNodes bool) string {
bnodeFilter := ""
if !includeBNodes {
bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))"
}
return fmt.Sprintf(`
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX owl: <http://www.w3.org/2002/07/owl#>
SELECT DISTINCT ?p
WHERE {
VALUES ?p { rdf:type }
?s ?p ?o .
?o rdf:type owl:Class .
FILTER(!isLiteral(?o))
%s
}
ORDER BY ?p
`, bnodeFilter)
}

View File

@@ -0,0 +1,341 @@
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"runtime"
"runtime/debug"
"sort"
"strings"
"time"
graphqueries "visualizador_instanciados/backend_go/graph_queries"
)
const (
rdfsLabelIRI = "http://www.w3.org/2000/01/rdf-schema#label"
)
func fetchGraphSnapshot(
ctx context.Context,
sparql *AnzoGraphClient,
cfg Config,
nodeLimit int,
edgeLimit int,
graphQueryID string,
) (GraphResponse, error) {
start := time.Now()
logStats := func(stage string) {
if !cfg.LogSnapshotTimings {
return
}
var ms runtime.MemStats
runtime.ReadMemStats(&ms)
log.Printf(
"[snapshot] %s graph_query_id=%s node_limit=%d edge_limit=%d elapsed=%s alloc=%dMB heap_inuse=%dMB sys=%dMB numgc=%d",
stage,
graphQueryID,
nodeLimit,
edgeLimit,
time.Since(start).Truncate(time.Millisecond),
ms.Alloc/1024/1024,
ms.HeapInuse/1024/1024,
ms.Sys/1024/1024,
ms.NumGC,
)
}
def, ok := graphqueries.Get(graphQueryID)
if !ok {
return GraphResponse{}, fmt.Errorf("unknown graph_query_id: %s", graphQueryID)
}
// Build predicate dictionary (predicate IRI -> uint32 ID) before fetching edges.
preds, err := func() (*PredicateDict, error) {
logStats("predicates_query_start")
predQ := def.PredicateQuery(cfg.IncludeBNodes)
t0 := time.Now()
rawPred, err := sparql.Query(ctx, predQ)
if err != nil {
return nil, fmt.Errorf("predicates query failed: %w", err)
}
if cfg.LogSnapshotTimings {
log.Printf("[snapshot] predicates_query_returned bytes=%d query_time=%s", len(rawPred), time.Since(t0).Truncate(time.Millisecond))
}
var predRes sparqlResponse
t1 := time.Now()
if err := json.Unmarshal(rawPred, &predRes); err != nil {
return nil, fmt.Errorf("predicates unmarshal failed: %w", err)
}
if cfg.LogSnapshotTimings {
log.Printf("[snapshot] predicates_unmarshal_done bindings=%d unmarshal_time=%s", len(predRes.Results.Bindings), time.Since(t1).Truncate(time.Millisecond))
}
predicateIRIs := make([]string, 0, len(predRes.Results.Bindings))
for _, b := range predRes.Results.Bindings {
pTerm, ok := b["p"]
if !ok || pTerm.Type != "uri" || pTerm.Value == "" {
continue
}
predicateIRIs = append(predicateIRIs, pTerm.Value)
}
logStats("predicates_dict_built")
return NewPredicateDict(predicateIRIs), nil
}()
if err != nil {
return GraphResponse{}, err
}
// Fetch edges in batches to avoid decoding a single huge SPARQL JSON response.
logStats("edges_batched_start")
batchSize := cfg.EdgeBatchSize
acc := newGraphAccumulator(nodeLimit, cfg.IncludeBNodes, min(edgeLimit, batchSize), preds)
totalBindings := 0
convAllT0 := time.Now()
for batch, offset := 0, 0; offset < edgeLimit; batch, offset = batch+1, offset+batchSize {
limit := batchSize
remaining := edgeLimit - offset
if remaining < limit {
limit = remaining
}
logStats(fmt.Sprintf("edges_batch_start batch=%d offset=%d limit=%d", batch, offset, limit))
bindings, err := func() ([]map[string]sparqlTerm, error) {
edgesQ := def.EdgeQuery(limit, offset, cfg.IncludeBNodes)
t0 := time.Now()
raw, err := sparql.Query(ctx, edgesQ)
if err != nil {
return nil, fmt.Errorf("edges query failed: %w", err)
}
if cfg.LogSnapshotTimings {
log.Printf("[snapshot] edges_batch_query_returned batch=%d offset=%d limit=%d bytes=%d query_time=%s", batch, offset, limit, len(raw), time.Since(t0).Truncate(time.Millisecond))
}
var res sparqlResponse
t1 := time.Now()
if err := json.Unmarshal(raw, &res); err != nil {
return nil, fmt.Errorf("edges unmarshal failed: %w", err)
}
if cfg.LogSnapshotTimings {
log.Printf("[snapshot] edges_batch_unmarshal_done batch=%d bindings=%d unmarshal_time=%s", batch, len(res.Results.Bindings), time.Since(t1).Truncate(time.Millisecond))
}
return res.Results.Bindings, nil
}()
if err != nil {
return GraphResponse{}, fmt.Errorf("edges batch=%d offset=%d limit=%d: %w", batch, offset, limit, err)
}
got := len(bindings)
totalBindings += got
if got == 0 {
bindings = nil
logStats(fmt.Sprintf("edges_batch_done_empty batch=%d offset=%d", batch, offset))
break
}
convT0 := time.Now()
acc.addBindings(bindings)
if cfg.LogSnapshotTimings {
log.Printf(
"[snapshot] edges_batch_convert_done batch=%d got_bindings=%d total_bindings=%d nodes=%d edges=%d convert_time=%s",
batch,
got,
totalBindings,
len(acc.nodes),
len(acc.edges),
time.Since(convT0).Truncate(time.Millisecond),
)
}
// Make the batch eligible for GC.
bindings = nil
logStats(fmt.Sprintf("edges_batch_done batch=%d offset=%d", batch, offset))
if cfg.FreeOSMemoryAfterSnapshot {
debug.FreeOSMemory()
logStats(fmt.Sprintf("edges_batch_free_os_memory_done batch=%d offset=%d", batch, offset))
}
if got < limit {
break
}
}
if cfg.LogSnapshotTimings {
log.Printf("[snapshot] convert_batches_done total_bindings=%d total_time=%s", totalBindings, time.Since(convAllT0).Truncate(time.Millisecond))
}
logStats("edges_batched_done")
nodes := acc.nodes
edges := acc.edges
// Layout: invert edges for hierarchy (target -> source).
hierEdges := make([][2]int, 0, len(edges))
for _, e := range edges {
hierEdges = append(hierEdges, [2]int{int(e.Target), int(e.Source)})
}
layers, cycleErr := levelSynchronousKahnLayers(len(nodes), hierEdges)
if cycleErr != nil {
sample := make([]string, 0, 20)
for _, nid := range cycleErr.RemainingNodeIDs {
if len(sample) >= 20 {
break
}
if nid >= 0 && nid < len(nodes) {
sample = append(sample, nodes[nid].IRI)
}
}
cycleErr.RemainingIRISample = sample
return GraphResponse{}, cycleErr
}
idToIRI := make([]string, len(nodes))
for i := range nodes {
idToIRI[i] = nodes[i].IRI
}
for _, layer := range layers {
sortLayerByIRI(layer, idToIRI)
}
xs, ys := radialPositionsFromLayers(len(nodes), layers, 5000.0)
for i := range nodes {
nodes[i].X = xs[i]
nodes[i].Y = ys[i]
}
// Attach labels for URI nodes.
iris := make([]string, 0)
for _, n := range nodes {
if n.TermType == "uri" && n.IRI != "" {
iris = append(iris, n.IRI)
}
}
if len(iris) > 0 {
labelByIRI, err := fetchRDFSLabels(ctx, sparql, iris, 500)
if err != nil {
return GraphResponse{}, fmt.Errorf("fetch rdfs:label failed: %w", err)
}
for i := range nodes {
if nodes[i].TermType != "uri" {
continue
}
lbl, ok := labelByIRI[nodes[i].IRI]
if !ok {
continue
}
val := lbl
nodes[i].Label = &val
}
}
meta := &GraphMeta{
Backend: "anzograph",
TTLPath: nil,
SparqlEndpoint: cfg.EffectiveSparqlEndpoint(),
IncludeBNodes: cfg.IncludeBNodes,
GraphQueryID: graphQueryID,
Predicates: preds.IRIs(),
NodeLimit: nodeLimit,
EdgeLimit: edgeLimit,
Nodes: len(nodes),
Edges: len(edges),
}
return GraphResponse{Nodes: nodes, Edges: edges, Meta: meta}, nil
}
type bestLabel struct {
score int
value string
}
func fetchRDFSLabels(
ctx context.Context,
sparql *AnzoGraphClient,
iris []string,
batchSize int,
) (map[string]string, error) {
best := make(map[string]bestLabel)
for i := 0; i < len(iris); i += batchSize {
end := i + batchSize
if end > len(iris) {
end = len(iris)
}
batch := iris[i:end]
values := make([]string, 0, len(batch))
for _, u := range batch {
values = append(values, "<"+u+">")
}
q := fmt.Sprintf(`
SELECT ?s ?label
WHERE {
VALUES ?s { %s }
?s <%s> ?label .
}
`, strings.Join(values, " "), rdfsLabelIRI)
raw, err := sparql.Query(ctx, q)
if err != nil {
return nil, err
}
var res sparqlResponse
if err := json.Unmarshal(raw, &res); err != nil {
return nil, fmt.Errorf("failed to parse SPARQL JSON: %w", err)
}
for _, b := range res.Results.Bindings {
sTerm, ok := b["s"]
if !ok || sTerm.Value == "" {
continue
}
lblTerm, ok := b["label"]
if !ok || lblTerm.Type != "literal" || lblTerm.Value == "" {
continue
}
score := labelScore(lblTerm.Lang)
prev, ok := best[sTerm.Value]
if !ok || score > prev.score {
best[sTerm.Value] = bestLabel{score: score, value: lblTerm.Value}
}
}
}
out := make(map[string]string, len(best))
for iri, v := range best {
out[iri] = v.value
}
return out, nil
}
func labelScore(lang string) int {
lang = strings.ToLower(strings.TrimSpace(lang))
if lang == "en" {
return 3
}
if lang == "" {
return 2
}
return 1
}
func sortIntsUnique(xs []int) []int {
if len(xs) == 0 {
return xs
}
sort.Ints(xs)
out := xs[:0]
var last int
for i, v := range xs {
if i == 0 || v != last {
out = append(out, v)
}
last = v
}
return out
}

View File

@@ -0,0 +1,23 @@
package main
import (
"encoding/json"
"io"
"net/http"
)
func writeJSON(w http.ResponseWriter, status int, v any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
enc := json.NewEncoder(w)
_ = enc.Encode(v)
}
func writeError(w http.ResponseWriter, status int, msg string) {
writeJSON(w, status, ErrorResponse{Detail: msg})
}
func decodeJSON(r io.Reader, dst any) error {
dec := json.NewDecoder(r)
return dec.Decode(dst)
}

148
backend_go/layout.go Normal file
View File

@@ -0,0 +1,148 @@
package main
import (
"fmt"
"math"
"sort"
)
type CycleError struct {
Processed int
Total int
RemainingNodeIDs []int
RemainingIRISample []string
}
func (e *CycleError) Error() string {
msg := fmt.Sprintf("Cycle detected in subClassOf graph (processed %d/%d nodes).", e.Processed, e.Total)
if len(e.RemainingIRISample) > 0 {
msg += " Example nodes: " + stringsJoin(e.RemainingIRISample, ", ")
}
return msg
}
func levelSynchronousKahnLayers(nodeCount int, edges [][2]int) ([][]int, *CycleError) {
n := nodeCount
if n <= 0 {
return [][]int{}, nil
}
adj := make([][]int, n)
indeg := make([]int, n)
for _, e := range edges {
u, v := e[0], e[1]
if u == v {
continue
}
if u < 0 || u >= n || v < 0 || v >= n {
continue
}
adj[u] = append(adj[u], v)
indeg[v]++
}
q := make([]int, 0, n)
for i, d := range indeg {
if d == 0 {
q = append(q, i)
}
}
layers := make([][]int, 0)
processed := 0
for len(q) > 0 {
layer := append([]int(nil), q...)
q = q[:0]
layers = append(layers, layer)
for _, u := range layer {
processed++
for _, v := range adj[u] {
indeg[v]--
if indeg[v] == 0 {
q = append(q, v)
}
}
}
}
if processed != n {
remaining := make([]int, 0)
for i, d := range indeg {
if d > 0 {
remaining = append(remaining, i)
}
}
return nil, &CycleError{Processed: processed, Total: n, RemainingNodeIDs: remaining}
}
return layers, nil
}
func radialPositionsFromLayers(nodeCount int, layers [][]int, maxR float64) (xs []float64, ys []float64) {
n := nodeCount
if n <= 0 {
return []float64{}, []float64{}
}
xs = make([]float64, n)
ys = make([]float64, n)
if len(layers) == 0 {
return xs, ys
}
twoPi := 2.0 * math.Pi
golden := math.Pi * (3.0 - math.Sqrt(5.0))
layerCount := float64(len(layers))
denom := layerCount + 1.0
for li, layer := range layers {
m := len(layer)
if m == 0 {
continue
}
r := (float64(li+1) / denom) * maxR
offset := math.Mod(float64(li)*golden, twoPi)
if m == 1 {
nid := layer[0]
if nid >= 0 && nid < n {
xs[nid] = r * math.Cos(offset)
ys[nid] = r * math.Sin(offset)
}
continue
}
step := twoPi / float64(m)
for j, nid := range layer {
if nid < 0 || nid >= n {
continue
}
t := offset + step*float64(j)
xs[nid] = r * math.Cos(t)
ys[nid] = r * math.Sin(t)
}
}
return xs, ys
}
func sortLayerByIRI(layer []int, idToIRI []string) {
sort.Slice(layer, func(i, j int) bool {
return idToIRI[layer[i]] < idToIRI[layer[j]]
})
}
func stringsJoin(parts []string, sep string) string {
if len(parts) == 0 {
return ""
}
out := parts[0]
for i := 1; i < len(parts); i++ {
out += sep
out += parts[i]
}
return out
}

35
backend_go/main.go Normal file
View File

@@ -0,0 +1,35 @@
package main
import (
"context"
"log"
"net/http"
"time"
)
func main() {
cfg, err := LoadConfig()
if err != nil {
log.Fatal(err)
}
sparql := NewAnzoGraphClient(cfg)
if err := sparql.Startup(context.Background()); err != nil {
log.Fatal(err)
}
api := &APIServer{
cfg: cfg,
sparql: sparql,
snapshots: NewGraphSnapshotService(sparql, cfg),
}
srv := &http.Server{
Addr: cfg.ListenAddr,
Handler: api.handler(),
ReadHeaderTimeout: 5 * time.Second,
}
log.Printf("backend listening on %s", cfg.ListenAddr)
log.Fatal(srv.ListenAndServe())
}

82
backend_go/models.go Normal file
View File

@@ -0,0 +1,82 @@
package main
type ErrorResponse struct {
Detail string `json:"detail"`
}
type HealthResponse struct {
Status string `json:"status"`
}
type Node struct {
ID uint32 `json:"id"`
TermType string `json:"termType"` // "uri" | "bnode"
IRI string `json:"iri"`
Label *string `json:"label"`
X float64 `json:"x"`
Y float64 `json:"y"`
}
type Edge struct {
Source uint32 `json:"source"`
Target uint32 `json:"target"`
PredicateID uint32 `json:"predicate_id"`
}
type GraphMeta struct {
Backend string `json:"backend"`
TTLPath *string `json:"ttl_path"`
SparqlEndpoint string `json:"sparql_endpoint"`
IncludeBNodes bool `json:"include_bnodes"`
GraphQueryID string `json:"graph_query_id"`
Predicates []string `json:"predicates,omitempty"` // index = predicate_id
NodeLimit int `json:"node_limit"`
EdgeLimit int `json:"edge_limit"`
Nodes int `json:"nodes"`
Edges int `json:"edges"`
}
type GraphResponse struct {
Nodes []Node `json:"nodes"`
Edges []Edge `json:"edges"`
Meta *GraphMeta `json:"meta"`
}
type StatsResponse struct {
Backend string `json:"backend"`
TTLPath *string `json:"ttl_path"`
SparqlEndpoint *string `json:"sparql_endpoint"`
ParsedTriples int `json:"parsed_triples"`
Nodes int `json:"nodes"`
Edges int `json:"edges"`
}
type SparqlQueryRequest struct {
Query string `json:"query"`
}
type NeighborsRequest struct {
SelectedIDs []uint32 `json:"selected_ids"`
NodeLimit *int `json:"node_limit,omitempty"`
EdgeLimit *int `json:"edge_limit,omitempty"`
GraphQueryID *string `json:"graph_query_id,omitempty"`
}
type NeighborsResponse struct {
SelectedIDs []uint32 `json:"selected_ids"`
NeighborIDs []uint32 `json:"neighbor_ids"`
}
type SelectionQueryRequest struct {
QueryID string `json:"query_id"`
SelectedIDs []uint32 `json:"selected_ids"`
NodeLimit *int `json:"node_limit,omitempty"`
EdgeLimit *int `json:"edge_limit,omitempty"`
GraphQueryID *string `json:"graph_query_id,omitempty"`
}
type SelectionQueryResponse struct {
QueryID string `json:"query_id"`
SelectedIDs []uint32 `json:"selected_ids"`
NeighborIDs []uint32 `json:"neighbor_ids"`
}

View File

@@ -0,0 +1,40 @@
package main
type PredicateDict struct {
idByIRI map[string]uint32
iriByID []string
}
func NewPredicateDict(predicates []string) *PredicateDict {
idByIRI := make(map[string]uint32, len(predicates))
iriByID := make([]string, 0, len(predicates))
for _, iri := range predicates {
if iri == "" {
continue
}
if _, ok := idByIRI[iri]; ok {
continue
}
id := uint32(len(iriByID))
idByIRI[iri] = id
iriByID = append(iriByID, iri)
}
return &PredicateDict{idByIRI: idByIRI, iriByID: iriByID}
}
func (d *PredicateDict) GetOrAdd(iri string) (uint32, bool) {
if iri == "" {
return 0, false
}
if id, ok := d.idByIRI[iri]; ok {
return id, true
}
id := uint32(len(d.iriByID))
d.idByIRI[iri] = id
d.iriByID = append(d.iriByID, iri)
return id, true
}
func (d *PredicateDict) IRIs() []string {
return d.iriByID
}

View File

@@ -0,0 +1,101 @@
package selection_queries
import (
"encoding/json"
"fmt"
"sort"
"strings"
)
func nodeKey(termType, iri string) string {
return termType + "\x00" + iri
}
func valuesTerm(n NodeRef) string {
if n.TermType == "uri" {
if n.IRI == "" {
return ""
}
return "<" + n.IRI + ">"
}
if n.TermType == "bnode" {
if n.IRI == "" {
return ""
}
if strings.HasPrefix(n.IRI, "_:") {
return n.IRI
}
return "_:" + n.IRI
}
return ""
}
func termKeyFromSparqlTerm(term sparqlTerm, includeBNodes bool) (string, bool) {
if term.Type == "" || term.Value == "" {
return "", false
}
if term.Type == "literal" {
return "", false
}
if term.Type == "bnode" {
if !includeBNodes {
return "", false
}
return nodeKey("bnode", "_:"+term.Value), true
}
if term.Type == "uri" {
return nodeKey("uri", term.Value), true
}
return "", false
}
func selectedNodesFromIDs(idx Index, selectedIDs []uint32, includeBNodes bool) ([]NodeRef, map[uint32]struct{}) {
out := make([]NodeRef, 0, len(selectedIDs))
set := make(map[uint32]struct{}, len(selectedIDs))
for _, nid := range selectedIDs {
n, ok := idx.IDToNode[nid]
if !ok {
continue
}
if n.TermType == "bnode" && !includeBNodes {
continue
}
out = append(out, n)
set[nid] = struct{}{}
}
return out, set
}
func idsFromBindings(raw []byte, varName string, idx Index, selectedSet map[uint32]struct{}, includeBNodes bool) ([]uint32, error) {
var res sparqlResponse
if err := json.Unmarshal(raw, &res); err != nil {
return nil, fmt.Errorf("failed to parse SPARQL JSON: %w", err)
}
neighborSet := make(map[uint32]struct{})
for _, b := range res.Results.Bindings {
term, ok := b[varName]
if !ok {
continue
}
key, ok := termKeyFromSparqlTerm(term, includeBNodes)
if !ok {
continue
}
nid, ok := idx.KeyToID[key]
if !ok {
continue
}
if _, sel := selectedSet[nid]; sel {
continue
}
neighborSet[nid] = struct{}{}
}
ids := make([]uint32, 0, len(neighborSet))
for nid := range neighborSet {
ids = append(ids, nid)
}
sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
return ids, nil
}

View File

@@ -0,0 +1,76 @@
package selection_queries
import (
"context"
"fmt"
"strings"
)
func neighborsQuery(selectedNodes []NodeRef, includeBNodes bool) string {
valuesTerms := make([]string, 0, len(selectedNodes))
for _, n := range selectedNodes {
t := valuesTerm(n)
if t == "" {
continue
}
valuesTerms = append(valuesTerms, t)
}
if len(valuesTerms) == 0 {
return "SELECT ?nbr WHERE { FILTER(false) }"
}
bnodeFilter := ""
if !includeBNodes {
bnodeFilter = "FILTER(!isBlank(?nbr))"
}
values := strings.Join(valuesTerms, " ")
return fmt.Sprintf(`
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX owl: <http://www.w3.org/2002/07/owl#>
SELECT DISTINCT ?nbr
WHERE {
VALUES ?sel { %s }
{
?sel rdf:type ?o .
?o rdf:type owl:Class .
BIND(?o AS ?nbr)
}
UNION
{
?s rdf:type ?sel .
?sel rdf:type owl:Class .
BIND(?s AS ?nbr)
}
UNION
{
?sel rdfs:subClassOf ?o .
BIND(?o AS ?nbr)
}
UNION
{
?s rdfs:subClassOf ?sel .
BIND(?s AS ?nbr)
}
FILTER(!isLiteral(?nbr))
FILTER(?nbr != ?sel)
%s
}
`, values, bnodeFilter)
}
func runNeighbors(ctx context.Context, q Querier, idx Index, selectedIDs []uint32, includeBNodes bool) ([]uint32, error) {
selectedNodes, selectedSet := selectedNodesFromIDs(idx, selectedIDs, includeBNodes)
if len(selectedNodes) == 0 {
return []uint32{}, nil
}
raw, err := q.Query(ctx, neighborsQuery(selectedNodes, includeBNodes))
if err != nil {
return nil, err
}
return idsFromBindings(raw, "nbr", idx, selectedSet, includeBNodes)
}

View File

@@ -0,0 +1,33 @@
package selection_queries
var definitions = []Definition{
{
Meta: Meta{ID: "neighbors", Label: "Neighbors"},
Run: runNeighbors,
},
{
Meta: Meta{ID: "superclasses", Label: "Superclasses"},
Run: runSuperclasses,
},
{
Meta: Meta{ID: "subclasses", Label: "Subclasses"},
Run: runSubclasses,
},
}
func List() []Meta {
out := make([]Meta, 0, len(definitions))
for _, d := range definitions {
out = append(out, d.Meta)
}
return out
}
func Get(id string) (Definition, bool) {
for _, d := range definitions {
if d.Meta.ID == id {
return d, true
}
}
return Definition{}, false
}

View File

@@ -0,0 +1,14 @@
package selection_queries
type sparqlTerm struct {
Type string `json:"type"`
Value string `json:"value"`
Lang string `json:"xml:lang,omitempty"`
}
type sparqlResponse struct {
Results struct {
Bindings []map[string]sparqlTerm `json:"bindings"`
} `json:"results"`
}

View File

@@ -0,0 +1,54 @@
package selection_queries
import (
"context"
"fmt"
"strings"
)
func subclassesQuery(selectedNodes []NodeRef, includeBNodes bool) string {
valuesTerms := make([]string, 0, len(selectedNodes))
for _, n := range selectedNodes {
t := valuesTerm(n)
if t == "" {
continue
}
valuesTerms = append(valuesTerms, t)
}
if len(valuesTerms) == 0 {
return "SELECT ?nbr WHERE { FILTER(false) }"
}
bnodeFilter := ""
if !includeBNodes {
bnodeFilter = "FILTER(!isBlank(?nbr))"
}
values := strings.Join(valuesTerms, " ")
return fmt.Sprintf(`
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
SELECT DISTINCT ?nbr
WHERE {
VALUES ?sel { %s }
?nbr rdfs:subClassOf ?sel .
FILTER(!isLiteral(?nbr))
FILTER(?nbr != ?sel)
%s
}
`, values, bnodeFilter)
}
func runSubclasses(ctx context.Context, q Querier, idx Index, selectedIDs []uint32, includeBNodes bool) ([]uint32, error) {
selectedNodes, selectedSet := selectedNodesFromIDs(idx, selectedIDs, includeBNodes)
if len(selectedNodes) == 0 {
return []uint32{}, nil
}
raw, err := q.Query(ctx, subclassesQuery(selectedNodes, includeBNodes))
if err != nil {
return nil, err
}
return idsFromBindings(raw, "nbr", idx, selectedSet, includeBNodes)
}

View File

@@ -0,0 +1,54 @@
package selection_queries
import (
"context"
"fmt"
"strings"
)
func superclassesQuery(selectedNodes []NodeRef, includeBNodes bool) string {
valuesTerms := make([]string, 0, len(selectedNodes))
for _, n := range selectedNodes {
t := valuesTerm(n)
if t == "" {
continue
}
valuesTerms = append(valuesTerms, t)
}
if len(valuesTerms) == 0 {
return "SELECT ?nbr WHERE { FILTER(false) }"
}
bnodeFilter := ""
if !includeBNodes {
bnodeFilter = "FILTER(!isBlank(?nbr))"
}
values := strings.Join(valuesTerms, " ")
return fmt.Sprintf(`
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
SELECT DISTINCT ?nbr
WHERE {
VALUES ?sel { %s }
?sel rdfs:subClassOf ?nbr .
FILTER(!isLiteral(?nbr))
FILTER(?nbr != ?sel)
%s
}
`, values, bnodeFilter)
}
func runSuperclasses(ctx context.Context, q Querier, idx Index, selectedIDs []uint32, includeBNodes bool) ([]uint32, error) {
selectedNodes, selectedSet := selectedNodesFromIDs(idx, selectedIDs, includeBNodes)
if len(selectedNodes) == 0 {
return []uint32{}, nil
}
raw, err := q.Query(ctx, superclassesQuery(selectedNodes, includeBNodes))
if err != nil {
return nil, err
}
return idsFromBindings(raw, "nbr", idx, selectedSet, includeBNodes)
}

View File

@@ -0,0 +1,28 @@
package selection_queries
import "context"
type Querier interface {
Query(ctx context.Context, query string) ([]byte, error)
}
type NodeRef struct {
ID uint32
TermType string // "uri" | "bnode"
IRI string
}
type Index struct {
IDToNode map[uint32]NodeRef
KeyToID map[string]uint32
}
type Meta struct {
ID string `json:"id"`
Label string `json:"label"`
}
type Definition struct {
Meta Meta
Run func(ctx context.Context, q Querier, idx Index, selectedIDs []uint32, includeBNodes bool) ([]uint32, error)
}

View File

@@ -0,0 +1,32 @@
package main
import (
"context"
"fmt"
selectionqueries "visualizador_instanciados/backend_go/selection_queries"
)
func runSelectionQuery(
ctx context.Context,
sparql *AnzoGraphClient,
snapshot GraphResponse,
queryID string,
selectedIDs []uint32,
includeBNodes bool,
) ([]uint32, error) {
def, ok := selectionqueries.Get(queryID)
if !ok {
return nil, fmt.Errorf("unknown query_id: %s", queryID)
}
idToNode := make(map[uint32]selectionqueries.NodeRef, len(snapshot.Nodes))
keyToID := make(map[string]uint32, len(snapshot.Nodes))
for _, n := range snapshot.Nodes {
nr := selectionqueries.NodeRef{ID: n.ID, TermType: n.TermType, IRI: n.IRI}
idToNode[n.ID] = nr
keyToID[n.TermType+"\x00"+n.IRI] = n.ID
}
return def.Run(ctx, sparql, selectionqueries.Index{IDToNode: idToNode, KeyToID: keyToID}, selectedIDs, includeBNodes)
}

304
backend_go/server.go Normal file
View File

@@ -0,0 +1,304 @@
package main
import (
"fmt"
"log"
"net/http"
"strconv"
"strings"
graphqueries "visualizador_instanciados/backend_go/graph_queries"
selectionqueries "visualizador_instanciados/backend_go/selection_queries"
)
type APIServer struct {
cfg Config
sparql *AnzoGraphClient
snapshots *GraphSnapshotService
}
func (s *APIServer) handler() http.Handler {
mux := http.NewServeMux()
mux.HandleFunc("/api/health", s.handleHealth)
mux.HandleFunc("/api/stats", s.handleStats)
mux.HandleFunc("/api/sparql", s.handleSparql)
mux.HandleFunc("/api/graph", s.handleGraph)
mux.HandleFunc("/api/graph_queries", s.handleGraphQueries)
mux.HandleFunc("/api/selection_queries", s.handleSelectionQueries)
mux.HandleFunc("/api/selection_query", s.handleSelectionQuery)
mux.HandleFunc("/api/neighbors", s.handleNeighbors)
return s.corsMiddleware(mux)
}
func (s *APIServer) corsMiddleware(next http.Handler) http.Handler {
origins := s.cfg.corsOriginList()
allowAll := len(origins) == 1 && origins[0] == "*"
allowed := make(map[string]struct{}, len(origins))
for _, o := range origins {
allowed[o] = struct{}{}
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
origin := r.Header.Get("Origin")
if origin != "" {
if allowAll {
w.Header().Set("Access-Control-Allow-Origin", "*")
} else if _, ok := allowed[origin]; ok {
w.Header().Set("Access-Control-Allow-Origin", origin)
w.Header().Add("Vary", "Origin")
}
w.Header().Set("Access-Control-Allow-Methods", "GET,POST,OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "*")
}
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusNoContent)
return
}
next.ServeHTTP(w, r)
})
}
func (s *APIServer) handleHealth(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
writeJSON(w, http.StatusOK, HealthResponse{Status: "ok"})
}
func (s *APIServer) handleStats(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
ctx := r.Context()
snap, err := s.snapshots.Get(ctx, s.cfg.DefaultNodeLimit, s.cfg.DefaultEdgeLimit, graphqueries.DefaultID)
if err != nil {
log.Printf("handleStats: snapshot error: %v", err)
writeError(w, http.StatusInternalServerError, err.Error())
return
}
endpoint := snap.Meta.SparqlEndpoint
writeJSON(w, http.StatusOK, StatsResponse{
Backend: snap.Meta.Backend,
TTLPath: snap.Meta.TTLPath,
SparqlEndpoint: &endpoint,
ParsedTriples: len(snap.Edges),
Nodes: len(snap.Nodes),
Edges: len(snap.Edges),
})
}
func (s *APIServer) handleSparql(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
var req SparqlQueryRequest
if err := decodeJSON(r.Body, &req); err != nil || strings.TrimSpace(req.Query) == "" {
writeError(w, http.StatusUnprocessableEntity, "invalid request body")
return
}
raw, err := s.sparql.Query(r.Context(), req.Query)
if err != nil {
writeError(w, http.StatusBadGateway, err.Error())
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
_, _ = w.Write(raw)
}
func (s *APIServer) handleGraph(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
nodeLimit, err := intQuery(r, "node_limit", s.cfg.DefaultNodeLimit)
if err != nil || nodeLimit < 1 || nodeLimit > s.cfg.MaxNodeLimit {
writeError(w, http.StatusUnprocessableEntity, fmt.Sprintf("node_limit must be between 1 and %d", s.cfg.MaxNodeLimit))
return
}
edgeLimit, err := intQuery(r, "edge_limit", s.cfg.DefaultEdgeLimit)
if err != nil || edgeLimit < 1 || edgeLimit > s.cfg.MaxEdgeLimit {
writeError(w, http.StatusUnprocessableEntity, fmt.Sprintf("edge_limit must be between 1 and %d", s.cfg.MaxEdgeLimit))
return
}
graphQueryID := strings.TrimSpace(r.URL.Query().Get("graph_query_id"))
if graphQueryID == "" {
graphQueryID = graphqueries.DefaultID
}
if _, ok := graphqueries.Get(graphQueryID); !ok {
writeError(w, http.StatusUnprocessableEntity, "unknown graph_query_id")
return
}
snap, err := s.snapshots.Get(r.Context(), nodeLimit, edgeLimit, graphQueryID)
if err != nil {
log.Printf("handleGraph: snapshot error graph_query_id=%s node_limit=%d edge_limit=%d err=%v", graphQueryID, nodeLimit, edgeLimit, err)
if _, ok := err.(*CycleError); ok {
writeError(w, http.StatusUnprocessableEntity, err.Error())
return
}
writeError(w, http.StatusInternalServerError, err.Error())
return
}
writeJSON(w, http.StatusOK, snap)
}
func (s *APIServer) handleGraphQueries(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
writeJSON(w, http.StatusOK, graphqueries.List())
}
func (s *APIServer) handleSelectionQueries(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
writeJSON(w, http.StatusOK, selectionqueries.List())
}
func (s *APIServer) handleSelectionQuery(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
var req SelectionQueryRequest
if err := decodeJSON(r.Body, &req); err != nil || strings.TrimSpace(req.QueryID) == "" {
writeError(w, http.StatusUnprocessableEntity, "invalid request body")
return
}
if _, ok := selectionqueries.Get(req.QueryID); !ok {
writeError(w, http.StatusUnprocessableEntity, "unknown query_id")
return
}
if len(req.SelectedIDs) == 0 {
writeJSON(w, http.StatusOK, SelectionQueryResponse{
QueryID: req.QueryID,
SelectedIDs: req.SelectedIDs,
NeighborIDs: []uint32{},
})
return
}
graphQueryID := graphqueries.DefaultID
if req.GraphQueryID != nil && strings.TrimSpace(*req.GraphQueryID) != "" {
graphQueryID = strings.TrimSpace(*req.GraphQueryID)
}
if _, ok := graphqueries.Get(graphQueryID); !ok {
writeError(w, http.StatusUnprocessableEntity, "unknown graph_query_id")
return
}
nodeLimit := s.cfg.DefaultNodeLimit
edgeLimit := s.cfg.DefaultEdgeLimit
if req.NodeLimit != nil {
nodeLimit = *req.NodeLimit
}
if req.EdgeLimit != nil {
edgeLimit = *req.EdgeLimit
}
if nodeLimit < 1 || nodeLimit > s.cfg.MaxNodeLimit || edgeLimit < 1 || edgeLimit > s.cfg.MaxEdgeLimit {
writeError(w, http.StatusUnprocessableEntity, "invalid node_limit/edge_limit")
return
}
snap, err := s.snapshots.Get(r.Context(), nodeLimit, edgeLimit, graphQueryID)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
ids, err := runSelectionQuery(r.Context(), s.sparql, snap, req.QueryID, req.SelectedIDs, s.cfg.IncludeBNodes)
if err != nil {
writeError(w, http.StatusBadGateway, err.Error())
return
}
writeJSON(w, http.StatusOK, SelectionQueryResponse{
QueryID: req.QueryID,
SelectedIDs: req.SelectedIDs,
NeighborIDs: ids,
})
}
func (s *APIServer) handleNeighbors(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
var req NeighborsRequest
if err := decodeJSON(r.Body, &req); err != nil {
writeError(w, http.StatusUnprocessableEntity, "invalid request body")
return
}
if len(req.SelectedIDs) == 0 {
writeJSON(w, http.StatusOK, NeighborsResponse{SelectedIDs: req.SelectedIDs, NeighborIDs: []uint32{}})
return
}
graphQueryID := graphqueries.DefaultID
if req.GraphQueryID != nil && strings.TrimSpace(*req.GraphQueryID) != "" {
graphQueryID = strings.TrimSpace(*req.GraphQueryID)
}
if _, ok := graphqueries.Get(graphQueryID); !ok {
writeError(w, http.StatusUnprocessableEntity, "unknown graph_query_id")
return
}
nodeLimit := s.cfg.DefaultNodeLimit
edgeLimit := s.cfg.DefaultEdgeLimit
if req.NodeLimit != nil {
nodeLimit = *req.NodeLimit
}
if req.EdgeLimit != nil {
edgeLimit = *req.EdgeLimit
}
if nodeLimit < 1 || nodeLimit > s.cfg.MaxNodeLimit || edgeLimit < 1 || edgeLimit > s.cfg.MaxEdgeLimit {
writeError(w, http.StatusUnprocessableEntity, "invalid node_limit/edge_limit")
return
}
snap, err := s.snapshots.Get(r.Context(), nodeLimit, edgeLimit, graphQueryID)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
nbrs, err := runSelectionQuery(r.Context(), s.sparql, snap, "neighbors", req.SelectedIDs, s.cfg.IncludeBNodes)
if err != nil {
writeError(w, http.StatusBadGateway, err.Error())
return
}
writeJSON(w, http.StatusOK, NeighborsResponse{SelectedIDs: req.SelectedIDs, NeighborIDs: nbrs})
}
func intQuery(r *http.Request, name string, def int) (int, error) {
raw := strings.TrimSpace(r.URL.Query().Get(name))
if raw == "" {
return def, nil
}
n, err := strconv.Atoi(raw)
if err != nil {
return 0, err
}
return n, nil
}

View File

@@ -0,0 +1,76 @@
package main
import (
"context"
"sync"
)
type snapshotKey struct {
NodeLimit int
EdgeLimit int
IncludeBNodes bool
GraphQueryID string
}
type snapshotInflight struct {
ready chan struct{}
snapshot GraphResponse
err error
}
type GraphSnapshotService struct {
sparql *AnzoGraphClient
cfg Config
mu sync.Mutex
cache map[snapshotKey]GraphResponse
inflight map[snapshotKey]*snapshotInflight
}
func NewGraphSnapshotService(sparql *AnzoGraphClient, cfg Config) *GraphSnapshotService {
return &GraphSnapshotService{
sparql: sparql,
cfg: cfg,
cache: make(map[snapshotKey]GraphResponse),
inflight: make(map[snapshotKey]*snapshotInflight),
}
}
func (s *GraphSnapshotService) Get(ctx context.Context, nodeLimit int, edgeLimit int, graphQueryID string) (GraphResponse, error) {
key := snapshotKey{NodeLimit: nodeLimit, EdgeLimit: edgeLimit, IncludeBNodes: s.cfg.IncludeBNodes, GraphQueryID: graphQueryID}
s.mu.Lock()
if snap, ok := s.cache[key]; ok {
s.mu.Unlock()
return snap, nil
}
if inf, ok := s.inflight[key]; ok {
ready := inf.ready
s.mu.Unlock()
select {
case <-ctx.Done():
return GraphResponse{}, ctx.Err()
case <-ready:
return inf.snapshot, inf.err
}
}
inf := &snapshotInflight{ready: make(chan struct{})}
s.inflight[key] = inf
s.mu.Unlock()
snap, err := fetchGraphSnapshot(ctx, s.sparql, s.cfg, nodeLimit, edgeLimit, graphQueryID)
s.mu.Lock()
inf.snapshot = snap
inf.err = err
delete(s.inflight, key)
if err == nil {
s.cache[key] = snap
}
close(inf.ready)
s.mu.Unlock()
return snap, err
}

169
backend_go/sparql.go Normal file
View File

@@ -0,0 +1,169 @@
package main
import (
"context"
"encoding/base64"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
)
type AnzoGraphClient struct {
cfg Config
endpoint string
authHeader string
client *http.Client
}
func NewAnzoGraphClient(cfg Config) *AnzoGraphClient {
endpoint := cfg.EffectiveSparqlEndpoint()
authHeader := ""
user := strings.TrimSpace(cfg.SparqlUser)
pass := strings.TrimSpace(cfg.SparqlPass)
if user != "" && pass != "" {
token := base64.StdEncoding.EncodeToString([]byte(user + ":" + pass))
authHeader = "Basic " + token
}
return &AnzoGraphClient{
cfg: cfg,
endpoint: endpoint,
authHeader: authHeader,
client: &http.Client{},
}
}
func (c *AnzoGraphClient) Startup(ctx context.Context) error {
if err := c.waitReady(ctx); err != nil {
return err
}
if c.cfg.SparqlClearOnStart {
if err := c.update(ctx, "CLEAR ALL"); err != nil {
return err
}
if err := c.waitReady(ctx); err != nil {
return err
}
}
if c.cfg.SparqlLoadOnStart {
df := strings.TrimSpace(c.cfg.SparqlDataFile)
if df == "" {
return fmt.Errorf("SPARQL_LOAD_ON_START=true but SPARQL_DATA_FILE is not set")
}
giri := strings.TrimSpace(c.cfg.SparqlGraphIRI)
if giri != "" {
if err := c.update(ctx, fmt.Sprintf("LOAD <%s> INTO GRAPH <%s>", df, giri)); err != nil {
return err
}
} else {
if err := c.update(ctx, fmt.Sprintf("LOAD <%s>", df)); err != nil {
return err
}
}
if err := c.waitReady(ctx); err != nil {
return err
}
}
return nil
}
func (c *AnzoGraphClient) Shutdown(ctx context.Context) error {
_ = ctx
return nil
}
func (c *AnzoGraphClient) Query(ctx context.Context, query string) ([]byte, error) {
return c.queryWithTimeout(ctx, query, c.cfg.SparqlTimeout)
}
func (c *AnzoGraphClient) queryWithTimeout(ctx context.Context, query string, timeout time.Duration) ([]byte, error) {
ctx2, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
form := url.Values{}
form.Set("query", query)
req, err := http.NewRequestWithContext(ctx2, http.MethodPost, c.endpoint, strings.NewReader(form.Encode()))
if err != nil {
return nil, err
}
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
req.Header.Set("Accept", "application/sparql-results+json")
if c.authHeader != "" {
req.Header.Set("Authorization", c.authHeader)
}
resp, err := c.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil, fmt.Errorf("sparql query failed: %s: %s", resp.Status, strings.TrimSpace(string(body)))
}
return body, nil
}
func (c *AnzoGraphClient) update(ctx context.Context, update string) error {
ctx2, cancel := context.WithTimeout(ctx, c.cfg.SparqlTimeout)
defer cancel()
req, err := http.NewRequestWithContext(ctx2, http.MethodPost, c.endpoint, strings.NewReader(update))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/sparql-update")
req.Header.Set("Accept", "application/json")
if c.authHeader != "" {
req.Header.Set("Authorization", c.authHeader)
}
resp, err := c.client.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("sparql update failed: %s: %s", resp.Status, strings.TrimSpace(string(body)))
}
return nil
}
func (c *AnzoGraphClient) waitReady(ctx context.Context) error {
var lastErr error
for i := 0; i < c.cfg.SparqlReadyRetries; i++ {
select {
case <-ctx.Done():
if lastErr != nil {
return fmt.Errorf("anzograph not ready at %s: %w", c.endpoint, lastErr)
}
return ctx.Err()
default:
}
body, err := c.queryWithTimeout(ctx, "ASK WHERE { ?s ?p ?o }", c.cfg.SparqlReadyTimeout)
if err == nil {
// Ensure it's JSON, not HTML/text during boot.
if strings.HasPrefix(strings.TrimSpace(string(body)), "{") {
return nil
}
err = fmt.Errorf("unexpected readiness response: %s", strings.TrimSpace(string(body)))
}
lastErr = err
time.Sleep(c.cfg.SparqlReadyDelay)
}
return fmt.Errorf("anzograph not ready at %s: %w", c.endpoint, lastErr)
}

View File

@@ -0,0 +1,13 @@
package main
type sparqlTerm struct {
Type string `json:"type"`
Value string `json:"value"`
Lang string `json:"xml:lang,omitempty"`
}
type sparqlResponse struct {
Results struct {
Bindings []map[string]sparqlTerm `json:"bindings"`
} `json:"results"`
}

View File

@@ -1,13 +1,26 @@
services: services:
owl_imports_combiner:
build: ./python_services/owl_imports_combiner
environment:
- COMBINE_OWL_IMPORTS_ON_START=${COMBINE_OWL_IMPORTS_ON_START:-false}
- COMBINE_ENTRY_LOCATION
- COMBINE_OUTPUT_LOCATION
- COMBINE_OUTPUT_NAME
- COMBINE_FORCE=${COMBINE_FORCE:-false}
- TTL_PATH=${TTL_PATH:-/data/o3po.ttl}
volumes:
- ./data:/data:Z
backend: backend:
build: ./backend build: ./backend_go
ports: ports:
- "8000:8000" - "8000:8000"
environment: environment:
- GRAPH_BACKEND=${GRAPH_BACKEND:-rdflib} - DEFAULT_NODE_LIMIT=${DEFAULT_NODE_LIMIT:-800000}
- TTL_PATH=${TTL_PATH:-/data/o3po.ttl} - DEFAULT_EDGE_LIMIT=${DEFAULT_EDGE_LIMIT:-2000000}
- MAX_NODE_LIMIT=${MAX_NODE_LIMIT:-10000000}
- MAX_EDGE_LIMIT=${MAX_EDGE_LIMIT:-20000000}
- INCLUDE_BNODES=${INCLUDE_BNODES:-false} - INCLUDE_BNODES=${INCLUDE_BNODES:-false}
- MAX_TRIPLES
- CORS_ORIGINS=${CORS_ORIGINS:-http://localhost:5173} - CORS_ORIGINS=${CORS_ORIGINS:-http://localhost:5173}
- SPARQL_HOST=${SPARQL_HOST:-http://anzograph:8080} - SPARQL_HOST=${SPARQL_HOST:-http://anzograph:8080}
- SPARQL_ENDPOINT - SPARQL_ENDPOINT
@@ -21,17 +34,18 @@ services:
- SPARQL_READY_RETRIES=${SPARQL_READY_RETRIES:-30} - SPARQL_READY_RETRIES=${SPARQL_READY_RETRIES:-30}
- SPARQL_READY_DELAY_S=${SPARQL_READY_DELAY_S:-4} - SPARQL_READY_DELAY_S=${SPARQL_READY_DELAY_S:-4}
- SPARQL_READY_TIMEOUT_S=${SPARQL_READY_TIMEOUT_S:-10} - SPARQL_READY_TIMEOUT_S=${SPARQL_READY_TIMEOUT_S:-10}
- COMBINE_OWL_IMPORTS_ON_START=${COMBINE_OWL_IMPORTS_ON_START:-false} - EDGE_BATCH_SIZE=${EDGE_BATCH_SIZE:-100000}
- COMBINE_ENTRY_LOCATION - FREE_OS_MEMORY_AFTER_SNAPSHOT=${FREE_OS_MEMORY_AFTER_SNAPSHOT:-false}
- COMBINE_OUTPUT_LOCATION - LOG_SNAPSHOT_TIMINGS=${LOG_SNAPSHOT_TIMINGS:-false}
- COMBINE_OUTPUT_NAME depends_on:
- COMBINE_FORCE=${COMBINE_FORCE:-false} owl_imports_combiner:
condition: service_completed_successfully
anzograph:
condition: service_started
volumes: volumes:
- ./backend:/app
- ./data:/data:Z - ./data:/data:Z
command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
healthcheck: healthcheck:
test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/api/health').read()"] test: ["CMD", "curl", "-fsS", "http://localhost:8000/api/health"]
interval: 5s interval: 5s
timeout: 3s timeout: 3s
retries: 60 retries: 60
@@ -46,15 +60,26 @@ services:
- ./frontend:/app - ./frontend:/app
- /app/node_modules - /app/node_modules
depends_on: depends_on:
- backend backend:
# Docker Compose v1 doesn't support depends_on:condition. Do an explicit wait here. condition: service_healthy
command: sh -c "until wget -qO- http://backend:8000/api/health >/dev/null 2>&1; do echo 'waiting for backend...'; sleep 1; done; npm run dev -- --host --port 5173"
anzograph: anzograph:
image: cambridgesemantics/anzograph:latest image: cambridgesemantics/anzograph:latest
container_name: anzograph container_name: anzograph
mem_limit: 20g
ports: ports:
- "8080:8080" - "8080:8080"
- "8443:8443" - "8443:8443"
volumes: volumes:
- ./data:/opt/shared-files:Z - ./data:/opt/shared-files:Z
# Persist AnzoGraph state across container recreation (EULA acceptance, machine-id, settings, persistence).
- anzograph_app_home:/opt/anzograph/app-home
- anzograph_persistence:/opt/anzograph/persistence
- anzograph_config:/opt/anzograph/config
- anzograph_internal:/opt/anzograph/internal
volumes:
anzograph_app_home:
anzograph_persistence:
anzograph_config:
anzograph_internal:

45
frontend/README.md Normal file
View File

@@ -0,0 +1,45 @@
# Frontend (React + Vite) WebGL Graph Renderer
The frontend renders the snapshot from `/api/graph` using WebGL2:
- Nodes are drawn as points
- Edges are drawn as lines only when sufficiently zoomed in
- Selection + neighbor highlighting is driven by backend “selection queries”
## Run
Via Docker Compose (recommended):
```bash
docker compose up --build frontend
```
Open: `http://localhost:5173`
## Configuration
- `VITE_BACKEND_URL` controls where `/api/*` is proxied (see `frontend/vite.config.ts`).
## UI
- Drag: pan
- Scroll: zoom
- Click: select/deselect nodes
Buttons:
- **Top-right:** selection query mode (controls how the backend expands “neighbors” for the current selection)
- **Bottom-right:** graph query mode (controls which SPARQL edge set the backend uses to build the graph snapshot; switching reloads the graph)
The available modes are discovered from the backend at runtime (`/api/selection_queries` and `/api/graph_queries`).
## Rendering / limits
The renderer uses a quadtree spatial index and draws only a subset when zoomed out:
- Points:
- Per-frame cap: `MAX_DRAW = 2_000_000` (sampling over visible leaves)
- Lines:
- Drawn only when fewer than ~20k nodes are “visible” (leaf AABB overlap with the camera frustum)
Selected and “neighbor” nodes are drawn on top using index buffers.

View File

@@ -1,21 +1,14 @@
import { useEffect, useRef, useState } from "react"; import { useEffect, useRef, useState } from "react";
import { Renderer } from "./renderer"; import { Renderer } from "./renderer";
import { fetchGraphQueries } from "./graph_queries";
import type { GraphQueryMeta } from "./graph_queries";
import { fetchSelectionQueries, runSelectionQuery } from "./selection_queries";
import type { GraphMeta, SelectionQueryMeta } from "./selection_queries";
function sleep(ms: number): Promise<void> { function sleep(ms: number): Promise<void> {
return new Promise((r) => setTimeout(r, ms)); return new Promise((r) => setTimeout(r, ms));
} }
type GraphMeta = {
backend?: string;
ttl_path?: string | null;
sparql_endpoint?: string | null;
include_bnodes?: boolean;
node_limit?: number;
edge_limit?: number;
nodes?: number;
edges?: number;
};
export default function App() { export default function App() {
const canvasRef = useRef<HTMLCanvasElement>(null); const canvasRef = useRef<HTMLCanvasElement>(null);
const rendererRef = useRef<Renderer | null>(null); const rendererRef = useRef<Renderer | null>(null);
@@ -31,14 +24,95 @@ export default function App() {
const [error, setError] = useState(""); const [error, setError] = useState("");
const [hoveredNode, setHoveredNode] = useState<{ x: number; y: number; screenX: number; screenY: number; label?: string; iri?: string } | null>(null); const [hoveredNode, setHoveredNode] = useState<{ x: number; y: number; screenX: number; screenY: number; label?: string; iri?: string } | null>(null);
const [selectedNodes, setSelectedNodes] = useState<Set<number>>(new Set()); const [selectedNodes, setSelectedNodes] = useState<Set<number>>(new Set());
const [graphQueries, setGraphQueries] = useState<GraphQueryMeta[]>([]);
const [activeGraphQueryId, setActiveGraphQueryId] = useState<string>("default");
const [selectionQueries, setSelectionQueries] = useState<SelectionQueryMeta[]>([]);
const [activeSelectionQueryId, setActiveSelectionQueryId] = useState<string>("neighbors");
const [backendStats, setBackendStats] = useState<{ nodes: number; edges: number; backend?: string } | null>(null); const [backendStats, setBackendStats] = useState<{ nodes: number; edges: number; backend?: string } | null>(null);
const graphMetaRef = useRef<GraphMeta | null>(null); const graphMetaRef = useRef<GraphMeta | null>(null);
const neighborsReqIdRef = useRef(0); const selectionReqIdRef = useRef(0);
const graphInitializedRef = useRef(false);
// Store mouse position in a ref so it can be accessed in render loop without re-renders // Store mouse position in a ref so it can be accessed in render loop without re-renders
const mousePos = useRef({ x: 0, y: 0 }); const mousePos = useRef({ x: 0, y: 0 });
const nodesRef = useRef<any[]>([]); const nodesRef = useRef<any[]>([]);
async function loadGraph(graphQueryId: string, signal: AbortSignal): Promise<void> {
const renderer = rendererRef.current;
if (!renderer) return;
setStatus("Fetching graph…");
const graphRes = await fetch(`/api/graph?graph_query_id=${encodeURIComponent(graphQueryId)}`, { signal });
if (!graphRes.ok) {
let detail = "";
try {
const err = await graphRes.json();
if (err && typeof err === "object" && typeof (err as any).detail === "string") {
detail = (err as any).detail;
}
} catch {
// ignore
}
throw new Error(`Failed to fetch graph: ${graphRes.status}${detail ? ` (${detail})` : ""}`);
}
const graph = await graphRes.json();
if (signal.aborted) return;
const nodes = Array.isArray(graph.nodes) ? graph.nodes : [];
const edges = Array.isArray(graph.edges) ? graph.edges : [];
const meta = graph.meta || null;
const count = nodes.length;
nodesRef.current = nodes;
graphMetaRef.current = meta && typeof meta === "object" ? (meta as GraphMeta) : null;
// Build positions from backend-provided node coordinates.
setStatus("Preparing buffers…");
const xs = new Float32Array(count);
const ys = new Float32Array(count);
for (let i = 0; i < count; i++) {
const nx = nodes[i]?.x;
const ny = nodes[i]?.y;
xs[i] = typeof nx === "number" ? nx : 0;
ys[i] = typeof ny === "number" ? ny : 0;
}
const vertexIds = new Uint32Array(count);
for (let i = 0; i < count; i++) {
const id = nodes[i]?.id;
vertexIds[i] = typeof id === "number" ? id >>> 0 : i;
}
// Build edges as vertex-id pairs.
const edgeData = new Uint32Array(edges.length * 2);
for (let i = 0; i < edges.length; i++) {
const s = edges[i]?.source;
const t = edges[i]?.target;
edgeData[i * 2] = typeof s === "number" ? s >>> 0 : 0;
edgeData[i * 2 + 1] = typeof t === "number" ? t >>> 0 : 0;
}
// Use /api/graph meta; don't do a second expensive backend call.
if (meta && typeof meta.nodes === "number" && typeof meta.edges === "number") {
setBackendStats({
nodes: meta.nodes,
edges: meta.edges,
backend: typeof meta.backend === "string" ? meta.backend : undefined,
});
} else {
setBackendStats({ nodes: nodes.length, edges: edges.length });
}
setStatus("Building spatial index…");
await new Promise((r) => setTimeout(r, 0));
if (signal.aborted) return;
const buildMs = renderer.init(xs, ys, vertexIds, edgeData);
setNodeCount(renderer.getNodeCount());
setSelectedNodes(new Set());
setStatus("");
console.log(`Init complete: ${count.toLocaleString()} nodes, ${edges.length.toLocaleString()} edges in ${buildMs.toFixed(0)}ms`);
}
useEffect(() => { useEffect(() => {
const canvas = canvasRef.current; const canvas = canvasRef.current;
if (!canvas) return; if (!canvas) return;
@@ -53,6 +127,8 @@ export default function App() {
} }
let cancelled = false; let cancelled = false;
const initCtrl = new AbortController();
graphInitializedRef.current = false;
(async () => { (async () => {
try { try {
@@ -73,63 +149,36 @@ export default function App() {
if (cancelled) return; if (cancelled) return;
} }
setStatus("Fetching graph…"); let graphQueryToLoad = activeGraphQueryId;
const graphRes = await fetch("/api/graph"); try {
if (!graphRes.ok) throw new Error(`Failed to fetch graph: ${graphRes.status}`); setStatus("Fetching graph modes…");
const graph = await graphRes.json(); const gqs = await fetchGraphQueries(initCtrl.signal);
if (cancelled) return; if (cancelled || initCtrl.signal.aborted) return;
setGraphQueries(gqs);
const nodes = Array.isArray(graph.nodes) ? graph.nodes : []; graphQueryToLoad = gqs.some((q) => q.id === graphQueryToLoad) ? graphQueryToLoad : (gqs[0]?.id ?? "default");
const edges = Array.isArray(graph.edges) ? graph.edges : []; setActiveGraphQueryId(graphQueryToLoad);
const meta = graph.meta || null; } catch {
const count = nodes.length; if (cancelled || initCtrl.signal.aborted) return;
setGraphQueries([{ id: "default", label: "Default" }]);
nodesRef.current = nodes; graphQueryToLoad = "default";
graphMetaRef.current = meta && typeof meta === "object" ? (meta as GraphMeta) : null; setActiveGraphQueryId("default");
// Build positions from backend-provided node coordinates.
setStatus("Preparing buffers…");
const xs = new Float32Array(count);
const ys = new Float32Array(count);
for (let i = 0; i < count; i++) {
const nx = nodes[i]?.x;
const ny = nodes[i]?.y;
xs[i] = typeof nx === "number" ? nx : 0;
ys[i] = typeof ny === "number" ? ny : 0;
}
const vertexIds = new Uint32Array(count);
for (let i = 0; i < count; i++) {
const id = nodes[i]?.id;
vertexIds[i] = typeof id === "number" ? id >>> 0 : i;
} }
// Build edges as vertex-id pairs. await loadGraph(graphQueryToLoad, initCtrl.signal);
const edgeData = new Uint32Array(edges.length * 2); if (cancelled || initCtrl.signal.aborted) return;
for (let i = 0; i < edges.length; i++) {
const s = edges[i]?.source; try {
const t = edges[i]?.target; const qs = await fetchSelectionQueries(initCtrl.signal);
edgeData[i * 2] = typeof s === "number" ? s >>> 0 : 0; if (cancelled) return;
edgeData[i * 2 + 1] = typeof t === "number" ? t >>> 0 : 0; setSelectionQueries(qs);
setActiveSelectionQueryId((prev) => (qs.length > 0 && !qs.some((q) => q.id === prev) ? qs[0].id : prev));
} catch {
if (cancelled) return;
setSelectionQueries([{ id: "neighbors", label: "Neighbors" }]);
setActiveSelectionQueryId((prev) => (prev ? prev : "neighbors"));
} }
// Use /api/graph meta; don't do a second expensive backend call. graphInitializedRef.current = true;
if (meta && typeof meta.nodes === "number" && typeof meta.edges === "number") {
setBackendStats({
nodes: meta.nodes,
edges: meta.edges,
backend: typeof meta.backend === "string" ? meta.backend : undefined,
});
} else {
setBackendStats({ nodes: nodes.length, edges: edges.length });
}
setStatus("Building spatial index…");
await new Promise((r) => setTimeout(r, 0));
const buildMs = renderer.init(xs, ys, vertexIds, edgeData);
setNodeCount(renderer.getNodeCount());
setStatus("");
console.log(`Init complete: ${count.toLocaleString()} nodes, ${edges.length.toLocaleString()} edges in ${buildMs.toFixed(0)}ms`);
} catch (e) { } catch (e) {
if (!cancelled) { if (!cancelled) {
setError(e instanceof Error ? e.message : String(e)); setError(e instanceof Error ? e.message : String(e));
@@ -249,6 +298,7 @@ export default function App() {
return () => { return () => {
cancelled = true; cancelled = true;
initCtrl.abort();
cancelAnimationFrame(raf); cancelAnimationFrame(raf);
canvas.removeEventListener("mousedown", onDown); canvas.removeEventListener("mousedown", onDown);
window.removeEventListener("mousemove", onMove); window.removeEventListener("mousemove", onMove);
@@ -258,62 +308,68 @@ export default function App() {
}; };
}, []); }, []);
// Reload graph when the graph query mode changes (after initial load)
useEffect(() => {
if (!graphInitializedRef.current) return;
const renderer = rendererRef.current;
if (!renderer) return;
if (!activeGraphQueryId) return;
const ctrl = new AbortController();
(async () => {
try {
await loadGraph(activeGraphQueryId, ctrl.signal);
} catch (e) {
if (ctrl.signal.aborted) return;
setError(e instanceof Error ? e.message : String(e));
}
})();
return () => ctrl.abort();
}, [activeGraphQueryId]);
// Sync selection state to renderer // Sync selection state to renderer
useEffect(() => { useEffect(() => {
const renderer = rendererRef.current; const renderer = rendererRef.current;
if (!renderer) return; if (!renderer) return;
// Optimistically reflect selection immediately; neighbors will be filled in by backend. // Optimistically reflect selection immediately; highlights will be filled in by backend.
renderer.updateSelection(selectedNodes, new Set()); renderer.updateSelection(selectedNodes, new Set());
// Invalidate any in-flight neighbor request for the previous selection. // Invalidate any in-flight request for the previous selection/mode.
const reqId = ++neighborsReqIdRef.current; const reqId = ++selectionReqIdRef.current;
// Convert selected sorted indices to backend node IDs (graph-export dense IDs). // Convert selected sorted indices to backend node IDs (graph-export dense IDs).
const selectedIds: number[] = []; const selectedIds: number[] = [];
for (const sortedIdx of selectedNodes) { for (const sortedIdx of selectedNodes) {
const origIdx = renderer.sortedIndexToOriginalIndex(sortedIdx); const origIdx = renderer.sortedIndexToOriginalIndex(sortedIdx);
if (origIdx === null) continue; if (origIdx === null) continue;
const nodeId = nodesRef.current?.[origIdx]?.id; const n = nodesRef.current?.[origIdx];
if (typeof nodeId === "number") selectedIds.push(nodeId); const nodeId = n?.id;
if (typeof nodeId !== "number") continue;
selectedIds.push(nodeId);
} }
if (selectedIds.length === 0) { if (selectedIds.length === 0) {
return; return;
} }
// Always send the full current selection list; backend returns the merged neighbor set. const queryId = (activeSelectionQueryId || selectionQueries[0]?.id || "neighbors").trim();
const ctrl = new AbortController(); const ctrl = new AbortController();
(async () => { (async () => {
try { try {
const meta = graphMetaRef.current; const neighborIds = await runSelectionQuery(queryId, selectedIds, graphMetaRef.current, ctrl.signal);
const body = {
selected_ids: selectedIds,
node_limit: typeof meta?.node_limit === "number" ? meta.node_limit : undefined,
edge_limit: typeof meta?.edge_limit === "number" ? meta.edge_limit : undefined,
};
const res = await fetch("/api/neighbors", {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify(body),
signal: ctrl.signal,
});
if (!res.ok) throw new Error(`POST /api/neighbors failed: ${res.status}`);
const data = await res.json();
if (ctrl.signal.aborted) return; if (ctrl.signal.aborted) return;
if (reqId !== neighborsReqIdRef.current) return; if (reqId !== selectionReqIdRef.current) return;
const neighborIds: unknown = data?.neighbor_ids;
const neighborSorted = new Set<number>(); const neighborSorted = new Set<number>();
if (Array.isArray(neighborIds)) { for (const id of neighborIds) {
for (const id of neighborIds) { if (typeof id !== "number") continue;
if (typeof id !== "number") continue; const sorted = renderer.vertexIdToSortedIndexOrNull(id);
const sorted = renderer.vertexIdToSortedIndexOrNull(id); if (sorted === null) continue;
if (sorted === null) continue; if (!selectedNodes.has(sorted)) neighborSorted.add(sorted);
if (!selectedNodes.has(sorted)) neighborSorted.add(sorted);
}
} }
renderer.updateSelection(selectedNodes, neighborSorted); renderer.updateSelection(selectedNodes, neighborSorted);
@@ -326,7 +382,7 @@ export default function App() {
})(); })();
return () => ctrl.abort(); return () => ctrl.abort();
}, [selectedNodes]); }, [selectedNodes, activeSelectionQueryId]);
return ( return (
<div style={{ width: "100vw", height: "100vh", overflow: "hidden", background: "#000" }}> <div style={{ width: "100vw", height: "100vh", overflow: "hidden", background: "#000" }}>
@@ -420,6 +476,92 @@ export default function App() {
Drag to pan · Scroll to zoom · Click to select Drag to pan · Scroll to zoom · Click to select
</div> </div>
{/* Selection query buttons */}
{selectionQueries.length > 0 && (
<div
style={{
position: "absolute",
top: 10,
right: 10,
display: "flex",
flexDirection: "column",
gap: "6px",
background: "rgba(0,0,0,0.55)",
padding: "8px",
borderRadius: "6px",
border: "1px solid rgba(255,255,255,0.08)",
pointerEvents: "auto",
}}
>
{selectionQueries.map((q) => {
const active = q.id === activeSelectionQueryId;
return (
<button
key={q.id}
onClick={() => setActiveSelectionQueryId(q.id)}
style={{
cursor: "pointer",
fontFamily: "monospace",
fontSize: "12px",
padding: "6px 10px",
borderRadius: "4px",
border: active ? "1px solid rgba(0,255,255,0.8)" : "1px solid rgba(255,255,255,0.12)",
background: active ? "rgba(0,255,255,0.12)" : "rgba(255,255,255,0.04)",
color: active ? "#0ff" : "#bbb",
textAlign: "left",
}}
aria-pressed={active}
>
{q.label}
</button>
);
})}
</div>
)}
{/* Graph query buttons */}
{graphQueries.length > 0 && (
<div
style={{
position: "absolute",
bottom: 10,
right: 10,
display: "flex",
flexDirection: "column",
gap: "6px",
background: "rgba(0,0,0,0.55)",
padding: "8px",
borderRadius: "6px",
border: "1px solid rgba(255,255,255,0.08)",
pointerEvents: "auto",
}}
>
{graphQueries.map((q) => {
const active = q.id === activeGraphQueryId;
return (
<button
key={q.id}
onClick={() => setActiveGraphQueryId(q.id)}
style={{
cursor: "pointer",
fontFamily: "monospace",
fontSize: "12px",
padding: "6px 10px",
borderRadius: "4px",
border: active ? "1px solid rgba(0,255,0,0.8)" : "1px solid rgba(255,255,255,0.12)",
background: active ? "rgba(0,255,0,0.12)" : "rgba(255,255,255,0.04)",
color: active ? "#8f8" : "#bbb",
textAlign: "left",
}}
aria-pressed={active}
>
{q.label}
</button>
);
})}
</div>
)}
{/* Hover tooltip */} {/* Hover tooltip */}
{hoveredNode && ( {hoveredNode && (
<div <div

View File

@@ -0,0 +1,9 @@
import type { GraphQueryMeta } from "./types";
export async function fetchGraphQueries(signal?: AbortSignal): Promise<GraphQueryMeta[]> {
const res = await fetch("/api/graph_queries", { signal });
if (!res.ok) throw new Error(`GET /api/graph_queries failed: ${res.status}`);
const data = await res.json();
return Array.isArray(data) ? (data as GraphQueryMeta[]) : [];
}

View File

@@ -0,0 +1,3 @@
export { fetchGraphQueries } from "./api";
export type { GraphQueryMeta } from "./types";

View File

@@ -0,0 +1,5 @@
export type GraphQueryMeta = {
id: string;
label: string;
};

View File

@@ -0,0 +1,37 @@
import type { GraphMeta, SelectionQueryMeta } from "./types";
export async function fetchSelectionQueries(signal?: AbortSignal): Promise<SelectionQueryMeta[]> {
const res = await fetch("/api/selection_queries", { signal });
if (!res.ok) throw new Error(`GET /api/selection_queries failed: ${res.status}`);
const data = await res.json();
return Array.isArray(data) ? (data as SelectionQueryMeta[]) : [];
}
export async function runSelectionQuery(
queryId: string,
selectedIds: number[],
graphMeta: GraphMeta | null,
signal: AbortSignal
): Promise<number[]> {
const body = {
query_id: queryId,
selected_ids: selectedIds,
node_limit: typeof graphMeta?.node_limit === "number" ? graphMeta.node_limit : undefined,
edge_limit: typeof graphMeta?.edge_limit === "number" ? graphMeta.edge_limit : undefined,
graph_query_id: typeof graphMeta?.graph_query_id === "string" ? graphMeta.graph_query_id : undefined,
};
const res = await fetch("/api/selection_query", {
method: "POST",
headers: { "content-type": "application/json" },
body: JSON.stringify(body),
signal,
});
if (!res.ok) throw new Error(`POST /api/selection_query failed: ${res.status}`);
const data = await res.json();
const ids: unknown = data?.neighbor_ids;
if (!Array.isArray(ids)) return [];
const out: number[] = [];
for (const id of ids) if (typeof id === "number") out.push(id);
return out;
}

View File

@@ -0,0 +1,3 @@
export { fetchSelectionQueries, runSelectionQuery } from "./api";
export type { GraphMeta, SelectionQueryMeta } from "./types";

View File

@@ -0,0 +1,16 @@
export type GraphMeta = {
backend?: string;
ttl_path?: string | null;
sparql_endpoint?: string | null;
include_bnodes?: boolean;
graph_query_id?: string;
node_limit?: number;
edge_limit?: number;
nodes?: number;
edges?: number;
};
export type SelectionQueryMeta = {
id: string;
label: string;
};

View File

@@ -10,7 +10,23 @@ const __dirname = path.dirname(__filename);
// https://vite.dev/config/ // https://vite.dev/config/
export default defineConfig({ export default defineConfig({
plugins: [react(), tailwindcss(), viteSingleFile()], plugins: [
react(),
tailwindcss(),
viteSingleFile(),
{
name: "long-timeouts",
configureServer(server) {
// Large graph snapshots can take minutes; keep the dev server from killing the request.
const httpServer = server.httpServer;
if (!httpServer) return;
const ms30m = 30 * 60 * 1000;
httpServer.headersTimeout = ms30m;
httpServer.requestTimeout = ms30m;
httpServer.keepAliveTimeout = ms30m;
},
},
],
resolve: { resolve: {
alias: { alias: {
"@": path.resolve(__dirname, "src"), "@": path.resolve(__dirname, "src"),
@@ -19,7 +35,20 @@ export default defineConfig({
server: { server: {
proxy: { proxy: {
// Backend is reachable as http://backend:8000 inside docker-compose; localhost outside. // Backend is reachable as http://backend:8000 inside docker-compose; localhost outside.
"/api": process.env.VITE_BACKEND_URL || "http://localhost:8000", "/api": {
target: process.env.VITE_BACKEND_URL || "http://localhost:8000",
changeOrigin: true,
configure: (proxy) => {
proxy.on("error", (err) => {
// Surface upstream timeouts/socket errors in `docker compose logs frontend`.
console.error("[vite-proxy] /api error:", err);
});
},
// The initial graph snapshot can take minutes with large limits (SPARQL + layout + labels).
// Prevent the dev proxy from timing out and returning a 500 to the browser.
timeout: 30 * 60 * 1000,
proxyTimeout: 30 * 60 * 1000,
},
}, },
}, },
}); });

View File

@@ -8,9 +8,7 @@ ENV PYTHONDONTWRITEBYTECODE=1 \
COPY requirements.txt /app/requirements.txt COPY requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r /app/requirements.txt RUN pip install --no-cache-dir -r /app/requirements.txt
COPY app /app/app COPY owl_imports_combiner.py /app/owl_imports_combiner.py
COPY main.py /app/main.py
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
CMD ["python", "/app/main.py"]

View File

@@ -0,0 +1,36 @@
# owl_imports_combiner (Python service)
One-shot utility container that loads an ontology and recursively follows `owl:imports`, then writes a single combined Turtle file.
This is useful to precompute a single TTL for AnzoGraph loading.
## Run
Via Docker Compose:
```bash
docker compose run --rm owl_imports_combiner
```
The service mounts `./data → /data`, so use output paths under `/data/...`.
## Environment variables
- `COMBINE_OWL_IMPORTS_ON_START` (default `false`)
- If `false`, the container exits without doing anything.
- `COMBINE_ENTRY_LOCATION`
- Entry ontology: local path, `file://` URI, or `http(s)` URL.
- If unset, falls back to `TTL_PATH`.
- `COMBINE_OUTPUT_LOCATION`
- Output location (local file path or `file://` URI). Required if entry is an `http(s)` URL.
- `COMBINE_OUTPUT_NAME` (default `combined_ontology.ttl`)
- Used only when `COMBINE_OUTPUT_LOCATION` is unset and entry is a local file.
- `COMBINE_FORCE` (default `false`)
- Overwrite output if it already exists.
- `LOG_LEVEL` (default `INFO`)
## Behavior
- If the output exists and `COMBINE_FORCE=false`, it skips the combine step.
- Output is written atomically via a temporary file + rename.

View File

@@ -0,0 +1,54 @@
from __future__ import annotations
import logging
import os
from owl_imports_combiner import (
build_combined_graph,
output_location_to_path,
resolve_output_location,
serialize_graph_to_ttl,
)
logger = logging.getLogger(__name__)
def _env_bool(name: str, *, default: bool = False) -> bool:
val = os.getenv(name)
if val is None:
return default
return val.strip().lower() in {"1", "true", "yes", "y", "on"}
def main() -> None:
logging.basicConfig(level=os.getenv("LOG_LEVEL", "INFO").upper())
if not _env_bool("COMBINE_OWL_IMPORTS_ON_START", default=False):
logger.info("Skipping combine step (COMBINE_OWL_IMPORTS_ON_START=false)")
return
entry_location = os.getenv("COMBINE_ENTRY_LOCATION") or os.getenv("TTL_PATH")
if not entry_location:
raise SystemExit("Set COMBINE_ENTRY_LOCATION (or TTL_PATH) to the ontology file/URL to load.")
output_name = os.getenv("COMBINE_OUTPUT_NAME", "combined_ontology.ttl")
output_location = resolve_output_location(
entry_location,
output_location=os.getenv("COMBINE_OUTPUT_LOCATION"),
output_name=output_name,
)
output_path = output_location_to_path(output_location)
force = _env_bool("COMBINE_FORCE", default=False)
if output_path.exists() and not force:
logger.info("Skipping combine step (output exists): %s", output_location)
return
graph = build_combined_graph(entry_location)
logger.info("Finished combining imports; serializing to: %s", output_location)
serialize_graph_to_ttl(graph, output_location)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1 @@
rdflib