from __future__ import annotations

from typing import Any


def edge_retrieval_query(*, edge_limit: int, include_bnodes: bool) -> str:
    """Build the SPARQL SELECT query that retrieves candidate graph edges.

    The query selects ``?s ?p ?o`` triples whose object is not a literal.

    Args:
        edge_limit: Value interpolated verbatim into the ``LIMIT`` clause.
        include_bnodes: When false, a filter is added that drops triples
            whose subject or object is a blank node.

    Returns:
        The query text.
    """
    bnode_filter = "" if include_bnodes else "FILTER(!isBlank(?s) && !isBlank(?o))"
    # The pieces below concatenate to exactly the original single-string query.
    return (
        " SELECT ?s ?p ?o WHERE { ?s ?p ?o ."
        " FILTER(!isLiteral(?o))"
        # NOTE(review): the NOT IN list is empty as seen here; presumably the
        # excluded predicate IRIs were lost upstream -- confirm and restore.
        " FILTER(?p NOT IN ( , , ))"
        f" {bnode_filter} }}"
        f" LIMIT {edge_limit} "
    )


def _node_key_and_meta(
    term: dict[str, Any],
    *,
    include_bnodes: bool,
) -> tuple[tuple[str, str], tuple[str, str]] | None:
    """Map one SPARQL JSON term to ``(dedup_key, (termType, iri))``.

    Returns ``None`` when the term cannot be a graph node: it has no type or
    value, it is a literal, or it is a blank node and those are excluded.
    """
    term_type = term.get("type")
    value = term.get("value")
    if not term_type or value is None:
        return None
    # "typed-literal" is the legacy spelling some endpoints still emit.
    if term_type in ("literal", "typed-literal"):
        return None
    if term_type == "bnode":
        if not include_bnodes:
            return None
        # SPARQL JSON uses bnode identifiers without the "_:" prefix; the
        # reported iri is normalized to "_:id".
        return (("bnode", str(value)), ("bnode", f"_:{value}"))
    # Any other term type is treated as a URI.
    return (("uri", str(value)), ("uri", str(value)))


def graph_from_sparql_bindings(
    bindings: list[dict[str, Any]],
    *,
    node_limit: int,
    include_bnodes: bool,
) -> tuple[list[dict[str, object]], list[dict[str, object]]]:
    """Convert SPARQL JSON result bindings into node and edge lists.

    Output shape:
        nodes: ``[{id, termType, iri, label}]`` (``label`` is always ``None``)
        edges: ``[{source, target, predicate}]``

    IDs are assigned densely (0..N-1) based on first occurrence in bindings,
    subject before object.

    A binding contributes nodes only if its edge is emitted. Bindings with a
    missing predicate, an unusable endpoint, or endpoints that would push the
    node count past ``node_limit`` are skipped as a whole, so rejected edges
    never leave orphan nodes behind or use up the node budget.

    Args:
        bindings: The ``results.bindings`` rows, each with ``s``, ``p``, ``o``.
        node_limit: Maximum number of distinct nodes to emit.
        include_bnodes: Whether blank nodes may appear as graph nodes.

    Returns:
        A ``(nodes, edges)`` tuple.
    """
    node_id_by_key: dict[tuple[str, str], int] = {}
    node_meta: list[tuple[str, str]] = []  # (termType, iri), indexed by node id
    out_edges: list[dict[str, object]] = []

    for binding in bindings:
        predicate = (binding.get("p") or {}).get("value")
        if not predicate:
            continue

        subject = _node_key_and_meta(
            binding.get("s") or {}, include_bnodes=include_bnodes
        )
        obj = _node_key_and_meta(
            binding.get("o") or {}, include_bnodes=include_bnodes
        )
        if subject is None or obj is None:
            continue

        # Distinct endpoints not registered yet, in subject-then-object order.
        # Using a dict makes a self-loop count as a single new node.
        pending: dict[tuple[str, str], tuple[str, str]] = {}
        for key, meta in (subject, obj):
            if key not in node_id_by_key:
                pending[key] = meta

        # All-or-nothing: register nodes only if the whole edge fits.
        if len(node_meta) + len(pending) > node_limit:
            continue

        for key, meta in pending.items():
            node_id_by_key[key] = len(node_meta)
            node_meta.append(meta)

        out_edges.append(
            {
                "source": node_id_by_key[subject[0]],
                "target": node_id_by_key[obj[0]],
                "predicate": str(predicate),
            }
        )

    out_nodes: list[dict[str, object]] = [
        {"id": node_id, "termType": term_type, "iri": iri, "label": None}
        for node_id, (term_type, iri) in enumerate(node_meta)
    ]
    return out_nodes, out_edges