From 5badcd8d6f409740add5560e56a65d56bac77e92 Mon Sep 17 00:00:00 2001 From: Oxy8 <34687508+Oxy8@users.noreply.github.com> Date: Tue, 10 Mar 2026 17:21:47 -0300 Subject: [PATCH] Visualizando todo grafo com anzograph --- .env.example | 44 ++++++ anzograph/README.md | 13 +- backend_go/config.go | 14 ++ backend_go/graph_export.go | 126 ++++++++-------- backend_go/graph_queries/default.go | 35 ++++- backend_go/graph_queries/hierarchy.go | 25 ++- backend_go/graph_queries/registry.go | 7 +- backend_go/graph_queries/types.go | 4 +- backend_go/graph_queries/types_only.go | 27 +++- backend_go/graph_snapshot.go | 151 +++++++++++++++++-- backend_go/models.go | 3 +- backend_go/predicate_dict.go | 40 +++++ backend_go/server.go | 23 +-- docker-compose.yml | 28 +++- frontend/src/App.tsx | 13 +- frontend/vite.config.ts | 33 +++- python_services/owl_imports_combiner/main.py | 2 +- 17 files changed, 482 insertions(+), 106 deletions(-) create mode 100644 .env.example create mode 100644 backend_go/predicate_dict.go diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..e6a45f9 --- /dev/null +++ b/.env.example @@ -0,0 +1,44 @@ +# TTL path used as a fallback by the owl-imports combiner when COMBINE_ENTRY_LOCATION is not set. +TTL_PATH=/data/merged_ontologies_pruned_patched.ttl + +# Backend behavior +INCLUDE_BNODES=false + +# Graph snapshot limits (used by Go backend defaults/validation) +DEFAULT_NODE_LIMIT=800000 +DEFAULT_EDGE_LIMIT=2000000 +MAX_NODE_LIMIT=10000000 +MAX_EDGE_LIMIT=20000000 + +# SPARQL paging: number of triples per batch (LIMIT/OFFSET) +EDGE_BATCH_SIZE=100000 + +# Owl imports combiner service (docker-compose `owl_imports_combiner`) +COMBINE_OWL_IMPORTS_ON_START=true +COMBINE_ENTRY_LOCATION=/data/vkg.ttl +COMBINE_OUTPUT_LOCATION=/data/vkg_full.ttl +# COMBINE_OUTPUT_NAME=combined_ontology.ttl # Only used if COMBINE_OUTPUT_LOCATION is not set. 
+COMBINE_FORCE=true + +# AnzoGraph / SPARQL endpoint settings +SPARQL_HOST=http://anzograph:8080 +# SPARQL_ENDPOINT=http://anzograph:8080/sparql +SPARQL_USER=admin +SPARQL_PASS=Passw0rd1 + +# File URI as seen by the AnzoGraph container (used by SPARQL `LOAD`) # Currently not used. +SPARQL_DATA_FILE=file:///opt/shared-files/o3po.ttl # Currently not used. +# SPARQL_GRAPH_IRI=http://example.org/graph + +# Startup behavior for AnzoGraph mode +SPARQL_LOAD_ON_START=false +SPARQL_CLEAR_ON_START=false +SPARQL_READY_TIMEOUT_S=10 + +# Dev UX +CORS_ORIGINS=http://localhost:5173 +VITE_BACKEND_URL=http://backend:8000 + +# Debugging +LOG_SNAPSHOT_TIMINGS=false +FREE_OS_MEMORY_AFTER_SNAPSHOT=false diff --git a/anzograph/README.md b/anzograph/README.md index 5d15fc6..1142ea7 100644 --- a/anzograph/README.md +++ b/anzograph/README.md @@ -11,6 +11,18 @@ The backend connects to AnzoGraph via: - `SPARQL_HOST` (default `http://anzograph:8080`) and the `/sparql` path, or - an explicit `SPARQL_ENDPOINT` +## Persistence + +The `docker-compose.yml` config mounts named volumes into the AnzoGraph container so its state survives +container recreation (e.g. `docker compose up --force-recreate`): + +- `anzograph_app_home → /opt/anzograph/app-home` (machine-id / user home) +- `anzograph_persistence → /opt/anzograph/persistence` (database persistence dir) +- `anzograph_config → /opt/anzograph/config` (settings + activation markers) +- `anzograph_internal → /opt/anzograph/internal` (internal state, including EULA acceptance marker) + +To fully reset AnzoGraph state, remove volumes with `docker compose down -v`. + ## Loading data The backend can optionally load a TTL file on startup (after AnzoGraph is ready): @@ -24,4 +36,3 @@ Because `./data` is mounted at `/opt/shared-files`, anything placed in `./data` - Authentication defaults are configured via the backend env (`SPARQL_USER` / `SPARQL_PASS`). 
- The AnzoGraph container in this repo is not customized; consult the upstream image documentation for persistence, licensing, and advanced configuration. - diff --git a/backend_go/config.go b/backend_go/config.go index 3e99f90..98753af 100644 --- a/backend_go/config.go +++ b/backend_go/config.go @@ -18,6 +18,11 @@ type Config struct { MaxNodeLimit int MaxEdgeLimit int + EdgeBatchSize int + + FreeOSMemoryAfterSnapshot bool + LogSnapshotTimings bool + SparqlHost string SparqlEndpoint string SparqlUser string @@ -45,6 +50,9 @@ func LoadConfig() (Config, error) { DefaultEdgeLimit: envInt("DEFAULT_EDGE_LIMIT", 2_000_000), MaxNodeLimit: envInt("MAX_NODE_LIMIT", 10_000_000), MaxEdgeLimit: envInt("MAX_EDGE_LIMIT", 20_000_000), + EdgeBatchSize: envInt("EDGE_BATCH_SIZE", 100_000), + FreeOSMemoryAfterSnapshot: envBool("FREE_OS_MEMORY_AFTER_SNAPSHOT", false), + LogSnapshotTimings: envBool("LOG_SNAPSHOT_TIMINGS", false), SparqlHost: envString("SPARQL_HOST", "http://anzograph:8080"), SparqlEndpoint: envString("SPARQL_ENDPOINT", ""), @@ -96,6 +104,12 @@ func LoadConfig() (Config, error) { if cfg.DefaultEdgeLimit > cfg.MaxEdgeLimit { return Config{}, fmt.Errorf("DEFAULT_EDGE_LIMIT must be <= MAX_EDGE_LIMIT") } + if cfg.EdgeBatchSize < 1 { + return Config{}, fmt.Errorf("EDGE_BATCH_SIZE must be >= 1") + } + if cfg.EdgeBatchSize > cfg.MaxEdgeLimit { + return Config{}, fmt.Errorf("EDGE_BATCH_SIZE must be <= MAX_EDGE_LIMIT") + } return cfg, nil } diff --git a/backend_go/graph_export.go b/backend_go/graph_export.go index 4fc980f..03f29ce 100644 --- a/backend_go/graph_export.go +++ b/backend_go/graph_export.go @@ -5,89 +5,87 @@ type termKey struct { key string } -type termMeta struct { - termType string - iri string +type graphAccumulator struct { + includeBNodes bool + nodeLimit int + nodeIDByKey map[termKey]uint32 + nodes []Node + edges []Edge + preds *PredicateDict } -func graphFromSparqlBindings( - bindings []map[string]sparqlTerm, - nodeLimit int, - includeBNodes bool, -) (nodes 
[]Node, edges []Edge) { - nodeIDByKey := map[termKey]uint32{} - nodeMeta := make([]termMeta, 0, min(nodeLimit, 4096)) +func newGraphAccumulator(nodeLimit int, includeBNodes bool, edgeCapHint int, preds *PredicateDict) *graphAccumulator { + if preds == nil { + preds = NewPredicateDict(nil) + } + return &graphAccumulator{ + includeBNodes: includeBNodes, + nodeLimit: nodeLimit, + nodeIDByKey: make(map[termKey]uint32), + nodes: make([]Node, 0, min(nodeLimit, 4096)), + edges: make([]Edge, 0, min(edgeCapHint, 4096)), + preds: preds, + } +} - getOrAdd := func(term sparqlTerm) (uint32, bool) { - if term.Type == "" || term.Value == "" { - return 0, false - } - if term.Type == "literal" { - return 0, false - } - - var key termKey - var meta termMeta - - if term.Type == "bnode" { - if !includeBNodes { - return 0, false - } - key = termKey{termType: "bnode", key: term.Value} - meta = termMeta{termType: "bnode", iri: "_:" + term.Value} - } else { - key = termKey{termType: "uri", key: term.Value} - meta = termMeta{termType: "uri", iri: term.Value} - } - - if existing, ok := nodeIDByKey[key]; ok { - return existing, true - } - if len(nodeMeta) >= nodeLimit { - return 0, false - } - nid := uint32(len(nodeMeta)) - nodeIDByKey[key] = nid - nodeMeta = append(nodeMeta, meta) - return nid, true +func (g *graphAccumulator) getOrAddNode(term sparqlTerm) (uint32, bool) { + if term.Type == "" || term.Value == "" { + return 0, false + } + if term.Type == "literal" { + return 0, false } + var key termKey + var node Node + + if term.Type == "bnode" { + if !g.includeBNodes { + return 0, false + } + key = termKey{termType: "bnode", key: term.Value} + node = Node{ID: 0, TermType: "bnode", IRI: "_:" + term.Value, Label: nil, X: 0, Y: 0} + } else { + key = termKey{termType: "uri", key: term.Value} + node = Node{ID: 0, TermType: "uri", IRI: term.Value, Label: nil, X: 0, Y: 0} + } + + if existing, ok := g.nodeIDByKey[key]; ok { + return existing, true + } + if len(g.nodes) >= g.nodeLimit { + return 
0, false + } + nid := uint32(len(g.nodes)) + g.nodeIDByKey[key] = nid + node.ID = nid + g.nodes = append(g.nodes, node) + return nid, true +} + +func (g *graphAccumulator) addBindings(bindings []map[string]sparqlTerm) { for _, b := range bindings { sTerm := b["s"] oTerm := b["o"] pTerm := b["p"] - sid, okS := getOrAdd(sTerm) - oid, okO := getOrAdd(oTerm) + sid, okS := g.getOrAddNode(sTerm) + oid, okO := g.getOrAddNode(oTerm) if !okS || !okO { continue } - pred := pTerm.Value - if pred == "" { + predID, ok := g.preds.GetOrAdd(pTerm.Value) + if !ok { continue } - edges = append(edges, Edge{ - Source: sid, - Target: oid, - Predicate: pred, + g.edges = append(g.edges, Edge{ + Source: sid, + Target: oid, + PredicateID: predID, }) } - - nodes = make([]Node, len(nodeMeta)) - for i, m := range nodeMeta { - nodes[i] = Node{ - ID: uint32(i), - TermType: m.termType, - IRI: m.iri, - Label: nil, - X: 0, - Y: 0, - } - } - - return nodes, edges } func min(a, b int) int { diff --git a/backend_go/graph_queries/default.go b/backend_go/graph_queries/default.go index afba840..a1acb7d 100644 --- a/backend_go/graph_queries/default.go +++ b/backend_go/graph_queries/default.go @@ -2,7 +2,7 @@ package graph_queries import "fmt" -func defaultEdgeQuery(edgeLimit int, includeBNodes bool) string { +func defaultEdgeQuery(limit int, offset int, includeBNodes bool) string { bnodeFilter := "" if !includeBNodes { bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))" @@ -28,7 +28,38 @@ WHERE { FILTER(!isLiteral(?o)) %s } +ORDER BY ?s ?p ?o LIMIT %d -`, bnodeFilter, edgeLimit) +OFFSET %d +`, bnodeFilter, limit, offset) } +func defaultPredicateQuery(includeBNodes bool) string { + bnodeFilter := "" + if !includeBNodes { + bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))" + } + + return fmt.Sprintf(` +PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> +PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#> +PREFIX owl: <http://www.w3.org/2002/07/owl#> + +SELECT DISTINCT ?p +WHERE { + { + VALUES ?p { rdf:type } + ?s ?p ?o . + ?o rdf:type owl:Class . + } + UNION + { + VALUES ?p { rdfs:subClassOf } + ?s ?p ?o . 
+ } + FILTER(!isLiteral(?o)) + %s +} +ORDER BY ?p +`, bnodeFilter) +} diff --git a/backend_go/graph_queries/hierarchy.go b/backend_go/graph_queries/hierarchy.go index 61835a0..125aa75 100644 --- a/backend_go/graph_queries/hierarchy.go +++ b/backend_go/graph_queries/hierarchy.go @@ -2,7 +2,7 @@ package graph_queries import "fmt" -func hierarchyEdgeQuery(edgeLimit int, includeBNodes bool) string { +func hierarchyEdgeQuery(limit int, offset int, includeBNodes bool) string { bnodeFilter := "" if !includeBNodes { bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))" @@ -18,7 +18,28 @@ WHERE { FILTER(!isLiteral(?o)) %s } +ORDER BY ?s ?p ?o LIMIT %d -`, bnodeFilter, edgeLimit) +OFFSET %d +`, bnodeFilter, limit, offset) } +func hierarchyPredicateQuery(includeBNodes bool) string { + bnodeFilter := "" + if !includeBNodes { + bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))" + } + + return fmt.Sprintf(` +PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#> + +SELECT DISTINCT ?p +WHERE { + VALUES ?p { rdfs:subClassOf } + ?s ?p ?o . + FILTER(!isLiteral(?o)) + %s +} +ORDER BY ?p +`, bnodeFilter) +} diff --git a/backend_go/graph_queries/registry.go b/backend_go/graph_queries/registry.go index 9967330..bc88b1e 100644 --- a/backend_go/graph_queries/registry.go +++ b/backend_go/graph_queries/registry.go @@ -3,9 +3,9 @@ package graph_queries const DefaultID = "default" var definitions = []Definition{ - {Meta: Meta{ID: DefaultID, Label: "Default"}, EdgeQuery: defaultEdgeQuery}, - {Meta: Meta{ID: "hierarchy", Label: "Hierarchy"}, EdgeQuery: hierarchyEdgeQuery}, - {Meta: Meta{ID: "types", Label: "Types"}, EdgeQuery: typesOnlyEdgeQuery}, + {Meta: Meta{ID: DefaultID, Label: "Default"}, EdgeQuery: defaultEdgeQuery, PredicateQuery: defaultPredicateQuery}, + {Meta: Meta{ID: "hierarchy", Label: "Hierarchy"}, EdgeQuery: hierarchyEdgeQuery, PredicateQuery: hierarchyPredicateQuery}, + {Meta: Meta{ID: "types", Label: "Types"}, EdgeQuery: typesOnlyEdgeQuery, PredicateQuery: typesOnlyPredicateQuery}, } func List() []Meta { @@ -24,4 
+24,3 @@ func Get(id string) (Definition, bool) { } return Definition{}, false } - diff --git a/backend_go/graph_queries/types.go b/backend_go/graph_queries/types.go index d6e4481..d383735 100644 --- a/backend_go/graph_queries/types.go +++ b/backend_go/graph_queries/types.go @@ -7,6 +7,6 @@ type Meta struct { type Definition struct { Meta Meta - EdgeQuery func(edgeLimit int, includeBNodes bool) string + EdgeQuery func(limit int, offset int, includeBNodes bool) string + PredicateQuery func(includeBNodes bool) string } - diff --git a/backend_go/graph_queries/types_only.go b/backend_go/graph_queries/types_only.go index c47afd3..1082202 100644 --- a/backend_go/graph_queries/types_only.go +++ b/backend_go/graph_queries/types_only.go @@ -2,7 +2,7 @@ package graph_queries import "fmt" -func typesOnlyEdgeQuery(edgeLimit int, includeBNodes bool) string { +func typesOnlyEdgeQuery(limit int, offset int, includeBNodes bool) string { bnodeFilter := "" if !includeBNodes { bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))" @@ -20,7 +20,30 @@ WHERE { FILTER(!isLiteral(?o)) %s } +ORDER BY ?s ?p ?o LIMIT %d -`, bnodeFilter, edgeLimit) +OFFSET %d +`, bnodeFilter, limit, offset) } +func typesOnlyPredicateQuery(includeBNodes bool) string { + bnodeFilter := "" + if !includeBNodes { + bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))" + } + + return fmt.Sprintf(` +PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> +PREFIX owl: <http://www.w3.org/2002/07/owl#> + +SELECT DISTINCT ?p +WHERE { + VALUES ?p { rdf:type } + ?s ?p ?o . + ?o rdf:type owl:Class . 
+ FILTER(!isLiteral(?o)) + %s +} +ORDER BY ?p +`, bnodeFilter) +} diff --git a/backend_go/graph_snapshot.go b/backend_go/graph_snapshot.go index 6f3b3f3..4e97036 100644 --- a/backend_go/graph_snapshot.go +++ b/backend_go/graph_snapshot.go @@ -4,13 +4,19 @@ import ( "context" "encoding/json" "fmt" + "log" + "runtime" + "runtime/debug" "sort" "strings" + "time" graphqueries "visualizador_instanciados/backend_go/graph_queries" ) -const rdfsLabelIRI = "http://www.w3.org/2000/01/rdf-schema#label" +const ( + rdfsLabelIRI = "http://www.w3.org/2000/01/rdf-schema#label" +) func fetchGraphSnapshot( ctx context.Context, @@ -20,22 +26,148 @@ func fetchGraphSnapshot( edgeLimit int, graphQueryID string, ) (GraphResponse, error) { + start := time.Now() + logStats := func(stage string) { + if !cfg.LogSnapshotTimings { + return + } + var ms runtime.MemStats + runtime.ReadMemStats(&ms) + log.Printf( + "[snapshot] %s graph_query_id=%s node_limit=%d edge_limit=%d elapsed=%s alloc=%dMB heap_inuse=%dMB sys=%dMB numgc=%d", + stage, + graphQueryID, + nodeLimit, + edgeLimit, + time.Since(start).Truncate(time.Millisecond), + ms.Alloc/1024/1024, + ms.HeapInuse/1024/1024, + ms.Sys/1024/1024, + ms.NumGC, + ) + } + def, ok := graphqueries.Get(graphQueryID) if !ok { return GraphResponse{}, fmt.Errorf("unknown graph_query_id: %s", graphQueryID) } - edgesQ := def.EdgeQuery(edgeLimit, cfg.IncludeBNodes) - raw, err := sparql.Query(ctx, edgesQ) + + // Build predicate dictionary (predicate IRI -> uint32 ID) before fetching edges. 
+ preds, err := func() (*PredicateDict, error) { + logStats("predicates_query_start") + predQ := def.PredicateQuery(cfg.IncludeBNodes) + t0 := time.Now() + rawPred, err := sparql.Query(ctx, predQ) + if err != nil { + return nil, fmt.Errorf("predicates query failed: %w", err) + } + if cfg.LogSnapshotTimings { + log.Printf("[snapshot] predicates_query_returned bytes=%d query_time=%s", len(rawPred), time.Since(t0).Truncate(time.Millisecond)) + } + var predRes sparqlResponse + t1 := time.Now() + if err := json.Unmarshal(rawPred, &predRes); err != nil { + return nil, fmt.Errorf("predicates unmarshal failed: %w", err) + } + if cfg.LogSnapshotTimings { + log.Printf("[snapshot] predicates_unmarshal_done bindings=%d unmarshal_time=%s", len(predRes.Results.Bindings), time.Since(t1).Truncate(time.Millisecond)) + } + predicateIRIs := make([]string, 0, len(predRes.Results.Bindings)) + for _, b := range predRes.Results.Bindings { + pTerm, ok := b["p"] + if !ok || pTerm.Type != "uri" || pTerm.Value == "" { + continue + } + predicateIRIs = append(predicateIRIs, pTerm.Value) + } + logStats("predicates_dict_built") + return NewPredicateDict(predicateIRIs), nil + }() if err != nil { return GraphResponse{}, err } - var res sparqlResponse - if err := json.Unmarshal(raw, &res); err != nil { - return GraphResponse{}, fmt.Errorf("failed to parse SPARQL JSON: %w", err) - } + // Fetch edges in batches to avoid decoding a single huge SPARQL JSON response. 
+ logStats("edges_batched_start") + batchSize := cfg.EdgeBatchSize + acc := newGraphAccumulator(nodeLimit, cfg.IncludeBNodes, min(edgeLimit, batchSize), preds) - nodes, edges := graphFromSparqlBindings(res.Results.Bindings, nodeLimit, cfg.IncludeBNodes) + totalBindings := 0 + convAllT0 := time.Now() + for batch, offset := 0, 0; offset < edgeLimit; batch, offset = batch+1, offset+batchSize { + limit := batchSize + remaining := edgeLimit - offset + if remaining < limit { + limit = remaining + } + + logStats(fmt.Sprintf("edges_batch_start batch=%d offset=%d limit=%d", batch, offset, limit)) + bindings, err := func() ([]map[string]sparqlTerm, error) { + edgesQ := def.EdgeQuery(limit, offset, cfg.IncludeBNodes) + t0 := time.Now() + raw, err := sparql.Query(ctx, edgesQ) + if err != nil { + return nil, fmt.Errorf("edges query failed: %w", err) + } + if cfg.LogSnapshotTimings { + log.Printf("[snapshot] edges_batch_query_returned batch=%d offset=%d limit=%d bytes=%d query_time=%s", batch, offset, limit, len(raw), time.Since(t0).Truncate(time.Millisecond)) + } + + var res sparqlResponse + t1 := time.Now() + if err := json.Unmarshal(raw, &res); err != nil { + return nil, fmt.Errorf("edges unmarshal failed: %w", err) + } + if cfg.LogSnapshotTimings { + log.Printf("[snapshot] edges_batch_unmarshal_done batch=%d bindings=%d unmarshal_time=%s", batch, len(res.Results.Bindings), time.Since(t1).Truncate(time.Millisecond)) + } + return res.Results.Bindings, nil + }() + if err != nil { + return GraphResponse{}, fmt.Errorf("edges batch=%d offset=%d limit=%d: %w", batch, offset, limit, err) + } + + got := len(bindings) + totalBindings += got + if got == 0 { + bindings = nil + logStats(fmt.Sprintf("edges_batch_done_empty batch=%d offset=%d", batch, offset)) + break + } + + convT0 := time.Now() + acc.addBindings(bindings) + if cfg.LogSnapshotTimings { + log.Printf( + "[snapshot] edges_batch_convert_done batch=%d got_bindings=%d total_bindings=%d nodes=%d edges=%d convert_time=%s", + 
batch, + got, + totalBindings, + len(acc.nodes), + len(acc.edges), + time.Since(convT0).Truncate(time.Millisecond), + ) + } + + // Make the batch eligible for GC. + bindings = nil + logStats(fmt.Sprintf("edges_batch_done batch=%d offset=%d", batch, offset)) + if cfg.FreeOSMemoryAfterSnapshot { + debug.FreeOSMemory() + logStats(fmt.Sprintf("edges_batch_free_os_memory_done batch=%d offset=%d", batch, offset)) + } + + if got < limit { + break + } + } + if cfg.LogSnapshotTimings { + log.Printf("[snapshot] convert_batches_done total_bindings=%d total_time=%s", totalBindings, time.Since(convAllT0).Truncate(time.Millisecond)) + } + logStats("edges_batched_done") + + nodes := acc.nodes + edges := acc.edges // Layout: invert edges for hierarchy (target -> source). hierEdges := make([][2]int, 0, len(edges)) @@ -82,7 +214,7 @@ func fetchGraphSnapshot( if len(iris) > 0 { labelByIRI, err := fetchRDFSLabels(ctx, sparql, iris, 500) if err != nil { - return GraphResponse{}, err + return GraphResponse{}, fmt.Errorf("fetch rdfs:label failed: %w", err) } for i := range nodes { if nodes[i].TermType != "uri" { @@ -103,6 +235,7 @@ func fetchGraphSnapshot( SparqlEndpoint: cfg.EffectiveSparqlEndpoint(), IncludeBNodes: cfg.IncludeBNodes, GraphQueryID: graphQueryID, + Predicates: preds.IRIs(), NodeLimit: nodeLimit, EdgeLimit: edgeLimit, Nodes: len(nodes), diff --git a/backend_go/models.go b/backend_go/models.go index 2abd658..77a0dbc 100644 --- a/backend_go/models.go +++ b/backend_go/models.go @@ -20,7 +20,7 @@ type Node struct { type Edge struct { Source uint32 `json:"source"` Target uint32 `json:"target"` - Predicate string `json:"predicate"` + PredicateID uint32 `json:"predicate_id"` } type GraphMeta struct { @@ -29,6 +29,7 @@ type GraphMeta struct { SparqlEndpoint string `json:"sparql_endpoint"` IncludeBNodes bool `json:"include_bnodes"` GraphQueryID string `json:"graph_query_id"` + Predicates []string `json:"predicates,omitempty"` // index = predicate_id NodeLimit int 
`json:"node_limit"` EdgeLimit int `json:"edge_limit"` Nodes int `json:"nodes"` diff --git a/backend_go/predicate_dict.go b/backend_go/predicate_dict.go new file mode 100644 index 0000000..d4e5972 --- /dev/null +++ b/backend_go/predicate_dict.go @@ -0,0 +1,40 @@ +package main + +type PredicateDict struct { + idByIRI map[string]uint32 + iriByID []string +} + +func NewPredicateDict(predicates []string) *PredicateDict { + idByIRI := make(map[string]uint32, len(predicates)) + iriByID := make([]string, 0, len(predicates)) + for _, iri := range predicates { + if iri == "" { + continue + } + if _, ok := idByIRI[iri]; ok { + continue + } + id := uint32(len(iriByID)) + idByIRI[iri] = id + iriByID = append(iriByID, iri) + } + return &PredicateDict{idByIRI: idByIRI, iriByID: iriByID} +} + +func (d *PredicateDict) GetOrAdd(iri string) (uint32, bool) { + if iri == "" { + return 0, false + } + if id, ok := d.idByIRI[iri]; ok { + return id, true + } + id := uint32(len(d.iriByID)) + d.idByIRI[iri] = id + d.iriByID = append(d.iriByID, iri) + return id, true +} + +func (d *PredicateDict) IRIs() []string { + return d.iriByID +} diff --git a/backend_go/server.go b/backend_go/server.go index 9a1da31..5a042ec 100644 --- a/backend_go/server.go +++ b/backend_go/server.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "log" "net/http" "strconv" "strings" @@ -75,11 +76,12 @@ func (s *APIServer) handleStats(w http.ResponseWriter, r *http.Request) { } ctx := r.Context() - snap, err := s.snapshots.Get(ctx, s.cfg.DefaultNodeLimit, s.cfg.DefaultEdgeLimit, graphqueries.DefaultID) - if err != nil { - writeError(w, http.StatusInternalServerError, err.Error()) - return - } + snap, err := s.snapshots.Get(ctx, s.cfg.DefaultNodeLimit, s.cfg.DefaultEdgeLimit, graphqueries.DefaultID) + if err != nil { + log.Printf("handleStats: snapshot error: %v", err) + writeError(w, http.StatusInternalServerError, err.Error()) + return + } endpoint := snap.Meta.SparqlEndpoint writeJSON(w, http.StatusOK, StatsResponse{ 
@@ -141,11 +143,12 @@ func (s *APIServer) handleGraph(w http.ResponseWriter, r *http.Request) { return } - snap, err := s.snapshots.Get(r.Context(), nodeLimit, edgeLimit, graphQueryID) - if err != nil { - if _, ok := err.(*CycleError); ok { - writeError(w, http.StatusUnprocessableEntity, err.Error()) - return + snap, err := s.snapshots.Get(r.Context(), nodeLimit, edgeLimit, graphQueryID) + if err != nil { + log.Printf("handleGraph: snapshot error graph_query_id=%s node_limit=%d edge_limit=%d err=%v", graphQueryID, nodeLimit, edgeLimit, err) + if _, ok := err.(*CycleError); ok { + writeError(w, http.StatusUnprocessableEntity, err.Error()) + return } writeError(w, http.StatusInternalServerError, err.Error()) return diff --git a/docker-compose.yml b/docker-compose.yml index a8266f9..9043fb2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -34,9 +34,16 @@ services: - SPARQL_READY_RETRIES=${SPARQL_READY_RETRIES:-30} - SPARQL_READY_DELAY_S=${SPARQL_READY_DELAY_S:-4} - SPARQL_READY_TIMEOUT_S=${SPARQL_READY_TIMEOUT_S:-10} + - EDGE_BATCH_SIZE=${EDGE_BATCH_SIZE:-100000} + - FREE_OS_MEMORY_AFTER_SNAPSHOT=${FREE_OS_MEMORY_AFTER_SNAPSHOT:-false} + - LOG_SNAPSHOT_TIMINGS=${LOG_SNAPSHOT_TIMINGS:-false} depends_on: - - owl_imports_combiner - - anzograph + owl_imports_combiner: + condition: service_completed_successfully + anzograph: + condition: service_started + volumes: + - ./data:/data:Z healthcheck: test: ["CMD", "curl", "-fsS", "http://localhost:8000/api/health"] interval: 5s @@ -53,15 +60,26 @@ services: - ./frontend:/app - /app/node_modules depends_on: - - backend - # Docker Compose v1 doesn't support depends_on:condition. Do an explicit wait here. 
- command: sh -c "until wget -qO- http://backend:8000/api/health >/dev/null 2>&1; do echo 'waiting for backend...'; sleep 1; done; npm run dev -- --host --port 5173" + backend: + condition: service_healthy anzograph: image: cambridgesemantics/anzograph:latest container_name: anzograph + mem_limit: 20g ports: - "8080:8080" - "8443:8443" volumes: - ./data:/opt/shared-files:Z + # Persist AnzoGraph state across container recreation (EULA acceptance, machine-id, settings, persistence). + - anzograph_app_home:/opt/anzograph/app-home + - anzograph_persistence:/opt/anzograph/persistence + - anzograph_config:/opt/anzograph/config + - anzograph_internal:/opt/anzograph/internal + +volumes: + anzograph_app_home: + anzograph_persistence: + anzograph_config: + anzograph_internal: diff --git a/frontend/src/App.tsx b/frontend/src/App.tsx index 6fc2883..1a91e57 100644 --- a/frontend/src/App.tsx +++ b/frontend/src/App.tsx @@ -43,7 +43,18 @@ export default function App() { setStatus("Fetching graph…"); const graphRes = await fetch(`/api/graph?graph_query_id=${encodeURIComponent(graphQueryId)}`, { signal }); - if (!graphRes.ok) throw new Error(`Failed to fetch graph: ${graphRes.status}`); + if (!graphRes.ok) { + let detail = ""; + try { + const err = await graphRes.json(); + if (err && typeof err === "object" && typeof (err as any).detail === "string") { + detail = (err as any).detail; + } + } catch { + // ignore + } + throw new Error(`Failed to fetch graph: ${graphRes.status}${detail ? 
` (${detail})` : ""}`); + } const graph = await graphRes.json(); if (signal.aborted) return; diff --git a/frontend/vite.config.ts b/frontend/vite.config.ts index 5e33f5d..77d3338 100644 --- a/frontend/vite.config.ts +++ b/frontend/vite.config.ts @@ -10,7 +10,23 @@ const __dirname = path.dirname(__filename); // https://vite.dev/config/ export default defineConfig({ - plugins: [react(), tailwindcss(), viteSingleFile()], + plugins: [ + react(), + tailwindcss(), + viteSingleFile(), + { + name: "long-timeouts", + configureServer(server) { + // Large graph snapshots can take minutes; keep the dev server from killing the request. + const httpServer = server.httpServer; + if (!httpServer) return; + const ms30m = 30 * 60 * 1000; + httpServer.headersTimeout = ms30m; + httpServer.requestTimeout = ms30m; + httpServer.keepAliveTimeout = ms30m; + }, + }, + ], resolve: { alias: { "@": path.resolve(__dirname, "src"), @@ -19,7 +35,20 @@ export default defineConfig({ server: { proxy: { // Backend is reachable as http://backend:8000 inside docker-compose; localhost outside. - "/api": process.env.VITE_BACKEND_URL || "http://localhost:8000", + "/api": { + target: process.env.VITE_BACKEND_URL || "http://localhost:8000", + changeOrigin: true, + configure: (proxy) => { + proxy.on("error", (err) => { + // Surface upstream timeouts/socket errors in `docker compose logs frontend`. + console.error("[vite-proxy] /api error:", err); + }); + }, + // The initial graph snapshot can take minutes with large limits (SPARQL + layout + labels). + // Prevent the dev proxy from timing out and returning a 500 to the browser. 
+ timeout: 30 * 60 * 1000, + proxyTimeout: 30 * 60 * 1000, + }, }, }, }); diff --git a/python_services/owl_imports_combiner/main.py b/python_services/owl_imports_combiner/main.py index 9f65fab..9b4d3b2 100644 --- a/python_services/owl_imports_combiner/main.py +++ b/python_services/owl_imports_combiner/main.py @@ -38,8 +38,8 @@ def main() -> None: output_location=os.getenv("COMBINE_OUTPUT_LOCATION"), output_name=output_name, ) - output_path = output_location_to_path(output_location) + force = _env_bool("COMBINE_FORCE", default=False) if output_path.exists() and not force: logger.info("Skipping combine step (output exists): %s", output_location)