Files
visualizador_instanciados/backend/app/rdf_store.py
2026-03-02 14:32:42 -03:00

135 lines
3.9 KiB
Python

from __future__ import annotations
from dataclasses import dataclass
from typing import Any
from rdflib import BNode, Graph, Literal, URIRef
from rdflib.namespace import RDFS, SKOS
# Predicates treated as label carriers: RDFStore.load stores the literal object
# of such a triple as the subject's label instead of emitting it as an edge.
LABEL_PREDICATES = {RDFS.label, SKOS.prefLabel, SKOS.altLabel}
@dataclass(frozen=True)
class EdgeRow:
    """Immutable record describing one directed edge between two node ids."""

    # Integer id of the node the edge starts from.
    source: int
    # Integer id of the node the edge points to.
    target: int
    # Predicate connecting the two nodes, stored as a plain string.
    predicate: str
class RDFStore:
    """In-memory index of an RDF graph as integer-identified nodes and edges.

    ``load`` walks the triples of a graph, assigns a dense integer id to every
    allowed subject/object term (in first-seen order), records one label per
    node from the label predicates, and stores every remaining triple whose
    subject and object are both allowed as an ``EdgeRow``.
    """

    def __init__(self, *, ttl_path: str, include_bnodes: bool, max_triples: int | None):
        """Configure the store; nothing is parsed until ``load`` is called.

        Args:
            ttl_path: Path handed to ``Graph.parse`` (Turtle format) when
                ``load`` is called without an explicit graph.
            include_bnodes: Whether blank nodes may become graph nodes.
            max_triples: Upper bound on the number of triples consumed by
                ``load``; ``None`` means no bound.
        """
        self.ttl_path = ttl_path
        self.include_bnodes = include_bnodes
        self.max_triples = max_triples

        self.graph: Graph | None = None

        # Two-way mapping between RDF terms and dense integer ids.
        self._id_by_term: dict[Any, int] = {}
        self._term_by_id: list[Any] = []

        self._labels_by_id: dict[int, str] = {}
        self._edges: list[EdgeRow] = []
        self._parsed_triples = 0

    def _term_allowed(self, term: Any) -> bool:
        """Return True when ``term`` may be used as a node.

        URIs are always allowed, blank nodes only when ``include_bnodes`` is
        set, and everything else (literals included) is rejected.
        """
        if isinstance(term, BNode):
            return self.include_bnodes
        return isinstance(term, URIRef)

    def _get_id(self, term: Any) -> int | None:
        """Return the id of ``term``, allocating the next free one if needed.

        Returns ``None`` when the term is not allowed as a node.
        """
        if not self._term_allowed(term):
            return None
        known = self._id_by_term.get(term)
        if known is not None:
            return known
        # Ids are positions in ``_term_by_id``, so the next id is its length.
        new_id = len(self._term_by_id)
        self._id_by_term[term] = new_id
        self._term_by_id.append(term)
        return new_id

    def _term_type(self, term: Any) -> str:
        """Return ``"bnode"`` for blank nodes and ``"uri"`` for anything else."""
        return "bnode" if isinstance(term, BNode) else "uri"

    def _term_iri(self, term: Any) -> str:
        """Return the string form of ``term``; blank nodes get a ``_:`` prefix."""
        if isinstance(term, BNode):
            return f"_:{term}"
        return str(term)

    def load(self, graph: Graph | None = None) -> None:
        """(Re)build the node, label and edge tables.

        Args:
            graph: Graph to index. When ``None``, a new ``Graph`` is created
                and ``ttl_path`` is parsed into it as Turtle.

        At most ``max_triples`` triples are consumed, and ``parsed_triples``
        reports exactly how many were consumed.
        """
        # Test against None explicitly: an empty rdflib Graph is falsy, so
        # ``graph or Graph()`` would silently swap a caller-supplied empty
        # graph for a different object.
        if graph is None:
            g = Graph()
            g.parse(self.ttl_path, format="turtle")
        else:
            g = graph
        self.graph = g

        self._id_by_term.clear()
        self._term_by_id.clear()
        self._labels_by_id.clear()
        self._edges.clear()

        limit = self.max_triples
        consumed = 0
        for s, p, o in g:
            # Check the cap before counting so the reported total never
            # includes a triple that was not actually processed.
            if limit is not None and consumed >= limit:
                break
            consumed += 1

            # Capture labels but do not emit them as edges.
            if p in LABEL_PREDICATES and isinstance(o, Literal):
                sid = self._get_id(s)
                if sid is not None:
                    # The first label encountered for a node wins.
                    self._labels_by_id.setdefault(sid, str(o))
                continue

            sid = self._get_id(s)
            oid = self._get_id(o)
            if sid is None or oid is None:
                continue
            self._edges.append(EdgeRow(source=sid, target=oid, predicate=str(p)))

        self._parsed_triples = consumed

    @property
    def parsed_triples(self) -> int:
        """Number of triples consumed by the most recent ``load``."""
        return self._parsed_triples

    @property
    def node_count(self) -> int:
        """Number of nodes that currently have an id."""
        return len(self._term_by_id)

    @property
    def edge_count(self) -> int:
        """Number of stored edges."""
        return len(self._edges)

    def node_slice(self, *, offset: int, limit: int) -> list[dict[str, Any]]:
        """Return up to ``limit`` node records starting at id ``offset``.

        Negative ``offset`` or ``limit`` values are treated as 0 so they can
        never wrap around to the end of the table or produce negative ids.
        Each record has the keys ``id``, ``termType``, ``iri`` and ``label``
        (``label`` is ``None`` when no label was captured).
        """
        start = max(offset, 0)
        stop = min(self.node_count, start + max(limit, 0))
        return [
            {
                "id": nid,
                "termType": self._term_type(self._term_by_id[nid]),
                "iri": self._term_iri(self._term_by_id[nid]),
                "label": self._labels_by_id.get(nid),
            }
            for nid in range(start, stop)
        ]

    def edge_slice(self, *, offset: int, limit: int) -> list[dict[str, Any]]:
        """Return up to ``limit`` edge records starting at position ``offset``.

        Negative ``offset`` or ``limit`` values are treated as 0; without the
        clamp a negative bound would be read as "count from the end" by the
        list slice. Each record has the keys ``source``, ``target`` and
        ``predicate``.
        """
        start = max(offset, 0)
        stop = min(self.edge_count, start + max(limit, 0))
        return [
            {
                "source": row.source,
                "target": row.target,
                "predicate": row.predicate,
            }
            for row in self._edges[start:stop]
        ]