Compare commits
2 Commits
3c487d088b
...
5badcd8d6f
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5badcd8d6f | ||
|
|
a0c5bec19f |
44
.env.example
Normal file
44
.env.example
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
# TTL path used as a fallback by the owl-imports combiner when COMBINE_ENTRY_LOCATION is not set.
|
||||||
|
TTL_PATH=/data/merged_ontologies_pruned_patched.ttl
|
||||||
|
|
||||||
|
# Backend behavior
|
||||||
|
INCLUDE_BNODES=false
|
||||||
|
|
||||||
|
# Graph snapshot limits (used by Go backend defaults/validation)
|
||||||
|
DEFAULT_NODE_LIMIT=800000
|
||||||
|
DEFAULT_EDGE_LIMIT=2000000
|
||||||
|
MAX_NODE_LIMIT=10000000
|
||||||
|
MAX_EDGE_LIMIT=20000000
|
||||||
|
|
||||||
|
# SPARQL paging: number of triples per batch (LIMIT/OFFSET)
|
||||||
|
EDGE_BATCH_SIZE=100000
|
||||||
|
|
||||||
|
# Owl imports combiner service (docker-compose `owl_imports_combiner`)
|
||||||
|
COMBINE_OWL_IMPORTS_ON_START=true
|
||||||
|
COMBINE_ENTRY_LOCATION=/data/vkg.ttl
|
||||||
|
COMBINE_OUTPUT_LOCATION=/data/vkg_full.ttl
|
||||||
|
# COMBINE_OUTPUT_NAME=combined_ontology.ttl # Only used if COMBINE_OUTPUT_LOCATION is not set.
|
||||||
|
COMBINE_FORCE=true
|
||||||
|
|
||||||
|
# AnzoGraph / SPARQL endpoint settings
|
||||||
|
SPARQL_HOST=http://anzograph:8080
|
||||||
|
# SPARQL_ENDPOINT=http://anzograph:8080/sparql
|
||||||
|
SPARQL_USER=admin
|
||||||
|
SPARQL_PASS=Passw0rd1
|
||||||
|
|
||||||
|
# File URI as seen by the AnzoGraph container (used by SPARQL `LOAD`). Currently not used.
|
||||||
|
SPARQL_DATA_FILE=file:///opt/shared-files/o3po.ttl
|
||||||
|
# SPARQL_GRAPH_IRI=http://example.org/graph
|
||||||
|
|
||||||
|
# Startup behavior for AnzoGraph mode
|
||||||
|
SPARQL_LOAD_ON_START=false
|
||||||
|
SPARQL_CLEAR_ON_START=false
|
||||||
|
SPARQL_READY_TIMEOUT_S=10
|
||||||
|
|
||||||
|
# Dev UX
|
||||||
|
CORS_ORIGINS=http://localhost:5173
|
||||||
|
VITE_BACKEND_URL=http://backend:8000
|
||||||
|
|
||||||
|
# Debugging
|
||||||
|
LOG_SNAPSHOT_TIMINGS=false
|
||||||
|
FREE_OS_MEMORY_AFTER_SNAPSHOT=false
|
||||||
@@ -11,6 +11,18 @@ The backend connects to AnzoGraph via:
|
|||||||
- `SPARQL_HOST` (default `http://anzograph:8080`) and the `/sparql` path, or
|
- `SPARQL_HOST` (default `http://anzograph:8080`) and the `/sparql` path, or
|
||||||
- an explicit `SPARQL_ENDPOINT`
|
- an explicit `SPARQL_ENDPOINT`
|
||||||
|
|
||||||
|
## Persistence
|
||||||
|
|
||||||
|
The `docker-compose.yml` config mounts named volumes into the AnzoGraph container so its state survives
|
||||||
|
container recreation (e.g. `docker compose up --force-recreate`):
|
||||||
|
|
||||||
|
- `anzograph_app_home → /opt/anzograph/app-home` (machine-id / user home)
|
||||||
|
- `anzograph_persistence → /opt/anzograph/persistence` (database persistence dir)
|
||||||
|
- `anzograph_config → /opt/anzograph/config` (settings + activation markers)
|
||||||
|
- `anzograph_internal → /opt/anzograph/internal` (internal state, including EULA acceptance marker)
|
||||||
|
|
||||||
|
To fully reset AnzoGraph state, remove volumes with `docker compose down -v`.
|
||||||
|
|
||||||
## Loading data
|
## Loading data
|
||||||
|
|
||||||
The backend can optionally load a TTL file on startup (after AnzoGraph is ready):
|
The backend can optionally load a TTL file on startup (after AnzoGraph is ready):
|
||||||
@@ -24,4 +36,3 @@ Because `./data` is mounted at `/opt/shared-files`, anything placed in `./data`
|
|||||||
|
|
||||||
- Authentication defaults are configured via the backend env (`SPARQL_USER` / `SPARQL_PASS`).
|
- Authentication defaults are configured via the backend env (`SPARQL_USER` / `SPARQL_PASS`).
|
||||||
- The AnzoGraph container in this repo is not customized; consult the upstream image documentation for persistence, licensing, and advanced configuration.
|
- The AnzoGraph container in this repo is not customized; consult the upstream image documentation for persistence, licensing, and advanced configuration.
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
FROM golang:1.22-alpine AS builder
|
ARG GO_VERSION=1.24
|
||||||
|
FROM golang:${GO_VERSION}-alpine AS builder
|
||||||
|
|
||||||
WORKDIR /src
|
WORKDIR /src
|
||||||
|
|
||||||
|
|||||||
@@ -18,6 +18,11 @@ type Config struct {
|
|||||||
MaxNodeLimit int
|
MaxNodeLimit int
|
||||||
MaxEdgeLimit int
|
MaxEdgeLimit int
|
||||||
|
|
||||||
|
EdgeBatchSize int
|
||||||
|
|
||||||
|
FreeOSMemoryAfterSnapshot bool
|
||||||
|
LogSnapshotTimings bool
|
||||||
|
|
||||||
SparqlHost string
|
SparqlHost string
|
||||||
SparqlEndpoint string
|
SparqlEndpoint string
|
||||||
SparqlUser string
|
SparqlUser string
|
||||||
@@ -45,6 +50,9 @@ func LoadConfig() (Config, error) {
|
|||||||
DefaultEdgeLimit: envInt("DEFAULT_EDGE_LIMIT", 2_000_000),
|
DefaultEdgeLimit: envInt("DEFAULT_EDGE_LIMIT", 2_000_000),
|
||||||
MaxNodeLimit: envInt("MAX_NODE_LIMIT", 10_000_000),
|
MaxNodeLimit: envInt("MAX_NODE_LIMIT", 10_000_000),
|
||||||
MaxEdgeLimit: envInt("MAX_EDGE_LIMIT", 20_000_000),
|
MaxEdgeLimit: envInt("MAX_EDGE_LIMIT", 20_000_000),
|
||||||
|
EdgeBatchSize: envInt("EDGE_BATCH_SIZE", 100_000),
|
||||||
|
FreeOSMemoryAfterSnapshot: envBool("FREE_OS_MEMORY_AFTER_SNAPSHOT", false),
|
||||||
|
LogSnapshotTimings: envBool("LOG_SNAPSHOT_TIMINGS", false),
|
||||||
|
|
||||||
SparqlHost: envString("SPARQL_HOST", "http://anzograph:8080"),
|
SparqlHost: envString("SPARQL_HOST", "http://anzograph:8080"),
|
||||||
SparqlEndpoint: envString("SPARQL_ENDPOINT", ""),
|
SparqlEndpoint: envString("SPARQL_ENDPOINT", ""),
|
||||||
@@ -96,6 +104,12 @@ func LoadConfig() (Config, error) {
|
|||||||
if cfg.DefaultEdgeLimit > cfg.MaxEdgeLimit {
|
if cfg.DefaultEdgeLimit > cfg.MaxEdgeLimit {
|
||||||
return Config{}, fmt.Errorf("DEFAULT_EDGE_LIMIT must be <= MAX_EDGE_LIMIT")
|
return Config{}, fmt.Errorf("DEFAULT_EDGE_LIMIT must be <= MAX_EDGE_LIMIT")
|
||||||
}
|
}
|
||||||
|
if cfg.EdgeBatchSize < 1 {
|
||||||
|
return Config{}, fmt.Errorf("EDGE_BATCH_SIZE must be >= 1")
|
||||||
|
}
|
||||||
|
if cfg.EdgeBatchSize > cfg.MaxEdgeLimit {
|
||||||
|
return Config{}, fmt.Errorf("EDGE_BATCH_SIZE must be <= MAX_EDGE_LIMIT")
|
||||||
|
}
|
||||||
|
|
||||||
return cfg, nil
|
return cfg, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,89 +5,87 @@ type termKey struct {
|
|||||||
key string
|
key string
|
||||||
}
|
}
|
||||||
|
|
||||||
type termMeta struct {
|
type graphAccumulator struct {
|
||||||
termType string
|
includeBNodes bool
|
||||||
iri string
|
nodeLimit int
|
||||||
|
nodeIDByKey map[termKey]uint32
|
||||||
|
nodes []Node
|
||||||
|
edges []Edge
|
||||||
|
preds *PredicateDict
|
||||||
}
|
}
|
||||||
|
|
||||||
func graphFromSparqlBindings(
|
func newGraphAccumulator(nodeLimit int, includeBNodes bool, edgeCapHint int, preds *PredicateDict) *graphAccumulator {
|
||||||
bindings []map[string]sparqlTerm,
|
if preds == nil {
|
||||||
nodeLimit int,
|
preds = NewPredicateDict(nil)
|
||||||
includeBNodes bool,
|
}
|
||||||
) (nodes []Node, edges []Edge) {
|
return &graphAccumulator{
|
||||||
nodeIDByKey := map[termKey]int{}
|
includeBNodes: includeBNodes,
|
||||||
nodeMeta := make([]termMeta, 0, min(nodeLimit, 4096))
|
nodeLimit: nodeLimit,
|
||||||
|
nodeIDByKey: make(map[termKey]uint32),
|
||||||
|
nodes: make([]Node, 0, min(nodeLimit, 4096)),
|
||||||
|
edges: make([]Edge, 0, min(edgeCapHint, 4096)),
|
||||||
|
preds: preds,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
getOrAdd := func(term sparqlTerm) (int, bool) {
|
func (g *graphAccumulator) getOrAddNode(term sparqlTerm) (uint32, bool) {
|
||||||
if term.Type == "" || term.Value == "" {
|
if term.Type == "" || term.Value == "" {
|
||||||
return 0, false
|
return 0, false
|
||||||
}
|
}
|
||||||
if term.Type == "literal" {
|
if term.Type == "literal" {
|
||||||
return 0, false
|
return 0, false
|
||||||
}
|
|
||||||
|
|
||||||
var key termKey
|
|
||||||
var meta termMeta
|
|
||||||
|
|
||||||
if term.Type == "bnode" {
|
|
||||||
if !includeBNodes {
|
|
||||||
return 0, false
|
|
||||||
}
|
|
||||||
key = termKey{termType: "bnode", key: term.Value}
|
|
||||||
meta = termMeta{termType: "bnode", iri: "_:" + term.Value}
|
|
||||||
} else {
|
|
||||||
key = termKey{termType: "uri", key: term.Value}
|
|
||||||
meta = termMeta{termType: "uri", iri: term.Value}
|
|
||||||
}
|
|
||||||
|
|
||||||
if existing, ok := nodeIDByKey[key]; ok {
|
|
||||||
return existing, true
|
|
||||||
}
|
|
||||||
if len(nodeMeta) >= nodeLimit {
|
|
||||||
return 0, false
|
|
||||||
}
|
|
||||||
nid := len(nodeMeta)
|
|
||||||
nodeIDByKey[key] = nid
|
|
||||||
nodeMeta = append(nodeMeta, meta)
|
|
||||||
return nid, true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var key termKey
|
||||||
|
var node Node
|
||||||
|
|
||||||
|
if term.Type == "bnode" {
|
||||||
|
if !g.includeBNodes {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
key = termKey{termType: "bnode", key: term.Value}
|
||||||
|
node = Node{ID: 0, TermType: "bnode", IRI: "_:" + term.Value, Label: nil, X: 0, Y: 0}
|
||||||
|
} else {
|
||||||
|
key = termKey{termType: "uri", key: term.Value}
|
||||||
|
node = Node{ID: 0, TermType: "uri", IRI: term.Value, Label: nil, X: 0, Y: 0}
|
||||||
|
}
|
||||||
|
|
||||||
|
if existing, ok := g.nodeIDByKey[key]; ok {
|
||||||
|
return existing, true
|
||||||
|
}
|
||||||
|
if len(g.nodes) >= g.nodeLimit {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
nid := uint32(len(g.nodes))
|
||||||
|
g.nodeIDByKey[key] = nid
|
||||||
|
node.ID = nid
|
||||||
|
g.nodes = append(g.nodes, node)
|
||||||
|
return nid, true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *graphAccumulator) addBindings(bindings []map[string]sparqlTerm) {
|
||||||
for _, b := range bindings {
|
for _, b := range bindings {
|
||||||
sTerm := b["s"]
|
sTerm := b["s"]
|
||||||
oTerm := b["o"]
|
oTerm := b["o"]
|
||||||
pTerm := b["p"]
|
pTerm := b["p"]
|
||||||
|
|
||||||
sid, okS := getOrAdd(sTerm)
|
sid, okS := g.getOrAddNode(sTerm)
|
||||||
oid, okO := getOrAdd(oTerm)
|
oid, okO := g.getOrAddNode(oTerm)
|
||||||
if !okS || !okO {
|
if !okS || !okO {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
pred := pTerm.Value
|
predID, ok := g.preds.GetOrAdd(pTerm.Value)
|
||||||
if pred == "" {
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
edges = append(edges, Edge{
|
g.edges = append(g.edges, Edge{
|
||||||
Source: sid,
|
Source: sid,
|
||||||
Target: oid,
|
Target: oid,
|
||||||
Predicate: pred,
|
PredicateID: predID,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
nodes = make([]Node, len(nodeMeta))
|
|
||||||
for i, m := range nodeMeta {
|
|
||||||
nodes[i] = Node{
|
|
||||||
ID: i,
|
|
||||||
TermType: m.termType,
|
|
||||||
IRI: m.iri,
|
|
||||||
Label: nil,
|
|
||||||
X: 0,
|
|
||||||
Y: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nodes, edges
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func min(a, b int) int {
|
func min(a, b int) int {
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ package graph_queries
|
|||||||
|
|
||||||
import "fmt"
|
import "fmt"
|
||||||
|
|
||||||
func defaultEdgeQuery(edgeLimit int, includeBNodes bool) string {
|
func defaultEdgeQuery(limit int, offset int, includeBNodes bool) string {
|
||||||
bnodeFilter := ""
|
bnodeFilter := ""
|
||||||
if !includeBNodes {
|
if !includeBNodes {
|
||||||
bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))"
|
bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))"
|
||||||
@@ -28,7 +28,38 @@ WHERE {
|
|||||||
FILTER(!isLiteral(?o))
|
FILTER(!isLiteral(?o))
|
||||||
%s
|
%s
|
||||||
}
|
}
|
||||||
|
ORDER BY ?s ?p ?o
|
||||||
LIMIT %d
|
LIMIT %d
|
||||||
`, bnodeFilter, edgeLimit)
|
OFFSET %d
|
||||||
|
`, bnodeFilter, limit, offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// defaultPredicateQuery builds the SPARQL query that lists the distinct
// predicates (?p) of the default graph view, ordered by ?p: rdf:type for
// triples whose object is itself typed owl:Class, and rdfs:subClassOf.
// Triples with literal objects are always filtered out; when includeBNodes
// is false an extra filter also drops triples whose subject or object is a
// blank node.
func defaultPredicateQuery(includeBNodes bool) string {
|
||||||
|
bnodeFilter := ""
|
||||||
|
if !includeBNodes {
|
||||||
|
bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))"
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Sprintf(`
|
||||||
|
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
|
||||||
|
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
|
||||||
|
PREFIX owl: <http://www.w3.org/2002/07/owl#>
|
||||||
|
|
||||||
|
SELECT DISTINCT ?p
|
||||||
|
WHERE {
|
||||||
|
{
|
||||||
|
VALUES ?p { rdf:type }
|
||||||
|
?s ?p ?o .
|
||||||
|
?o rdf:type owl:Class .
|
||||||
|
}
|
||||||
|
UNION
|
||||||
|
{
|
||||||
|
VALUES ?p { rdfs:subClassOf }
|
||||||
|
?s ?p ?o .
|
||||||
|
}
|
||||||
|
FILTER(!isLiteral(?o))
|
||||||
|
%s
|
||||||
|
}
|
||||||
|
ORDER BY ?p
|
||||||
|
`, bnodeFilter)
|
||||||
|
}
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ package graph_queries
|
|||||||
|
|
||||||
import "fmt"
|
import "fmt"
|
||||||
|
|
||||||
func hierarchyEdgeQuery(edgeLimit int, includeBNodes bool) string {
|
func hierarchyEdgeQuery(limit int, offset int, includeBNodes bool) string {
|
||||||
bnodeFilter := ""
|
bnodeFilter := ""
|
||||||
if !includeBNodes {
|
if !includeBNodes {
|
||||||
bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))"
|
bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))"
|
||||||
@@ -18,7 +18,28 @@ WHERE {
|
|||||||
FILTER(!isLiteral(?o))
|
FILTER(!isLiteral(?o))
|
||||||
%s
|
%s
|
||||||
}
|
}
|
||||||
|
ORDER BY ?s ?p ?o
|
||||||
LIMIT %d
|
LIMIT %d
|
||||||
`, bnodeFilter, edgeLimit)
|
OFFSET %d
|
||||||
|
`, bnodeFilter, limit, offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// hierarchyPredicateQuery builds the SPARQL query that lists the distinct
// predicates (?p) of the hierarchy graph view, ordered by ?p. The only
// candidate is rdfs:subClassOf. Triples with literal objects are always
// filtered out; when includeBNodes is false an extra filter also drops
// triples whose subject or object is a blank node.
func hierarchyPredicateQuery(includeBNodes bool) string {
|
||||||
|
bnodeFilter := ""
|
||||||
|
if !includeBNodes {
|
||||||
|
bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))"
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Sprintf(`
|
||||||
|
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
|
||||||
|
|
||||||
|
SELECT DISTINCT ?p
|
||||||
|
WHERE {
|
||||||
|
VALUES ?p { rdfs:subClassOf }
|
||||||
|
?s ?p ?o .
|
||||||
|
FILTER(!isLiteral(?o))
|
||||||
|
%s
|
||||||
|
}
|
||||||
|
ORDER BY ?p
|
||||||
|
`, bnodeFilter)
|
||||||
|
}
|
||||||
|
|||||||
@@ -3,9 +3,9 @@ package graph_queries
|
|||||||
const DefaultID = "default"
|
const DefaultID = "default"
|
||||||
|
|
||||||
var definitions = []Definition{
|
var definitions = []Definition{
|
||||||
{Meta: Meta{ID: DefaultID, Label: "Default"}, EdgeQuery: defaultEdgeQuery},
|
{Meta: Meta{ID: DefaultID, Label: "Default"}, EdgeQuery: defaultEdgeQuery, PredicateQuery: defaultPredicateQuery},
|
||||||
{Meta: Meta{ID: "hierarchy", Label: "Hierarchy"}, EdgeQuery: hierarchyEdgeQuery},
|
{Meta: Meta{ID: "hierarchy", Label: "Hierarchy"}, EdgeQuery: hierarchyEdgeQuery, PredicateQuery: hierarchyPredicateQuery},
|
||||||
{Meta: Meta{ID: "types", Label: "Types"}, EdgeQuery: typesOnlyEdgeQuery},
|
{Meta: Meta{ID: "types", Label: "Types"}, EdgeQuery: typesOnlyEdgeQuery, PredicateQuery: typesOnlyPredicateQuery},
|
||||||
}
|
}
|
||||||
|
|
||||||
func List() []Meta {
|
func List() []Meta {
|
||||||
@@ -24,4 +24,3 @@ func Get(id string) (Definition, bool) {
|
|||||||
}
|
}
|
||||||
return Definition{}, false
|
return Definition{}, false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -7,6 +7,6 @@ type Meta struct {
|
|||||||
|
|
||||||
type Definition struct {
|
type Definition struct {
|
||||||
Meta Meta
|
Meta Meta
|
||||||
EdgeQuery func(edgeLimit int, includeBNodes bool) string
|
EdgeQuery func(limit int, offset int, includeBNodes bool) string
|
||||||
|
PredicateQuery func(includeBNodes bool) string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -2,7 +2,7 @@ package graph_queries
|
|||||||
|
|
||||||
import "fmt"
|
import "fmt"
|
||||||
|
|
||||||
func typesOnlyEdgeQuery(edgeLimit int, includeBNodes bool) string {
|
func typesOnlyEdgeQuery(limit int, offset int, includeBNodes bool) string {
|
||||||
bnodeFilter := ""
|
bnodeFilter := ""
|
||||||
if !includeBNodes {
|
if !includeBNodes {
|
||||||
bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))"
|
bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))"
|
||||||
@@ -20,7 +20,30 @@ WHERE {
|
|||||||
FILTER(!isLiteral(?o))
|
FILTER(!isLiteral(?o))
|
||||||
%s
|
%s
|
||||||
}
|
}
|
||||||
|
ORDER BY ?s ?p ?o
|
||||||
LIMIT %d
|
LIMIT %d
|
||||||
`, bnodeFilter, edgeLimit)
|
OFFSET %d
|
||||||
|
`, bnodeFilter, limit, offset)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// typesOnlyPredicateQuery builds the SPARQL query that lists the distinct
// predicates (?p) of the types graph view, ordered by ?p. The only candidate
// is rdf:type, restricted to triples whose object is itself typed owl:Class.
// Triples with literal objects are always filtered out; when includeBNodes
// is false an extra filter also drops triples whose subject or object is a
// blank node.
func typesOnlyPredicateQuery(includeBNodes bool) string {
|
||||||
|
bnodeFilter := ""
|
||||||
|
if !includeBNodes {
|
||||||
|
bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))"
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Sprintf(`
|
||||||
|
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
|
||||||
|
PREFIX owl: <http://www.w3.org/2002/07/owl#>
|
||||||
|
|
||||||
|
SELECT DISTINCT ?p
|
||||||
|
WHERE {
|
||||||
|
VALUES ?p { rdf:type }
|
||||||
|
?s ?p ?o .
|
||||||
|
?o rdf:type owl:Class .
|
||||||
|
FILTER(!isLiteral(?o))
|
||||||
|
%s
|
||||||
|
}
|
||||||
|
ORDER BY ?p
|
||||||
|
`, bnodeFilter)
|
||||||
|
}
|
||||||
|
|||||||
@@ -4,13 +4,19 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"runtime"
|
||||||
|
"runtime/debug"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
graphqueries "visualizador_instanciados/backend_go/graph_queries"
|
graphqueries "visualizador_instanciados/backend_go/graph_queries"
|
||||||
)
|
)
|
||||||
|
|
||||||
const rdfsLabelIRI = "http://www.w3.org/2000/01/rdf-schema#label"
|
const (
|
||||||
|
rdfsLabelIRI = "http://www.w3.org/2000/01/rdf-schema#label"
|
||||||
|
)
|
||||||
|
|
||||||
func fetchGraphSnapshot(
|
func fetchGraphSnapshot(
|
||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
@@ -20,27 +26,153 @@ func fetchGraphSnapshot(
|
|||||||
edgeLimit int,
|
edgeLimit int,
|
||||||
graphQueryID string,
|
graphQueryID string,
|
||||||
) (GraphResponse, error) {
|
) (GraphResponse, error) {
|
||||||
|
start := time.Now()
|
||||||
|
logStats := func(stage string) {
|
||||||
|
if !cfg.LogSnapshotTimings {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var ms runtime.MemStats
|
||||||
|
runtime.ReadMemStats(&ms)
|
||||||
|
log.Printf(
|
||||||
|
"[snapshot] %s graph_query_id=%s node_limit=%d edge_limit=%d elapsed=%s alloc=%dMB heap_inuse=%dMB sys=%dMB numgc=%d",
|
||||||
|
stage,
|
||||||
|
graphQueryID,
|
||||||
|
nodeLimit,
|
||||||
|
edgeLimit,
|
||||||
|
time.Since(start).Truncate(time.Millisecond),
|
||||||
|
ms.Alloc/1024/1024,
|
||||||
|
ms.HeapInuse/1024/1024,
|
||||||
|
ms.Sys/1024/1024,
|
||||||
|
ms.NumGC,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
def, ok := graphqueries.Get(graphQueryID)
|
def, ok := graphqueries.Get(graphQueryID)
|
||||||
if !ok {
|
if !ok {
|
||||||
return GraphResponse{}, fmt.Errorf("unknown graph_query_id: %s", graphQueryID)
|
return GraphResponse{}, fmt.Errorf("unknown graph_query_id: %s", graphQueryID)
|
||||||
}
|
}
|
||||||
edgesQ := def.EdgeQuery(edgeLimit, cfg.IncludeBNodes)
|
|
||||||
raw, err := sparql.Query(ctx, edgesQ)
|
// Build predicate dictionary (predicate IRI -> uint32 ID) before fetching edges.
|
||||||
|
preds, err := func() (*PredicateDict, error) {
|
||||||
|
logStats("predicates_query_start")
|
||||||
|
predQ := def.PredicateQuery(cfg.IncludeBNodes)
|
||||||
|
t0 := time.Now()
|
||||||
|
rawPred, err := sparql.Query(ctx, predQ)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("predicates query failed: %w", err)
|
||||||
|
}
|
||||||
|
if cfg.LogSnapshotTimings {
|
||||||
|
log.Printf("[snapshot] predicates_query_returned bytes=%d query_time=%s", len(rawPred), time.Since(t0).Truncate(time.Millisecond))
|
||||||
|
}
|
||||||
|
var predRes sparqlResponse
|
||||||
|
t1 := time.Now()
|
||||||
|
if err := json.Unmarshal(rawPred, &predRes); err != nil {
|
||||||
|
return nil, fmt.Errorf("predicates unmarshal failed: %w", err)
|
||||||
|
}
|
||||||
|
if cfg.LogSnapshotTimings {
|
||||||
|
log.Printf("[snapshot] predicates_unmarshal_done bindings=%d unmarshal_time=%s", len(predRes.Results.Bindings), time.Since(t1).Truncate(time.Millisecond))
|
||||||
|
}
|
||||||
|
predicateIRIs := make([]string, 0, len(predRes.Results.Bindings))
|
||||||
|
for _, b := range predRes.Results.Bindings {
|
||||||
|
pTerm, ok := b["p"]
|
||||||
|
if !ok || pTerm.Type != "uri" || pTerm.Value == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
predicateIRIs = append(predicateIRIs, pTerm.Value)
|
||||||
|
}
|
||||||
|
logStats("predicates_dict_built")
|
||||||
|
return NewPredicateDict(predicateIRIs), nil
|
||||||
|
}()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return GraphResponse{}, err
|
return GraphResponse{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
var res sparqlResponse
|
// Fetch edges in batches to avoid decoding a single huge SPARQL JSON response.
|
||||||
if err := json.Unmarshal(raw, &res); err != nil {
|
logStats("edges_batched_start")
|
||||||
return GraphResponse{}, fmt.Errorf("failed to parse SPARQL JSON: %w", err)
|
batchSize := cfg.EdgeBatchSize
|
||||||
}
|
acc := newGraphAccumulator(nodeLimit, cfg.IncludeBNodes, min(edgeLimit, batchSize), preds)
|
||||||
|
|
||||||
nodes, edges := graphFromSparqlBindings(res.Results.Bindings, nodeLimit, cfg.IncludeBNodes)
|
totalBindings := 0
|
||||||
|
convAllT0 := time.Now()
|
||||||
|
for batch, offset := 0, 0; offset < edgeLimit; batch, offset = batch+1, offset+batchSize {
|
||||||
|
limit := batchSize
|
||||||
|
remaining := edgeLimit - offset
|
||||||
|
if remaining < limit {
|
||||||
|
limit = remaining
|
||||||
|
}
|
||||||
|
|
||||||
|
logStats(fmt.Sprintf("edges_batch_start batch=%d offset=%d limit=%d", batch, offset, limit))
|
||||||
|
bindings, err := func() ([]map[string]sparqlTerm, error) {
|
||||||
|
edgesQ := def.EdgeQuery(limit, offset, cfg.IncludeBNodes)
|
||||||
|
t0 := time.Now()
|
||||||
|
raw, err := sparql.Query(ctx, edgesQ)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("edges query failed: %w", err)
|
||||||
|
}
|
||||||
|
if cfg.LogSnapshotTimings {
|
||||||
|
log.Printf("[snapshot] edges_batch_query_returned batch=%d offset=%d limit=%d bytes=%d query_time=%s", batch, offset, limit, len(raw), time.Since(t0).Truncate(time.Millisecond))
|
||||||
|
}
|
||||||
|
|
||||||
|
var res sparqlResponse
|
||||||
|
t1 := time.Now()
|
||||||
|
if err := json.Unmarshal(raw, &res); err != nil {
|
||||||
|
return nil, fmt.Errorf("edges unmarshal failed: %w", err)
|
||||||
|
}
|
||||||
|
if cfg.LogSnapshotTimings {
|
||||||
|
log.Printf("[snapshot] edges_batch_unmarshal_done batch=%d bindings=%d unmarshal_time=%s", batch, len(res.Results.Bindings), time.Since(t1).Truncate(time.Millisecond))
|
||||||
|
}
|
||||||
|
return res.Results.Bindings, nil
|
||||||
|
}()
|
||||||
|
if err != nil {
|
||||||
|
return GraphResponse{}, fmt.Errorf("edges batch=%d offset=%d limit=%d: %w", batch, offset, limit, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
got := len(bindings)
|
||||||
|
totalBindings += got
|
||||||
|
if got == 0 {
|
||||||
|
bindings = nil
|
||||||
|
logStats(fmt.Sprintf("edges_batch_done_empty batch=%d offset=%d", batch, offset))
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
convT0 := time.Now()
|
||||||
|
acc.addBindings(bindings)
|
||||||
|
if cfg.LogSnapshotTimings {
|
||||||
|
log.Printf(
|
||||||
|
"[snapshot] edges_batch_convert_done batch=%d got_bindings=%d total_bindings=%d nodes=%d edges=%d convert_time=%s",
|
||||||
|
batch,
|
||||||
|
got,
|
||||||
|
totalBindings,
|
||||||
|
len(acc.nodes),
|
||||||
|
len(acc.edges),
|
||||||
|
time.Since(convT0).Truncate(time.Millisecond),
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make the batch eligible for GC.
|
||||||
|
bindings = nil
|
||||||
|
logStats(fmt.Sprintf("edges_batch_done batch=%d offset=%d", batch, offset))
|
||||||
|
if cfg.FreeOSMemoryAfterSnapshot {
|
||||||
|
debug.FreeOSMemory()
|
||||||
|
logStats(fmt.Sprintf("edges_batch_free_os_memory_done batch=%d offset=%d", batch, offset))
|
||||||
|
}
|
||||||
|
|
||||||
|
if got < limit {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if cfg.LogSnapshotTimings {
|
||||||
|
log.Printf("[snapshot] convert_batches_done total_bindings=%d total_time=%s", totalBindings, time.Since(convAllT0).Truncate(time.Millisecond))
|
||||||
|
}
|
||||||
|
logStats("edges_batched_done")
|
||||||
|
|
||||||
|
nodes := acc.nodes
|
||||||
|
edges := acc.edges
|
||||||
|
|
||||||
// Layout: invert edges for hierarchy (target -> source).
|
// Layout: invert edges for hierarchy (target -> source).
|
||||||
hierEdges := make([][2]int, 0, len(edges))
|
hierEdges := make([][2]int, 0, len(edges))
|
||||||
for _, e := range edges {
|
for _, e := range edges {
|
||||||
hierEdges = append(hierEdges, [2]int{e.Target, e.Source})
|
hierEdges = append(hierEdges, [2]int{int(e.Target), int(e.Source)})
|
||||||
}
|
}
|
||||||
|
|
||||||
layers, cycleErr := levelSynchronousKahnLayers(len(nodes), hierEdges)
|
layers, cycleErr := levelSynchronousKahnLayers(len(nodes), hierEdges)
|
||||||
@@ -82,7 +214,7 @@ func fetchGraphSnapshot(
|
|||||||
if len(iris) > 0 {
|
if len(iris) > 0 {
|
||||||
labelByIRI, err := fetchRDFSLabels(ctx, sparql, iris, 500)
|
labelByIRI, err := fetchRDFSLabels(ctx, sparql, iris, 500)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return GraphResponse{}, err
|
return GraphResponse{}, fmt.Errorf("fetch rdfs:label failed: %w", err)
|
||||||
}
|
}
|
||||||
for i := range nodes {
|
for i := range nodes {
|
||||||
if nodes[i].TermType != "uri" {
|
if nodes[i].TermType != "uri" {
|
||||||
@@ -103,6 +235,7 @@ func fetchGraphSnapshot(
|
|||||||
SparqlEndpoint: cfg.EffectiveSparqlEndpoint(),
|
SparqlEndpoint: cfg.EffectiveSparqlEndpoint(),
|
||||||
IncludeBNodes: cfg.IncludeBNodes,
|
IncludeBNodes: cfg.IncludeBNodes,
|
||||||
GraphQueryID: graphQueryID,
|
GraphQueryID: graphQueryID,
|
||||||
|
Predicates: preds.IRIs(),
|
||||||
NodeLimit: nodeLimit,
|
NodeLimit: nodeLimit,
|
||||||
EdgeLimit: edgeLimit,
|
EdgeLimit: edgeLimit,
|
||||||
Nodes: len(nodes),
|
Nodes: len(nodes),
|
||||||
|
|||||||
@@ -9,7 +9,7 @@ type HealthResponse struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Node struct {
|
type Node struct {
|
||||||
ID int `json:"id"`
|
ID uint32 `json:"id"`
|
||||||
TermType string `json:"termType"` // "uri" | "bnode"
|
TermType string `json:"termType"` // "uri" | "bnode"
|
||||||
IRI string `json:"iri"`
|
IRI string `json:"iri"`
|
||||||
Label *string `json:"label"`
|
Label *string `json:"label"`
|
||||||
@@ -18,9 +18,9 @@ type Node struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Edge struct {
|
type Edge struct {
|
||||||
Source int `json:"source"`
|
Source uint32 `json:"source"`
|
||||||
Target int `json:"target"`
|
Target uint32 `json:"target"`
|
||||||
Predicate string `json:"predicate"`
|
PredicateID uint32 `json:"predicate_id"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type GraphMeta struct {
|
type GraphMeta struct {
|
||||||
@@ -29,6 +29,7 @@ type GraphMeta struct {
|
|||||||
SparqlEndpoint string `json:"sparql_endpoint"`
|
SparqlEndpoint string `json:"sparql_endpoint"`
|
||||||
IncludeBNodes bool `json:"include_bnodes"`
|
IncludeBNodes bool `json:"include_bnodes"`
|
||||||
GraphQueryID string `json:"graph_query_id"`
|
GraphQueryID string `json:"graph_query_id"`
|
||||||
|
Predicates []string `json:"predicates,omitempty"` // index = predicate_id
|
||||||
NodeLimit int `json:"node_limit"`
|
NodeLimit int `json:"node_limit"`
|
||||||
EdgeLimit int `json:"edge_limit"`
|
EdgeLimit int `json:"edge_limit"`
|
||||||
Nodes int `json:"nodes"`
|
Nodes int `json:"nodes"`
|
||||||
@@ -55,27 +56,27 @@ type SparqlQueryRequest struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type NeighborsRequest struct {
|
type NeighborsRequest struct {
|
||||||
SelectedIDs []int `json:"selected_ids"`
|
SelectedIDs []uint32 `json:"selected_ids"`
|
||||||
NodeLimit *int `json:"node_limit,omitempty"`
|
NodeLimit *int `json:"node_limit,omitempty"`
|
||||||
EdgeLimit *int `json:"edge_limit,omitempty"`
|
EdgeLimit *int `json:"edge_limit,omitempty"`
|
||||||
GraphQueryID *string `json:"graph_query_id,omitempty"`
|
GraphQueryID *string `json:"graph_query_id,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type NeighborsResponse struct {
|
type NeighborsResponse struct {
|
||||||
SelectedIDs []int `json:"selected_ids"`
|
SelectedIDs []uint32 `json:"selected_ids"`
|
||||||
NeighborIDs []int `json:"neighbor_ids"`
|
NeighborIDs []uint32 `json:"neighbor_ids"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type SelectionQueryRequest struct {
|
type SelectionQueryRequest struct {
|
||||||
QueryID string `json:"query_id"`
|
QueryID string `json:"query_id"`
|
||||||
SelectedIDs []int `json:"selected_ids"`
|
SelectedIDs []uint32 `json:"selected_ids"`
|
||||||
NodeLimit *int `json:"node_limit,omitempty"`
|
NodeLimit *int `json:"node_limit,omitempty"`
|
||||||
EdgeLimit *int `json:"edge_limit,omitempty"`
|
EdgeLimit *int `json:"edge_limit,omitempty"`
|
||||||
GraphQueryID *string `json:"graph_query_id,omitempty"`
|
GraphQueryID *string `json:"graph_query_id,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type SelectionQueryResponse struct {
|
type SelectionQueryResponse struct {
|
||||||
QueryID string `json:"query_id"`
|
QueryID string `json:"query_id"`
|
||||||
SelectedIDs []int `json:"selected_ids"`
|
SelectedIDs []uint32 `json:"selected_ids"`
|
||||||
NeighborIDs []int `json:"neighbor_ids"`
|
NeighborIDs []uint32 `json:"neighbor_ids"`
|
||||||
}
|
}
|
||||||
|
|||||||
40
backend_go/predicate_dict.go
Normal file
40
backend_go/predicate_dict.go
Normal file
@@ -0,0 +1,40 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
// PredicateDict assigns dense uint32 IDs to predicate IRIs in first-seen
// order and keeps the reverse table so an ID can be resolved back to its IRI.
//
// The zero value is an empty dictionary that is ready to use.
type PredicateDict struct {
	idByIRI map[string]uint32 // IRI -> ID; built lazily by GetOrAdd when nil
	iriByID []string          // ID -> IRI; the slice index is the ID
}

// NewPredicateDict returns a dictionary pre-populated with predicates.
// Empty strings are skipped and duplicates keep the ID of their first
// occurrence, so the assigned IDs are contiguous starting at 0. A nil or
// empty input yields an empty dictionary.
func NewPredicateDict(predicates []string) *PredicateDict {
	d := &PredicateDict{
		idByIRI: make(map[string]uint32, len(predicates)),
		iriByID: make([]string, 0, len(predicates)),
	}
	for _, iri := range predicates {
		// GetOrAdd already rejects "" and de-duplicates; the returned ID is
		// not needed while seeding.
		d.GetOrAdd(iri)
	}
	return d
}

// GetOrAdd returns the ID for iri, assigning the next free ID the first time
// an IRI is seen. It reports false (with ID 0) only for the empty string.
func (d *PredicateDict) GetOrAdd(iri string) (uint32, bool) {
	if iri == "" {
		return 0, false
	}
	if d.idByIRI == nil {
		// Zero-value dictionary: writing to a nil map panics, so build the
		// index first, keeping it consistent with any IRIs already stored.
		d.idByIRI = make(map[string]uint32, len(d.iriByID))
		for i, known := range d.iriByID {
			d.idByIRI[known] = uint32(i)
		}
	}
	if id, ok := d.idByIRI[iri]; ok {
		return id, true
	}
	id := uint32(len(d.iriByID))
	d.idByIRI[iri] = id
	d.iriByID = append(d.iriByID, iri)
	return id, true
}

// IRIs returns the predicate IRIs indexed by ID. The returned slice is the
// dictionary's own storage, not a copy, so callers must not modify it.
func (d *PredicateDict) IRIs() []string {
	return d.iriByID
}
|
||||||
@@ -49,9 +49,9 @@ func termKeyFromSparqlTerm(term sparqlTerm, includeBNodes bool) (string, bool) {
|
|||||||
return "", false
|
return "", false
|
||||||
}
|
}
|
||||||
|
|
||||||
func selectedNodesFromIDs(idx Index, selectedIDs []int, includeBNodes bool) ([]NodeRef, map[int]struct{}) {
|
func selectedNodesFromIDs(idx Index, selectedIDs []uint32, includeBNodes bool) ([]NodeRef, map[uint32]struct{}) {
|
||||||
out := make([]NodeRef, 0, len(selectedIDs))
|
out := make([]NodeRef, 0, len(selectedIDs))
|
||||||
set := make(map[int]struct{}, len(selectedIDs))
|
set := make(map[uint32]struct{}, len(selectedIDs))
|
||||||
for _, nid := range selectedIDs {
|
for _, nid := range selectedIDs {
|
||||||
n, ok := idx.IDToNode[nid]
|
n, ok := idx.IDToNode[nid]
|
||||||
if !ok {
|
if !ok {
|
||||||
@@ -66,13 +66,13 @@ func selectedNodesFromIDs(idx Index, selectedIDs []int, includeBNodes bool) ([]N
|
|||||||
return out, set
|
return out, set
|
||||||
}
|
}
|
||||||
|
|
||||||
func idsFromBindings(raw []byte, varName string, idx Index, selectedSet map[int]struct{}, includeBNodes bool) ([]int, error) {
|
func idsFromBindings(raw []byte, varName string, idx Index, selectedSet map[uint32]struct{}, includeBNodes bool) ([]uint32, error) {
|
||||||
var res sparqlResponse
|
var res sparqlResponse
|
||||||
if err := json.Unmarshal(raw, &res); err != nil {
|
if err := json.Unmarshal(raw, &res); err != nil {
|
||||||
return nil, fmt.Errorf("failed to parse SPARQL JSON: %w", err)
|
return nil, fmt.Errorf("failed to parse SPARQL JSON: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
neighborSet := make(map[int]struct{})
|
neighborSet := make(map[uint32]struct{})
|
||||||
for _, b := range res.Results.Bindings {
|
for _, b := range res.Results.Bindings {
|
||||||
term, ok := b[varName]
|
term, ok := b[varName]
|
||||||
if !ok {
|
if !ok {
|
||||||
@@ -92,11 +92,10 @@ func idsFromBindings(raw []byte, varName string, idx Index, selectedSet map[int]
|
|||||||
neighborSet[nid] = struct{}{}
|
neighborSet[nid] = struct{}{}
|
||||||
}
|
}
|
||||||
|
|
||||||
ids := make([]int, 0, len(neighborSet))
|
ids := make([]uint32, 0, len(neighborSet))
|
||||||
for nid := range neighborSet {
|
for nid := range neighborSet {
|
||||||
ids = append(ids, nid)
|
ids = append(ids, nid)
|
||||||
}
|
}
|
||||||
sort.Ints(ids)
|
sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
|
||||||
return ids, nil
|
return ids, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -62,10 +62,10 @@ WHERE {
|
|||||||
`, values, bnodeFilter)
|
`, values, bnodeFilter)
|
||||||
}
|
}
|
||||||
|
|
||||||
func runNeighbors(ctx context.Context, q Querier, idx Index, selectedIDs []int, includeBNodes bool) ([]int, error) {
|
func runNeighbors(ctx context.Context, q Querier, idx Index, selectedIDs []uint32, includeBNodes bool) ([]uint32, error) {
|
||||||
selectedNodes, selectedSet := selectedNodesFromIDs(idx, selectedIDs, includeBNodes)
|
selectedNodes, selectedSet := selectedNodesFromIDs(idx, selectedIDs, includeBNodes)
|
||||||
if len(selectedNodes) == 0 {
|
if len(selectedNodes) == 0 {
|
||||||
return []int{}, nil
|
return []uint32{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
raw, err := q.Query(ctx, neighborsQuery(selectedNodes, includeBNodes))
|
raw, err := q.Query(ctx, neighborsQuery(selectedNodes, includeBNodes))
|
||||||
@@ -74,4 +74,3 @@ func runNeighbors(ctx context.Context, q Querier, idx Index, selectedIDs []int,
|
|||||||
}
|
}
|
||||||
return idsFromBindings(raw, "nbr", idx, selectedSet, includeBNodes)
|
return idsFromBindings(raw, "nbr", idx, selectedSet, includeBNodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -40,10 +40,10 @@ WHERE {
|
|||||||
`, values, bnodeFilter)
|
`, values, bnodeFilter)
|
||||||
}
|
}
|
||||||
|
|
||||||
func runSubclasses(ctx context.Context, q Querier, idx Index, selectedIDs []int, includeBNodes bool) ([]int, error) {
|
func runSubclasses(ctx context.Context, q Querier, idx Index, selectedIDs []uint32, includeBNodes bool) ([]uint32, error) {
|
||||||
selectedNodes, selectedSet := selectedNodesFromIDs(idx, selectedIDs, includeBNodes)
|
selectedNodes, selectedSet := selectedNodesFromIDs(idx, selectedIDs, includeBNodes)
|
||||||
if len(selectedNodes) == 0 {
|
if len(selectedNodes) == 0 {
|
||||||
return []int{}, nil
|
return []uint32{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
raw, err := q.Query(ctx, subclassesQuery(selectedNodes, includeBNodes))
|
raw, err := q.Query(ctx, subclassesQuery(selectedNodes, includeBNodes))
|
||||||
@@ -52,4 +52,3 @@ func runSubclasses(ctx context.Context, q Querier, idx Index, selectedIDs []int,
|
|||||||
}
|
}
|
||||||
return idsFromBindings(raw, "nbr", idx, selectedSet, includeBNodes)
|
return idsFromBindings(raw, "nbr", idx, selectedSet, includeBNodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -40,10 +40,10 @@ WHERE {
|
|||||||
`, values, bnodeFilter)
|
`, values, bnodeFilter)
|
||||||
}
|
}
|
||||||
|
|
||||||
func runSuperclasses(ctx context.Context, q Querier, idx Index, selectedIDs []int, includeBNodes bool) ([]int, error) {
|
func runSuperclasses(ctx context.Context, q Querier, idx Index, selectedIDs []uint32, includeBNodes bool) ([]uint32, error) {
|
||||||
selectedNodes, selectedSet := selectedNodesFromIDs(idx, selectedIDs, includeBNodes)
|
selectedNodes, selectedSet := selectedNodesFromIDs(idx, selectedIDs, includeBNodes)
|
||||||
if len(selectedNodes) == 0 {
|
if len(selectedNodes) == 0 {
|
||||||
return []int{}, nil
|
return []uint32{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
raw, err := q.Query(ctx, superclassesQuery(selectedNodes, includeBNodes))
|
raw, err := q.Query(ctx, superclassesQuery(selectedNodes, includeBNodes))
|
||||||
@@ -52,4 +52,3 @@ func runSuperclasses(ctx context.Context, q Querier, idx Index, selectedIDs []in
|
|||||||
}
|
}
|
||||||
return idsFromBindings(raw, "nbr", idx, selectedSet, includeBNodes)
|
return idsFromBindings(raw, "nbr", idx, selectedSet, includeBNodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -7,14 +7,14 @@ type Querier interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type NodeRef struct {
|
type NodeRef struct {
|
||||||
ID int
|
ID uint32
|
||||||
TermType string // "uri" | "bnode"
|
TermType string // "uri" | "bnode"
|
||||||
IRI string
|
IRI string
|
||||||
}
|
}
|
||||||
|
|
||||||
type Index struct {
|
type Index struct {
|
||||||
IDToNode map[int]NodeRef
|
IDToNode map[uint32]NodeRef
|
||||||
KeyToID map[string]int
|
KeyToID map[string]uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
type Meta struct {
|
type Meta struct {
|
||||||
@@ -24,6 +24,5 @@ type Meta struct {
|
|||||||
|
|
||||||
type Definition struct {
|
type Definition struct {
|
||||||
Meta Meta
|
Meta Meta
|
||||||
Run func(ctx context.Context, q Querier, idx Index, selectedIDs []int, includeBNodes bool) ([]int, error)
|
Run func(ctx context.Context, q Querier, idx Index, selectedIDs []uint32, includeBNodes bool) ([]uint32, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -12,16 +12,16 @@ func runSelectionQuery(
|
|||||||
sparql *AnzoGraphClient,
|
sparql *AnzoGraphClient,
|
||||||
snapshot GraphResponse,
|
snapshot GraphResponse,
|
||||||
queryID string,
|
queryID string,
|
||||||
selectedIDs []int,
|
selectedIDs []uint32,
|
||||||
includeBNodes bool,
|
includeBNodes bool,
|
||||||
) ([]int, error) {
|
) ([]uint32, error) {
|
||||||
def, ok := selectionqueries.Get(queryID)
|
def, ok := selectionqueries.Get(queryID)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("unknown query_id: %s", queryID)
|
return nil, fmt.Errorf("unknown query_id: %s", queryID)
|
||||||
}
|
}
|
||||||
|
|
||||||
idToNode := make(map[int]selectionqueries.NodeRef, len(snapshot.Nodes))
|
idToNode := make(map[uint32]selectionqueries.NodeRef, len(snapshot.Nodes))
|
||||||
keyToID := make(map[string]int, len(snapshot.Nodes))
|
keyToID := make(map[string]uint32, len(snapshot.Nodes))
|
||||||
for _, n := range snapshot.Nodes {
|
for _, n := range snapshot.Nodes {
|
||||||
nr := selectionqueries.NodeRef{ID: n.ID, TermType: n.TermType, IRI: n.IRI}
|
nr := selectionqueries.NodeRef{ID: n.ID, TermType: n.TermType, IRI: n.IRI}
|
||||||
idToNode[n.ID] = nr
|
idToNode[n.ID] = nr
|
||||||
@@ -30,4 +30,3 @@ func runSelectionQuery(
|
|||||||
|
|
||||||
return def.Run(ctx, sparql, selectionqueries.Index{IDToNode: idToNode, KeyToID: keyToID}, selectedIDs, includeBNodes)
|
return def.Run(ctx, sparql, selectionqueries.Index{IDToNode: idToNode, KeyToID: keyToID}, selectedIDs, includeBNodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
@@ -75,11 +76,12 @@ func (s *APIServer) handleStats(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
snap, err := s.snapshots.Get(ctx, s.cfg.DefaultNodeLimit, s.cfg.DefaultEdgeLimit, graphqueries.DefaultID)
|
snap, err := s.snapshots.Get(ctx, s.cfg.DefaultNodeLimit, s.cfg.DefaultEdgeLimit, graphqueries.DefaultID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
writeError(w, http.StatusInternalServerError, err.Error())
|
log.Printf("handleStats: snapshot error: %v", err)
|
||||||
return
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
}
|
return
|
||||||
|
}
|
||||||
|
|
||||||
endpoint := snap.Meta.SparqlEndpoint
|
endpoint := snap.Meta.SparqlEndpoint
|
||||||
writeJSON(w, http.StatusOK, StatsResponse{
|
writeJSON(w, http.StatusOK, StatsResponse{
|
||||||
@@ -141,11 +143,12 @@ func (s *APIServer) handleGraph(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
snap, err := s.snapshots.Get(r.Context(), nodeLimit, edgeLimit, graphQueryID)
|
snap, err := s.snapshots.Get(r.Context(), nodeLimit, edgeLimit, graphQueryID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if _, ok := err.(*CycleError); ok {
|
log.Printf("handleGraph: snapshot error graph_query_id=%s node_limit=%d edge_limit=%d err=%v", graphQueryID, nodeLimit, edgeLimit, err)
|
||||||
writeError(w, http.StatusUnprocessableEntity, err.Error())
|
if _, ok := err.(*CycleError); ok {
|
||||||
return
|
writeError(w, http.StatusUnprocessableEntity, err.Error())
|
||||||
|
return
|
||||||
}
|
}
|
||||||
writeError(w, http.StatusInternalServerError, err.Error())
|
writeError(w, http.StatusInternalServerError, err.Error())
|
||||||
return
|
return
|
||||||
@@ -189,7 +192,7 @@ func (s *APIServer) handleSelectionQuery(w http.ResponseWriter, r *http.Request)
|
|||||||
writeJSON(w, http.StatusOK, SelectionQueryResponse{
|
writeJSON(w, http.StatusOK, SelectionQueryResponse{
|
||||||
QueryID: req.QueryID,
|
QueryID: req.QueryID,
|
||||||
SelectedIDs: req.SelectedIDs,
|
SelectedIDs: req.SelectedIDs,
|
||||||
NeighborIDs: []int{},
|
NeighborIDs: []uint32{},
|
||||||
})
|
})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -247,7 +250,7 @@ func (s *APIServer) handleNeighbors(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
if len(req.SelectedIDs) == 0 {
|
if len(req.SelectedIDs) == 0 {
|
||||||
writeJSON(w, http.StatusOK, NeighborsResponse{SelectedIDs: req.SelectedIDs, NeighborIDs: []int{}})
|
writeJSON(w, http.StatusOK, NeighborsResponse{SelectedIDs: req.SelectedIDs, NeighborIDs: []uint32{}})
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -34,9 +34,16 @@ services:
|
|||||||
- SPARQL_READY_RETRIES=${SPARQL_READY_RETRIES:-30}
|
- SPARQL_READY_RETRIES=${SPARQL_READY_RETRIES:-30}
|
||||||
- SPARQL_READY_DELAY_S=${SPARQL_READY_DELAY_S:-4}
|
- SPARQL_READY_DELAY_S=${SPARQL_READY_DELAY_S:-4}
|
||||||
- SPARQL_READY_TIMEOUT_S=${SPARQL_READY_TIMEOUT_S:-10}
|
- SPARQL_READY_TIMEOUT_S=${SPARQL_READY_TIMEOUT_S:-10}
|
||||||
|
- EDGE_BATCH_SIZE=${EDGE_BATCH_SIZE:-100000}
|
||||||
|
- FREE_OS_MEMORY_AFTER_SNAPSHOT=${FREE_OS_MEMORY_AFTER_SNAPSHOT:-false}
|
||||||
|
- LOG_SNAPSHOT_TIMINGS=${LOG_SNAPSHOT_TIMINGS:-false}
|
||||||
depends_on:
|
depends_on:
|
||||||
- owl_imports_combiner
|
owl_imports_combiner:
|
||||||
- anzograph
|
condition: service_completed_successfully
|
||||||
|
anzograph:
|
||||||
|
condition: service_started
|
||||||
|
volumes:
|
||||||
|
- ./data:/data:Z
|
||||||
healthcheck:
|
healthcheck:
|
||||||
test: ["CMD", "curl", "-fsS", "http://localhost:8000/api/health"]
|
test: ["CMD", "curl", "-fsS", "http://localhost:8000/api/health"]
|
||||||
interval: 5s
|
interval: 5s
|
||||||
@@ -53,15 +60,26 @@ services:
|
|||||||
- ./frontend:/app
|
- ./frontend:/app
|
||||||
- /app/node_modules
|
- /app/node_modules
|
||||||
depends_on:
|
depends_on:
|
||||||
- backend
|
backend:
|
||||||
# Docker Compose v1 doesn't support depends_on:condition. Do an explicit wait here.
|
condition: service_healthy
|
||||||
command: sh -c "until wget -qO- http://backend:8000/api/health >/dev/null 2>&1; do echo 'waiting for backend...'; sleep 1; done; npm run dev -- --host --port 5173"
|
|
||||||
|
|
||||||
anzograph:
|
anzograph:
|
||||||
image: cambridgesemantics/anzograph:latest
|
image: cambridgesemantics/anzograph:latest
|
||||||
container_name: anzograph
|
container_name: anzograph
|
||||||
|
mem_limit: 20g
|
||||||
ports:
|
ports:
|
||||||
- "8080:8080"
|
- "8080:8080"
|
||||||
- "8443:8443"
|
- "8443:8443"
|
||||||
volumes:
|
volumes:
|
||||||
- ./data:/opt/shared-files:Z
|
- ./data:/opt/shared-files:Z
|
||||||
|
# Persist AnzoGraph state across container recreation (EULA acceptance, machine-id, settings, persistence).
|
||||||
|
- anzograph_app_home:/opt/anzograph/app-home
|
||||||
|
- anzograph_persistence:/opt/anzograph/persistence
|
||||||
|
- anzograph_config:/opt/anzograph/config
|
||||||
|
- anzograph_internal:/opt/anzograph/internal
|
||||||
|
|
||||||
|
volumes:
|
||||||
|
anzograph_app_home:
|
||||||
|
anzograph_persistence:
|
||||||
|
anzograph_config:
|
||||||
|
anzograph_internal:
|
||||||
|
|||||||
@@ -43,7 +43,18 @@ export default function App() {
|
|||||||
|
|
||||||
setStatus("Fetching graph…");
|
setStatus("Fetching graph…");
|
||||||
const graphRes = await fetch(`/api/graph?graph_query_id=${encodeURIComponent(graphQueryId)}`, { signal });
|
const graphRes = await fetch(`/api/graph?graph_query_id=${encodeURIComponent(graphQueryId)}`, { signal });
|
||||||
if (!graphRes.ok) throw new Error(`Failed to fetch graph: ${graphRes.status}`);
|
if (!graphRes.ok) {
|
||||||
|
let detail = "";
|
||||||
|
try {
|
||||||
|
const err = await graphRes.json();
|
||||||
|
if (err && typeof err === "object" && typeof (err as any).detail === "string") {
|
||||||
|
detail = (err as any).detail;
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
// ignore
|
||||||
|
}
|
||||||
|
throw new Error(`Failed to fetch graph: ${graphRes.status}${detail ? ` (${detail})` : ""}`);
|
||||||
|
}
|
||||||
const graph = await graphRes.json();
|
const graph = await graphRes.json();
|
||||||
if (signal.aborted) return;
|
if (signal.aborted) return;
|
||||||
|
|
||||||
|
|||||||
@@ -10,7 +10,23 @@ const __dirname = path.dirname(__filename);
|
|||||||
|
|
||||||
// https://vite.dev/config/
|
// https://vite.dev/config/
|
||||||
export default defineConfig({
|
export default defineConfig({
|
||||||
plugins: [react(), tailwindcss(), viteSingleFile()],
|
plugins: [
|
||||||
|
react(),
|
||||||
|
tailwindcss(),
|
||||||
|
viteSingleFile(),
|
||||||
|
{
|
||||||
|
name: "long-timeouts",
|
||||||
|
configureServer(server) {
|
||||||
|
// Large graph snapshots can take minutes; keep the dev server from killing the request.
|
||||||
|
const httpServer = server.httpServer;
|
||||||
|
if (!httpServer) return;
|
||||||
|
const ms30m = 30 * 60 * 1000;
|
||||||
|
httpServer.headersTimeout = ms30m;
|
||||||
|
httpServer.requestTimeout = ms30m;
|
||||||
|
httpServer.keepAliveTimeout = ms30m;
|
||||||
|
},
|
||||||
|
},
|
||||||
|
],
|
||||||
resolve: {
|
resolve: {
|
||||||
alias: {
|
alias: {
|
||||||
"@": path.resolve(__dirname, "src"),
|
"@": path.resolve(__dirname, "src"),
|
||||||
@@ -19,7 +35,20 @@ export default defineConfig({
|
|||||||
server: {
|
server: {
|
||||||
proxy: {
|
proxy: {
|
||||||
// Backend is reachable as http://backend:8000 inside docker-compose; localhost outside.
|
// Backend is reachable as http://backend:8000 inside docker-compose; localhost outside.
|
||||||
"/api": process.env.VITE_BACKEND_URL || "http://localhost:8000",
|
"/api": {
|
||||||
|
target: process.env.VITE_BACKEND_URL || "http://localhost:8000",
|
||||||
|
changeOrigin: true,
|
||||||
|
configure: (proxy) => {
|
||||||
|
proxy.on("error", (err) => {
|
||||||
|
// Surface upstream timeouts/socket errors in `docker compose logs frontend`.
|
||||||
|
console.error("[vite-proxy] /api error:", err);
|
||||||
|
});
|
||||||
|
},
|
||||||
|
// The initial graph snapshot can take minutes with large limits (SPARQL + layout + labels).
|
||||||
|
// Prevent the dev proxy from timing out and returning a 500 to the browser.
|
||||||
|
timeout: 30 * 60 * 1000,
|
||||||
|
proxyTimeout: 30 * 60 * 1000,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -38,8 +38,8 @@ def main() -> None:
|
|||||||
output_location=os.getenv("COMBINE_OUTPUT_LOCATION"),
|
output_location=os.getenv("COMBINE_OUTPUT_LOCATION"),
|
||||||
output_name=output_name,
|
output_name=output_name,
|
||||||
)
|
)
|
||||||
|
|
||||||
output_path = output_location_to_path(output_location)
|
output_path = output_location_to_path(output_location)
|
||||||
|
|
||||||
force = _env_bool("COMBINE_FORCE", default=False)
|
force = _env_bool("COMBINE_FORCE", default=False)
|
||||||
if output_path.exists() and not force:
|
if output_path.exists() and not force:
|
||||||
logger.info("Skipping combine step (output exists): %s", output_location)
|
logger.info("Skipping combine step (output exists): %s", output_location)
|
||||||
|
|||||||
Reference in New Issue
Block a user