diff --git a/backend_go/README.md b/backend_go/README.md index 769f408..0fba306 100644 --- a/backend_go/README.md +++ b/backend_go/README.md @@ -27,10 +27,14 @@ Important variables: - `DEFAULT_NODE_LIMIT`, `DEFAULT_EDGE_LIMIT` - `MAX_NODE_LIMIT`, `MAX_EDGE_LIMIT` - SPARQL connectivity: + - `SPARQL_SOURCE_MODE` (`local` or `external`) - `SPARQL_HOST` (default `http://anzograph:8080`) or `SPARQL_ENDPOINT` + - `EXTERNAL_SPARQL_ENDPOINT` for external AnzoGraph access + - `KEYCLOAK_TOKEN_ENDPOINT`, `KEYCLOAK_CLIENT_ID`, `KEYCLOAK_USERNAME`, `KEYCLOAK_PASSWORD`, `KEYCLOAK_SCOPE` - `SPARQL_USER`, `SPARQL_PASS` + - External mode fetches a bearer token from Keycloak at startup, sends `Authorization: Bearer ...` to `EXTERNAL_SPARQL_ENDPOINT`, and refreshes once on `401 Unauthorized: Jwt is expired` - Startup behavior: - - `SPARQL_LOAD_ON_START`, `SPARQL_CLEAR_ON_START` + - `SPARQL_LOAD_ON_START` - `SPARQL_DATA_FILE` (typically `file:///opt/shared-files/.ttl`) - Other: - `INCLUDE_BNODES` (include blank nodes in snapshots) @@ -68,9 +72,9 @@ Stored under `backend_go/graph_queries/` and listed by `GET /api/graph_queries`. Built-in modes: -- `default` – `rdf:type` (to `owl:Class`) + `rdfs:subClassOf` -- `hierarchy` – `rdfs:subClassOf` only -- `types` – `rdf:type` (to `owl:Class`) only +- `default` – `rdf:type` + `rdfs:subClassOf` +- `hierarchy` – `rdfs:subClassOf` + `rdf:type` +- `types` – `rdf:type` only To add a new mode: @@ -94,5 +98,6 @@ To add a new mode: ## Performance notes -- Memory usage is dominated by the cached snapshot (`[]Node`, `[]Edge`) and the temporary SPARQL JSON unmarshalling step. +- Memory usage is dominated by the cached snapshot (`[]Node`, `[]Edge`) and large SPARQL result sets. +- The backend streams SPARQL JSON bindings for snapshot edge batches to reduce decode overhead. - Tune `DEFAULT_NODE_LIMIT`/`DEFAULT_EDGE_LIMIT` first if memory is too high. diff --git a/backend_go/config.go b/backend_go/config.go index c60a961..d6d8731 100644 --- a/backend_go/config.go +++ b/backend_go/config.go @@ -23,15 +23,22 @@ type Config struct { FreeOSMemoryAfterSnapshot bool LogSnapshotTimings bool + SparqlSourceMode string SparqlHost string SparqlEndpoint string + ExternalSparqlEndpoint string + AccessToken string + KeycloakTokenEndpoint string + KeycloakClientID string + KeycloakUsername string + KeycloakPassword string + KeycloakScope string SparqlUser string SparqlPass string SparqlInsecureTLS bool SparqlDataFile string SparqlGraphIRI string SparqlLoadOnStart bool - SparqlClearOnStart bool SparqlTimeout time.Duration SparqlReadyRetries int @@ -60,20 +67,27 @@ func LoadConfig() (Config, error) { FreeOSMemoryAfterSnapshot: envBool("FREE_OS_MEMORY_AFTER_SNAPSHOT", false), LogSnapshotTimings: envBool("LOG_SNAPSHOT_TIMINGS", false), + SparqlSourceMode: envString("SPARQL_SOURCE_MODE", "local"), SparqlHost: envString("SPARQL_HOST", "http://anzograph:8080"), SparqlEndpoint: envString("SPARQL_ENDPOINT", ""), + ExternalSparqlEndpoint: envString("EXTERNAL_SPARQL_ENDPOINT", ""), + AccessToken: envString("ACCESS_TOKEN", ""), + KeycloakTokenEndpoint: envString("KEYCLOAK_TOKEN_ENDPOINT", ""), + KeycloakClientID: envString("KEYCLOAK_CLIENT_ID", ""), + KeycloakUsername: envString("KEYCLOAK_USERNAME", ""), + KeycloakPassword: envString("KEYCLOAK_PASSWORD", ""), + KeycloakScope: envString("KEYCLOAK_SCOPE", "openid"), SparqlUser: envString("SPARQL_USER", ""), SparqlPass: envString("SPARQL_PASS", ""), SparqlInsecureTLS: envBool("SPARQL_INSECURE_TLS", false), SparqlDataFile: envString("SPARQL_DATA_FILE", ""), SparqlGraphIRI: envString("SPARQL_GRAPH_IRI", ""), SparqlLoadOnStart: envBool("SPARQL_LOAD_ON_START", false), - SparqlClearOnStart: envBool("SPARQL_CLEAR_ON_START", false), - HierarchyLayoutEngine: envString("HIERARCHY_LAYOUT_ENGINE", "go"), - HierarchyLayoutBridgeBin: envString("HIERARCHY_LAYOUT_BRIDGE_BIN", "/app/radial_sugiyama_go_bridge"), + HierarchyLayoutEngine: envString("HIERARCHY_LAYOUT_ENGINE", "go"), + HierarchyLayoutBridgeBin: envString("HIERARCHY_LAYOUT_BRIDGE_BIN", "/app/radial_sugiyama_go_bridge"), HierarchyLayoutBridgeWorkdir: envString("HIERARCHY_LAYOUT_BRIDGE_WORKDIR", "/workspace/radial_sugiyama"), - HierarchyLayoutRootIRI: envString("HIERARCHY_LAYOUT_ROOT_IRI", "http://purl.obolibrary.org/obo/BFO_0000001"), + HierarchyLayoutRootIRI: envString("HIERARCHY_LAYOUT_ROOT_IRI", "http://purl.obolibrary.org/obo/BFO_0000001"), SparqlReadyRetries: envInt("SPARQL_READY_RETRIES", 30), ListenAddr: envString("LISTEN_ADDR", ":8000"), @@ -100,6 +114,35 @@ func LoadConfig() (Config, error) { if cfg.SparqlLoadOnStart && strings.TrimSpace(cfg.SparqlDataFile) == "" { return Config{}, fmt.Errorf("SPARQL_LOAD_ON_START=true but SPARQL_DATA_FILE is not set") } + switch strings.ToLower(strings.TrimSpace(cfg.SparqlSourceMode)) { + case "local", "external": + cfg.SparqlSourceMode = strings.ToLower(strings.TrimSpace(cfg.SparqlSourceMode)) + default: + return Config{}, fmt.Errorf("SPARQL_SOURCE_MODE must be 'local' or 'external'") + } + if cfg.UsesExternalSparql() { + if strings.TrimSpace(cfg.ExternalSparqlEndpoint) == "" { + return Config{}, fmt.Errorf("EXTERNAL_SPARQL_ENDPOINT must be set when SPARQL_SOURCE_MODE=external") + } + if strings.TrimSpace(cfg.KeycloakTokenEndpoint) == "" { + return Config{}, fmt.Errorf("KEYCLOAK_TOKEN_ENDPOINT must be set when SPARQL_SOURCE_MODE=external") + } + if strings.TrimSpace(cfg.KeycloakClientID) == "" { + return Config{}, fmt.Errorf("KEYCLOAK_CLIENT_ID must be set when SPARQL_SOURCE_MODE=external") + } + if strings.TrimSpace(cfg.KeycloakUsername) == "" { + return Config{}, fmt.Errorf("KEYCLOAK_USERNAME must be set when SPARQL_SOURCE_MODE=external") + } + if strings.TrimSpace(cfg.KeycloakPassword) == "" { + return Config{}, fmt.Errorf("KEYCLOAK_PASSWORD must be set when SPARQL_SOURCE_MODE=external") + } + if strings.TrimSpace(cfg.KeycloakScope) == "" { + cfg.KeycloakScope = "openid" + } + if cfg.SparqlLoadOnStart { + return Config{}, fmt.Errorf("SPARQL_LOAD_ON_START is not supported when SPARQL_SOURCE_MODE=external") + } + } if cfg.DefaultNodeLimit < 1 { return Config{}, fmt.Errorf("DEFAULT_NODE_LIMIT must be >= 1") @@ -148,12 +191,19 @@ func LoadConfig() (Config, error) { } func (c Config) EffectiveSparqlEndpoint() string { + if c.UsesExternalSparql() { + return strings.TrimSpace(c.ExternalSparqlEndpoint) + } if strings.TrimSpace(c.SparqlEndpoint) != "" { return strings.TrimSpace(c.SparqlEndpoint) } return strings.TrimRight(c.SparqlHost, "/") + "/sparql" } +func (c Config) UsesExternalSparql() bool { + return strings.EqualFold(strings.TrimSpace(c.SparqlSourceMode), "external") +} + func (c Config) corsOriginList() []string { raw := strings.TrimSpace(c.CorsOrigins) if raw == "" || raw == "*" { diff --git a/backend_go/graph_export.go b/backend_go/graph_export.go index 03f29ce..da6ad97 100644 --- a/backend_go/graph_export.go +++ b/backend_go/graph_export.go @@ -5,10 +5,17 @@ type termKey struct { key string } +type edgeKey struct { + source uint32 + target uint32 + predicateID uint32 +} + type graphAccumulator struct { includeBNodes bool nodeLimit int nodeIDByKey map[termKey]uint32 + seenEdges map[edgeKey]struct{} nodes []Node edges []Edge preds *PredicateDict @@ -22,6 +29,7 @@ func newGraphAccumulator(nodeLimit int, includeBNodes bool, edgeCapHint int, pre includeBNodes: includeBNodes, nodeLimit: nodeLimit, nodeIDByKey: make(map[termKey]uint32), + seenEdges: make(map[edgeKey]struct{}, min(edgeCapHint, 4096)), nodes: make([]Node, 0, min(nodeLimit, 4096)), edges: make([]Edge, 0, min(edgeCapHint, 4096)), preds: preds, @@ -63,29 +71,29 @@ func (g *graphAccumulator) getOrAddNode(term sparqlTerm) (uint32, bool) { return nid, true } -func (g *graphAccumulator) addBindings(bindings []map[string]sparqlTerm) { - for _, b := range bindings { - sTerm := b["s"] - oTerm := b["o"] - pTerm := b["p"] - - sid, okS := g.getOrAddNode(sTerm) - oid, okO := g.getOrAddNode(oTerm) - if !okS || !okO { - continue - } - - predID, ok := g.preds.GetOrAdd(pTerm.Value) - if !ok { - continue - } - - g.edges = append(g.edges, Edge{ - Source: sid, - Target: oid, - PredicateID: predID, - }) +func (g *graphAccumulator) addTripleBinding(binding sparqlTripleBinding) { + sid, okS := g.getOrAddNode(binding.S) + oid, okO := g.getOrAddNode(binding.O) + if !okS || !okO { + return } + + predID, ok := g.preds.GetOrAdd(binding.P.Value) + if !ok { + return + } + + key := edgeKey{source: sid, target: oid, predicateID: predID} + if _, seen := g.seenEdges[key]; seen { + return + } + g.seenEdges[key] = struct{}{} + + g.edges = append(g.edges, Edge{ + Source: sid, + Target: oid, + PredicateID: predID, + }) } func min(a, b int) int { diff --git a/backend_go/graph_export_test.go b/backend_go/graph_export_test.go new file mode 100644 index 0000000..eb6b714 --- /dev/null +++ b/backend_go/graph_export_test.go @@ -0,0 +1,24 @@ +package main + +import "testing" + +func TestGraphAccumulatorDeduplicatesEdges(t *testing.T) { + preds := NewPredicateDict([]string{"http://example.com/p"}) + acc := newGraphAccumulator(16, false, 16, preds) + + binding := sparqlTripleBinding{ + S: sparqlTerm{Type: "uri", Value: "http://example.com/s"}, + P: sparqlTerm{Type: "uri", Value: "http://example.com/p"}, + O: sparqlTerm{Type: "uri", Value: "http://example.com/o"}, + } + + acc.addTripleBinding(binding) + acc.addTripleBinding(binding) + + if len(acc.nodes) != 2 { + t.Fatalf("expected 2 nodes after duplicate bindings, got %d", len(acc.nodes)) + } + if len(acc.edges) != 1 { + t.Fatalf("expected 1 deduplicated edge, got %d", len(acc.edges)) + } +} diff --git a/backend_go/graph_queries/default.go b/backend_go/graph_queries/default.go index a1acb7d..f862aa1 100644 --- a/backend_go/graph_queries/default.go +++ b/backend_go/graph_queries/default.go @@ -1,6 +1,10 @@ package graph_queries -import "fmt" +import ( + "fmt" + + "visualizador_instanciados/backend_go/queryscope" +) func defaultEdgeQuery(limit int, offset int, includeBNodes bool) string { bnodeFilter := "" @@ -8,30 +12,33 @@ func defaultEdgeQuery(limit int, offset int, includeBNodes bool) string { bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))" } + pattern := queryscope.NamedGraph(` +{ + VALUES ?p { rdf:type } + ?s ?p ?o . +} +UNION +{ + VALUES ?p { rdfs:subClassOf } + ?s ?p ?o . +} +`) + return fmt.Sprintf(` PREFIX rdf: PREFIX rdfs: PREFIX owl: -SELECT ?s ?p ?o +SELECT DISTINCT ?s ?p ?o WHERE { - { - VALUES ?p { rdf:type } - ?s ?p ?o . - ?o rdf:type owl:Class . - } - UNION - { - VALUES ?p { rdfs:subClassOf } - ?s ?p ?o . - } +%s FILTER(!isLiteral(?o)) %s } ORDER BY ?s ?p ?o LIMIT %d OFFSET %d -`, bnodeFilter, limit, offset) +`, pattern, bnodeFilter, limit, offset) } func defaultPredicateQuery(includeBNodes bool) string { @@ -40,6 +47,18 @@ func defaultPredicateQuery(includeBNodes bool) string { bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))" } + pattern := queryscope.NamedGraph(` +{ + VALUES ?p { rdf:type } + ?s ?p ?o . +} +UNION +{ + VALUES ?p { rdfs:subClassOf } + ?s ?p ?o . +} +`) + return fmt.Sprintf(` PREFIX rdf: PREFIX rdfs: @@ -47,19 +66,10 @@ PREFIX owl: SELECT DISTINCT ?p WHERE { - { - VALUES ?p { rdf:type } - ?s ?p ?o . - ?o rdf:type owl:Class . - } - UNION - { - VALUES ?p { rdfs:subClassOf } - ?s ?p ?o . - } +%s FILTER(!isLiteral(?o)) %s } ORDER BY ?p -`, bnodeFilter) +`, pattern, bnodeFilter) } diff --git a/backend_go/graph_queries/hierarchy.go b/backend_go/graph_queries/hierarchy.go index 125aa75..85a4ba9 100644 --- a/backend_go/graph_queries/hierarchy.go +++ b/backend_go/graph_queries/hierarchy.go @@ -1,6 +1,10 @@ package graph_queries -import "fmt" +import ( + "fmt" + + "visualizador_instanciados/backend_go/queryscope" +) func hierarchyEdgeQuery(limit int, offset int, includeBNodes bool) string { bnodeFilter := "" @@ -8,20 +12,31 @@ func hierarchyEdgeQuery(limit int, offset int, includeBNodes bool) string { bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))" } + pattern := queryscope.NamedGraph(` +{ +VALUES ?p { rdfs:subClassOf } +?s ?p ?o . +} +UNION +{ +VALUES ?p { rdf:type } +?s ?p ?o . +} +`) + return fmt.Sprintf(` PREFIX rdfs: -SELECT ?s ?p ?o +SELECT DISTINCT ?s ?p ?o WHERE { - VALUES ?p { rdfs:subClassOf } - ?s ?p ?o . +%s FILTER(!isLiteral(?o)) %s } ORDER BY ?s ?p ?o LIMIT %d OFFSET %d -`, bnodeFilter, limit, offset) +`, pattern, bnodeFilter, limit, offset) } func hierarchyPredicateQuery(includeBNodes bool) string { @@ -30,16 +45,27 @@ func hierarchyPredicateQuery(includeBNodes bool) string { bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))" } + pattern := queryscope.NamedGraph(` +{ +VALUES ?p { rdfs:subClassOf } +?s ?p ?o . +} +UNION +{ +VALUES ?p { rdf:type } +?s ?p ?o . +} +`) + return fmt.Sprintf(` PREFIX rdfs: SELECT DISTINCT ?p WHERE { - VALUES ?p { rdfs:subClassOf } - ?s ?p ?o . +%s FILTER(!isLiteral(?o)) %s } ORDER BY ?p -`, bnodeFilter) +`, pattern, bnodeFilter) } diff --git a/backend_go/graph_queries/named_graph_test.go b/backend_go/graph_queries/named_graph_test.go new file mode 100644 index 0000000..3b128ec --- /dev/null +++ b/backend_go/graph_queries/named_graph_test.go @@ -0,0 +1,49 @@ +package graph_queries + +import ( + "strings" + "testing" +) + +func TestEdgeQueriesUseNamedGraphsAndDistinct(t *testing.T) { + tests := []struct { + name string + query string + }{ + {name: "default", query: defaultEdgeQuery(100, 25, false)}, + {name: "hierarchy", query: hierarchyEdgeQuery(100, 25, false)}, + {name: "types_only", query: typesOnlyEdgeQuery(100, 25, false)}, + } + + for _, tt := range tests { + if !strings.Contains(tt.query, "SELECT DISTINCT ?s ?p ?o") { + t.Fatalf("%s edge query should de-duplicate triples across named graphs:\n%s", tt.name, tt.query) + } + if !strings.Contains(tt.query, "GRAPH ?g") { + t.Fatalf("%s edge query should read from named graphs:\n%s", tt.name, tt.query) + } + if strings.Contains(tt.query, "owl:Class") { + t.Fatalf("%s edge query should no longer require owl:Class declarations:\n%s", tt.name, tt.query) + } + } +} + +func TestPredicateQueriesUseNamedGraphs(t *testing.T) { + tests := []struct { + name string + query string + }{ + {name: "default", query: defaultPredicateQuery(false)}, + {name: "hierarchy", query: hierarchyPredicateQuery(false)}, + {name: "types_only", query: typesOnlyPredicateQuery(false)}, + } + + for _, tt := range tests { + if !strings.Contains(tt.query, "SELECT DISTINCT ?p") { + t.Fatalf("%s predicate query should remain distinct:\n%s", tt.name, tt.query) + } + if !strings.Contains(tt.query, "GRAPH ?g") { + t.Fatalf("%s predicate query should read from named graphs:\n%s", tt.name, tt.query) + } + } +} diff --git a/backend_go/graph_queries/types_only.go b/backend_go/graph_queries/types_only.go index 1082202..099c4db 100644 --- a/backend_go/graph_queries/types_only.go +++ b/backend_go/graph_queries/types_only.go @@ -1,6 +1,10 @@ package graph_queries -import "fmt" +import ( + "fmt" + + "visualizador_instanciados/backend_go/queryscope" +) func typesOnlyEdgeQuery(limit int, offset int, includeBNodes bool) string { bnodeFilter := "" @@ -8,22 +12,25 @@ func typesOnlyEdgeQuery(limit int, offset int, includeBNodes bool) string { bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))" } + pattern := queryscope.NamedGraph(` +VALUES ?p { rdf:type } +?s ?p ?o . +`) + return fmt.Sprintf(` PREFIX rdf: PREFIX owl: -SELECT ?s ?p ?o +SELECT DISTINCT ?s ?p ?o WHERE { - VALUES ?p { rdf:type } - ?s ?p ?o . - ?o rdf:type owl:Class . +%s FILTER(!isLiteral(?o)) %s } ORDER BY ?s ?p ?o LIMIT %d OFFSET %d -`, bnodeFilter, limit, offset) +`, pattern, bnodeFilter, limit, offset) } func typesOnlyPredicateQuery(includeBNodes bool) string { @@ -32,18 +39,21 @@ func typesOnlyPredicateQuery(includeBNodes bool) string { bnodeFilter = "FILTER(!isBlank(?s) && !isBlank(?o))" } + pattern := queryscope.NamedGraph(` +VALUES ?p { rdf:type } +?s ?p ?o . +`) + return fmt.Sprintf(` PREFIX rdf: PREFIX owl: SELECT DISTINCT ?p WHERE { - VALUES ?p { rdf:type } - ?s ?p ?o . - ?o rdf:type owl:Class . +%s FILTER(!isLiteral(?o)) %s } ORDER BY ?p -`, bnodeFilter) +`, pattern, bnodeFilter) } diff --git a/backend_go/graph_snapshot.go b/backend_go/graph_snapshot.go index a35b302..0dcc0e5 100644 --- a/backend_go/graph_snapshot.go +++ b/backend_go/graph_snapshot.go @@ -2,7 +2,6 @@ package main import ( "context" - "encoding/json" "fmt" "log" "runtime" @@ -12,6 +11,7 @@ import ( "time" graphqueries "visualizador_instanciados/backend_go/graph_queries" + "visualizador_instanciados/backend_go/queryscope" ) const ( @@ -56,29 +56,26 @@ func fetchGraphSnapshot( preds, err := func() (*PredicateDict, error) { logStats("predicates_query_start") predQ := def.PredicateQuery(cfg.IncludeBNodes) - t0 := time.Now() - rawPred, err := sparql.Query(ctx, predQ) + var predRes sparqlBindingsResponse[sparqlPredicateBinding] + metrics, err := sparql.QueryJSON(ctx, predQ, &predRes) if err != nil { return nil, fmt.Errorf("predicates query failed: %w", err) } if cfg.LogSnapshotTimings { - log.Printf("[snapshot] predicates_query_returned bytes=%d query_time=%s", len(rawPred), time.Since(t0).Truncate(time.Millisecond)) - } - var predRes sparqlResponse - t1 := time.Now() - if err := json.Unmarshal(rawPred, &predRes); err != nil { - return nil, fmt.Errorf("predicates unmarshal failed: %w", err) - } - if cfg.LogSnapshotTimings { - log.Printf("[snapshot] predicates_unmarshal_done bindings=%d unmarshal_time=%s", len(predRes.Results.Bindings), time.Since(t1).Truncate(time.Millisecond)) + log.Printf( + "[snapshot] predicates_query_done bytes=%d bindings=%d round_trip_time=%s decode_time=%s", + metrics.ResponseBytes, + len(predRes.Results.Bindings), + metrics.RoundTripTime.Truncate(time.Millisecond), + metrics.BodyDecodeTime.Truncate(time.Millisecond), + ) } predicateIRIs := make([]string, 0, len(predRes.Results.Bindings)) for _, b := range predRes.Results.Bindings { - pTerm, ok := b["p"] - if !ok || pTerm.Type != "uri" || pTerm.Value == "" { + if b.P.Type != "uri" || b.P.Value == "" { continue } - predicateIRIs = append(predicateIRIs, pTerm.Value) + predicateIRIs = append(predicateIRIs, b.P.Value) } logStats("predicates_dict_built") return NewPredicateDict(predicateIRIs), nil @@ -102,55 +99,48 @@ func fetchGraphSnapshot( } logStats(fmt.Sprintf("edges_batch_start batch=%d offset=%d limit=%d", batch, offset, limit)) - bindings, err := func() ([]map[string]sparqlTerm, error) { - edgesQ := def.EdgeQuery(limit, offset, cfg.IncludeBNodes) - t0 := time.Now() - raw, err := sparql.Query(ctx, edgesQ) - if err != nil { - return nil, fmt.Errorf("edges query failed: %w", err) - } - if cfg.LogSnapshotTimings { - log.Printf("[snapshot] edges_batch_query_returned batch=%d offset=%d limit=%d bytes=%d query_time=%s", batch, offset, limit, len(raw), time.Since(t0).Truncate(time.Millisecond)) + edgesQ := def.EdgeQuery(limit, offset, cfg.IncludeBNodes) + var batchConvertTime time.Duration + metrics, err := sparql.QueryTripleBindingsStream(ctx, edgesQ, func(binding sparqlTripleBinding) error { + if !cfg.LogSnapshotTimings { + acc.addTripleBinding(binding) + return nil } - var res sparqlResponse - t1 := time.Now() - if err := json.Unmarshal(raw, &res); err != nil { - return nil, fmt.Errorf("edges unmarshal failed: %w", err) - } - if cfg.LogSnapshotTimings { - log.Printf("[snapshot] edges_batch_unmarshal_done batch=%d bindings=%d unmarshal_time=%s", batch, len(res.Results.Bindings), time.Since(t1).Truncate(time.Millisecond)) - } - return res.Results.Bindings, nil - }() + convertStart := time.Now() + acc.addTripleBinding(binding) + batchConvertTime += time.Since(convertStart) + return nil + }) if err != nil { return GraphResponse{}, fmt.Errorf("edges batch=%d offset=%d limit=%d: %w", batch, offset, limit, err) } - got := len(bindings) + got := metrics.BindingCount totalBindings += got if got == 0 { - bindings = nil logStats(fmt.Sprintf("edges_batch_done_empty batch=%d offset=%d", batch, offset)) break } - convT0 := time.Now() - acc.addBindings(bindings) if cfg.LogSnapshotTimings { log.Printf( - "[snapshot] edges_batch_convert_done batch=%d got_bindings=%d total_bindings=%d nodes=%d edges=%d convert_time=%s", + "[snapshot] edges_batch_stream_done batch=%d offset=%d limit=%d bytes=%d got_bindings=%d total_bindings=%d round_trip_time=%s stream_time=%s decode_overhead_time=%s convert_time=%s nodes=%d edges=%d", batch, + offset, + limit, + metrics.ResponseBytes, got, totalBindings, + metrics.RoundTripTime.Truncate(time.Millisecond), + metrics.BodyDecodeTime.Truncate(time.Millisecond), + maxDuration(metrics.BodyDecodeTime-batchConvertTime, 0).Truncate(time.Millisecond), + batchConvertTime.Truncate(time.Millisecond), len(acc.nodes), len(acc.edges), - time.Since(convT0).Truncate(time.Millisecond), ) } - // Make the batch eligible for GC. - bindings = nil logStats(fmt.Sprintf("edges_batch_done batch=%d offset=%d", batch, offset)) if cfg.FreeOSMemoryAfterSnapshot { debug.FreeOSMemory() @@ -165,6 +155,13 @@ func fetchGraphSnapshot( log.Printf("[snapshot] convert_batches_done total_bindings=%d total_time=%s", totalBindings, time.Since(convAllT0).Truncate(time.Millisecond)) } logStats("edges_batched_done") + if totalBindings == 0 { + log.Printf( + "[snapshot] empty_graph_result graph_query_id=%s endpoint=%s hint=app-generated reads now query named graphs only with GRAPH ?g; verify expected triples are present in named graphs and match the graph query shape", + graphQueryID, + cfg.EffectiveSparqlEndpoint(), + ) + } nodes := acc.nodes edges := acc.edges @@ -283,43 +280,26 @@ func fetchRDFSLabels( } batch := iris[i:end] - values := make([]string, 0, len(batch)) - for _, u := range batch { - values = append(values, "<"+u+">") - } + q := rdfsLabelQuery(batch) - q := fmt.Sprintf(` -SELECT ?s ?label -WHERE { - VALUES ?s { %s } - ?s <%s> ?label . -} -`, strings.Join(values, " "), rdfsLabelIRI) - - raw, err := sparql.Query(ctx, q) + var res sparqlBindingsResponse[sparqlLabelBinding] + _, err := sparql.QueryJSON(ctx, q, &res) if err != nil { return nil, err } - var res sparqlResponse - if err := json.Unmarshal(raw, &res); err != nil { - return nil, fmt.Errorf("failed to parse SPARQL JSON: %w", err) - } - for _, b := range res.Results.Bindings { - sTerm, ok := b["s"] - if !ok || sTerm.Value == "" { + if b.S.Value == "" { continue } - lblTerm, ok := b["label"] - if !ok || lblTerm.Type != "literal" || lblTerm.Value == "" { + if b.Label.Type != "literal" || b.Label.Value == "" { continue } - score := labelScore(lblTerm.Lang) - prev, ok := best[sTerm.Value] + score := labelScore(b.Label.Lang) + prev, ok := best[b.S.Value] if !ok || score > prev.score { - best[sTerm.Value] = bestLabel{score: score, value: lblTerm.Value} + best[b.S.Value] = bestLabel{score: score, value: b.Label.Value} } } } @@ -331,6 +311,35 @@ WHERE { return out, nil } +func rdfsLabelQuery(iris []string) string { + if len(iris) == 0 { + return "SELECT ?s ?label WHERE { FILTER(false) }" + } + + values := make([]string, 0, len(iris)) + for _, u := range iris { + if strings.TrimSpace(u) == "" { + continue + } + values = append(values, "<"+u+">") + } + if len(values) == 0 { + return "SELECT ?s ?label WHERE { FILTER(false) }" + } + + pattern := queryscope.NamedGraph(fmt.Sprintf(` +VALUES ?s { %s } +?s <%s> ?label . +`, strings.Join(values, " "), rdfsLabelIRI)) + + return fmt.Sprintf(` +SELECT DISTINCT ?s ?label +WHERE { +%s +} +`, pattern) +} + func labelScore(lang string) int { lang = strings.ToLower(strings.TrimSpace(lang)) if lang == "en" { @@ -357,3 +366,10 @@ func sortIntsUnique(xs []int) []int { } return out } + +func maxDuration(a time.Duration, b time.Duration) time.Duration { + if a > b { + return a + } + return b +} diff --git a/backend_go/graph_snapshot_named_graph_test.go b/backend_go/graph_snapshot_named_graph_test.go new file mode 100644 index 0000000..94ae1fd --- /dev/null +++ b/backend_go/graph_snapshot_named_graph_test.go @@ -0,0 +1,23 @@ +package main + +import ( + "strings" + "testing" +) + +func TestRDFSLabelQueryUsesNamedGraphs(t *testing.T) { + query := rdfsLabelQuery([]string{ + "http://example.com/A", + "http://example.com/B", + }) + + if !strings.Contains(query, "SELECT DISTINCT ?s ?label") { + t.Fatalf("label query should de-duplicate rows across named graphs:\n%s", query) + } + if !strings.Contains(query, "GRAPH ?g") { + t.Fatalf("label query should read from named graphs:\n%s", query) + } + if !strings.Contains(query, "<"+rdfsLabelIRI+">") { + t.Fatalf("label query should still fetch rdfs:label:\n%s", query) + } +} diff --git a/backend_go/keycloak_token.go b/backend_go/keycloak_token.go new file mode 100644 index 0000000..cab53f5 --- /dev/null +++ b/backend_go/keycloak_token.go @@ -0,0 +1,149 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "net/url" + "strings" + "sync" + "time" +) + +type keycloakTokenResponse struct { + AccessToken string `json:"access_token"` +} + +type keycloakTokenManager struct { + cfg Config + client *http.Client + + mu sync.Mutex + token string + refreshCh chan struct{} + lastErr error +} + +func newKeycloakTokenManager(cfg Config, client *http.Client) *keycloakTokenManager { + return &keycloakTokenManager{ + cfg: cfg, + client: client, + token: strings.TrimSpace(cfg.AccessToken), + } +} + +func (m *keycloakTokenManager) CurrentToken() string { + m.mu.Lock() + defer m.mu.Unlock() + return strings.TrimSpace(m.token) +} + +func (m *keycloakTokenManager) EnsureToken(ctx context.Context, reason string) (string, error) { + if token := m.CurrentToken(); token != "" { + return token, nil + } + return m.Refresh(ctx, reason) +} + +func (m *keycloakTokenManager) Refresh(ctx context.Context, reason string) (string, error) { + m.mu.Lock() + if ch := m.refreshCh; ch != nil { + m.mu.Unlock() + select { + case <-ctx.Done(): + return "", ctx.Err() + case <-ch: + m.mu.Lock() + token := strings.TrimSpace(m.token) + err := m.lastErr + m.mu.Unlock() + if err != nil { + return "", err + } + if token == "" { + return "", fmt.Errorf("keycloak token refresh completed without access_token") + } + return token, nil + } + } + + ch := make(chan struct{}) + m.refreshCh = ch + m.mu.Unlock() + + log.Printf("[auth] keycloak_token_refresh_start reason=%s endpoint=%s", reason, m.cfg.KeycloakTokenEndpoint) + start := time.Now() + token, err := m.fetchToken(ctx) + if err != nil { + log.Printf("[auth] keycloak_token_refresh_failed reason=%s endpoint=%s err=%v", reason, m.cfg.KeycloakTokenEndpoint, err) + } else { + log.Printf( + "[auth] keycloak_token_refresh_ok reason=%s endpoint=%s elapsed=%s", + reason, + m.cfg.KeycloakTokenEndpoint, + time.Since(start).Truncate(time.Millisecond), + ) + } + + m.mu.Lock() + if err == nil { + m.token = token + } + m.lastErr = err + close(ch) + m.refreshCh = nil + currentToken := strings.TrimSpace(m.token) + m.mu.Unlock() + + if err != nil { + return "", err + } + return currentToken, nil +} + +func (m *keycloakTokenManager) fetchToken(ctx context.Context) (string, error) { + form := url.Values{} + form.Set("grant_type", "password") + form.Set("client_id", strings.TrimSpace(m.cfg.KeycloakClientID)) + form.Set("username", strings.TrimSpace(m.cfg.KeycloakUsername)) + form.Set("password", m.cfg.KeycloakPassword) + scope := strings.TrimSpace(m.cfg.KeycloakScope) + if scope != "" { + form.Set("scope", scope) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, strings.TrimSpace(m.cfg.KeycloakTokenEndpoint), strings.NewReader(form.Encode())) + if err != nil { + return "", err + } + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + req.Header.Set("Accept", "application/json") + + resp, err := m.client.Do(req) + if err != nil { + return "", err + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return "", err + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return "", fmt.Errorf("keycloak token request failed: %s: %s", resp.Status, strings.TrimSpace(string(body))) + } + + var tokenResp keycloakTokenResponse + if err := json.Unmarshal(body, &tokenResp); err != nil { + return "", fmt.Errorf("keycloak token parse failed: %w", err) + } + + token := strings.TrimSpace(tokenResp.AccessToken) + if token == "" { + return "", fmt.Errorf("keycloak token response missing access_token") + } + return token, nil +} diff --git a/backend_go/keycloak_token_test.go b/backend_go/keycloak_token_test.go new file mode 100644 index 0000000..55ff730 --- /dev/null +++ b/backend_go/keycloak_token_test.go @@ -0,0 +1,273 @@ +package main + +import ( + "context" + "io" + "net/http" + "net/http/httptest" + "sync" + "sync/atomic" + "testing" + "time" +) + +func TestKeycloakTokenManagerFetchTokenParsesAccessToken(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/token" { + http.NotFound(w, r) + return + } + if got := r.Header.Get("Content-Type"); got != "application/x-www-form-urlencoded" { + t.Errorf("unexpected content-type: %s", got) + w.WriteHeader(http.StatusInternalServerError) + return + } + _, _ = io.WriteString(w, `{"access_token":"fresh-token"}`) + })) + defer server.Close() + + cfg := Config{ + KeycloakTokenEndpoint: server.URL + "/token", + KeycloakClientID: "anzograph", + KeycloakUsername: "user", + KeycloakPassword: "pass", + KeycloakScope: "openid", + } + manager := newKeycloakTokenManager(cfg, server.Client()) + + token, err := manager.fetchToken(context.Background()) + if err != nil { + t.Fatalf("fetchToken returned error: %v", err) + } + if token != "fresh-token" { + t.Fatalf("expected fresh-token, got %q", token) + } +} + +func TestAnzoGraphClientStartupFetchesFreshToken(t *testing.T) { + var tokenCalls atomic.Int32 + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/token": + tokenCalls.Add(1) + _, _ = io.WriteString(w, `{"access_token":"startup-token"}`) + case "/sparql": + if got := r.Header.Get("Authorization"); got != "Bearer startup-token" { + t.Errorf("expected startup bearer token, got %q", got) + w.WriteHeader(http.StatusInternalServerError) + return + } + _, _ = io.WriteString(w, `{"head":{},"boolean":true}`) + default: + http.NotFound(w, r) + } + })) + defer server.Close() + + cfg := Config{ + SparqlSourceMode: "external", + ExternalSparqlEndpoint: server.URL + "/sparql", + KeycloakTokenEndpoint: server.URL + "/token", + KeycloakClientID: "anzograph", + KeycloakUsername: "user", + KeycloakPassword: "pass", + KeycloakScope: "openid", + SparqlReadyTimeout: 2 * time.Second, + SparqlReadyRetries: 1, + SparqlReadyDelay: 1 * time.Millisecond, + SparqlTimeout: 2 * time.Second, + } + + client := NewAnzoGraphClient(cfg) + client.client = server.Client() + client.tokenManager.client = server.Client() + + if err := client.Startup(context.Background()); err != nil { + t.Fatalf("Startup returned error: %v", err) + } + if tokenCalls.Load() != 1 { + t.Fatalf("expected 1 startup token request, got %d", tokenCalls.Load()) + } +} + +func TestQueryRetriesOnceWhenJWTExpires(t *testing.T) { + var tokenCalls atomic.Int32 + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/token": + call := tokenCalls.Add(1) + if call != 1 { + t.Errorf("expected exactly 1 refresh call, got %d", call) + w.WriteHeader(http.StatusInternalServerError) + return + } + _, _ = io.WriteString(w, `{"access_token":"fresh-token"}`) + case "/sparql": + switch r.Header.Get("Authorization") { + case "Bearer expired-token": + w.WriteHeader(http.StatusUnauthorized) + _, _ = io.WriteString(w, "Jwt is expired") + case "Bearer fresh-token": + _, _ = io.WriteString(w, `{"results":{"bindings":[{"s":{"type":"uri","value":"http://example.com/s"},"p":{"type":"uri","value":"http://example.com/p"},"o":{"type":"uri","value":"http://example.com/o"}}]}}`) + default: + t.Errorf("unexpected authorization header %q", r.Header.Get("Authorization")) + w.WriteHeader(http.StatusInternalServerError) + return + } + default: + http.NotFound(w, r) + } + })) + defer server.Close() + + cfg := Config{ + SparqlSourceMode: "external", + ExternalSparqlEndpoint: server.URL + "/sparql", + KeycloakTokenEndpoint: server.URL + "/token", + KeycloakClientID: "anzograph", + KeycloakUsername: "user", + KeycloakPassword: "pass", + KeycloakScope: "openid", + SparqlTimeout: 2 * time.Second, + } + + client := NewAnzoGraphClient(cfg) + client.client = server.Client() + client.tokenManager.client = server.Client() + client.tokenManager.token = "expired-token" + + raw, err := client.Query(context.Background(), "SELECT ?s ?p ?o WHERE { ?s ?p ?o }") + if err != nil { + t.Fatalf("Query returned error: %v", err) + } + if string(raw) == "" { + t.Fatalf("expected successful response body after refresh") + } + if tokenCalls.Load() != 1 { + t.Fatalf("expected 1 refresh call, got %d", tokenCalls.Load()) + } +} + +func TestQueryDoesNotRefreshForNonExpiry401(t *testing.T) { + var tokenCalls atomic.Int32 + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/token": + tokenCalls.Add(1) + _, _ = io.WriteString(w, `{"access_token":"fresh-token"}`) + case "/sparql": + w.WriteHeader(http.StatusUnauthorized) + _, _ = io.WriteString(w, "RBAC: access denied") + default: + http.NotFound(w, r) + } + })) + defer server.Close() + + cfg := Config{ + SparqlSourceMode: "external", + ExternalSparqlEndpoint: server.URL + "/sparql", + KeycloakTokenEndpoint: server.URL + "/token", + KeycloakClientID: "anzograph", + KeycloakUsername: "user", + KeycloakPassword: "pass", + KeycloakScope: "openid", + SparqlTimeout: 2 * time.Second, + } + + client := NewAnzoGraphClient(cfg) + client.client = server.Client() + client.tokenManager.client = server.Client() + client.tokenManager.token = "still-bad-token" + + _, err := client.Query(context.Background(), "SELECT ?s ?p ?o WHERE { ?s ?p ?o }") + if err == nil { + t.Fatalf("expected non-expiry 401 to fail") + } + if tokenCalls.Load() != 0 { + t.Fatalf("expected no token refresh for non-expiry 401, got %d", tokenCalls.Load()) + } +} + +func TestConcurrentExpiredQueriesShareOneRefresh(t *testing.T) { + var tokenCalls atomic.Int32 + var sparqlCalls atomic.Int32 + var mu sync.Mutex + seenFresh := 0 + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/token": + tokenCalls.Add(1) + time.Sleep(50 * time.Millisecond) + _, _ = io.WriteString(w, `{"access_token":"fresh-token"}`) + case "/sparql": + sparqlCalls.Add(1) + switch r.Header.Get("Authorization") { + case "Bearer expired-token": + w.WriteHeader(http.StatusUnauthorized) + _, _ = io.WriteString(w, "Jwt is expired") + case "Bearer fresh-token": + mu.Lock() + seenFresh++ + mu.Unlock() + _, _ = io.WriteString(w, `{"head":{},"boolean":true}`) + default: + t.Errorf("unexpected authorization header %q", r.Header.Get("Authorization")) + w.WriteHeader(http.StatusInternalServerError) + return + } + default: + http.NotFound(w, r) + } + })) + defer server.Close() + + cfg := Config{ + SparqlSourceMode: "external", + ExternalSparqlEndpoint: server.URL + "/sparql", + KeycloakTokenEndpoint: server.URL + "/token", + KeycloakClientID: "anzograph", + KeycloakUsername: "user", + KeycloakPassword: "pass", + KeycloakScope: "openid", + SparqlTimeout: 2 * time.Second, + } + + client := NewAnzoGraphClient(cfg) + client.client = server.Client() + client.tokenManager.client = server.Client() + client.tokenManager.token = "expired-token" + + const workers = 5 + var wg sync.WaitGroup + errs := make(chan error, workers) + for i := 0; i < workers; i++ { + wg.Add(1) + go func() { + defer wg.Done() + _, err := client.Query(context.Background(), "ASK WHERE { ?s ?p ?o }") + errs <- err + }() + } + wg.Wait() + close(errs) + + for err := range errs { + if err != nil { + t.Fatalf("concurrent query returned error: %v", err) + } + } + if tokenCalls.Load() != 1 { + t.Fatalf("expected exactly 1 shared refresh, got %d", tokenCalls.Load()) + } + if seenFresh != workers { + t.Fatalf("expected %d successful retried queries, got %d", workers, seenFresh) + } + if sparqlCalls.Load() < workers*2 { + t.Fatalf("expected each worker to hit sparql before and after refresh, got %d calls", sparqlCalls.Load()) + } +} diff --git a/backend_go/queryscope/scope.go b/backend_go/queryscope/scope.go new file mode 100644 index 0000000..6e91741 --- /dev/null +++ b/backend_go/queryscope/scope.go @@ -0,0 +1,25 @@ +package queryscope + +import "strings" + +// NamedGraph wraps a read pattern so app-generated queries read from any named graph via GRAPH ?g. +func NamedGraph(pattern string) string { + trimmed := strings.TrimSpace(pattern) + if trimmed == "" { + return " GRAPH ?g {\n }" + } + + return indent("GRAPH ?g {\n"+indent(trimmed, " ")+"\n}", " ") +} + +func AskAnyTripleQuery() string { + return "ASK WHERE {\n" + NamedGraph("?s ?p ?o .") + "\n}" +} + +func indent(text string, prefix string) string { + lines := strings.Split(text, "\n") + for i, line := range lines { + lines[i] = prefix + line + } + return strings.Join(lines, "\n") +} diff --git a/backend_go/selection_queries/named_graph_test.go b/backend_go/selection_queries/named_graph_test.go new file mode 100644 index 0000000..9a1e0d4 --- /dev/null +++ b/backend_go/selection_queries/named_graph_test.go @@ -0,0 +1,33 @@ +package selection_queries + +import ( + "strings" + "testing" +) + +func TestSelectionQueriesUseNamedGraphs(t *testing.T) { + selected := []NodeRef{ + {ID: 1, TermType: "uri", IRI: "http://example.com/A"}, + } + + tests := []struct { + name string + query string + }{ + {name: "neighbors", query: neighborsQuery(selected, false)}, + {name: "superclasses", query: superclassesQuery(selected, false)}, + {name: "subclasses", query: subclassesQuery(selected, false)}, + } + + for _, tt := range tests { + if !strings.Contains(tt.query, "SELECT DISTINCT ?s ?p ?o") { + t.Fatalf("%s query should de-duplicate triples across named graphs:\n%s", tt.name, tt.query) + } + if !strings.Contains(tt.query, "GRAPH ?g") { + t.Fatalf("%s query should read from named graphs:\n%s", tt.name, tt.query) + } + if strings.Contains(tt.query, "owl:Class") { + t.Fatalf("%s query should no longer depend on owl:Class:\n%s", tt.name, tt.query) + } + } +} diff --git a/backend_go/selection_queries/neighbors.go b/backend_go/selection_queries/neighbors.go index 4722466..7f0c55b 100644 --- a/backend_go/selection_queries/neighbors.go +++ b/backend_go/selection_queries/neighbors.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "strings" + + "visualizador_instanciados/backend_go/queryscope" ) func neighborsQuery(selectedNodes []NodeRef, includeBNodes bool) string { @@ -26,6 +28,36 @@ func neighborsQuery(selectedNodes []NodeRef, includeBNodes bool) string { } values := strings.Join(valuesTerms, " ") + pattern := queryscope.NamedGraph(fmt.Sprintf(` +{ + VALUES ?sel { %s } + BIND(?sel AS ?s) + VALUES ?p { rdf:type } + ?s ?p ?o . +} +UNION +{ + VALUES ?sel { %s } + VALUES ?p { rdf:type } + ?s ?p ?sel . + BIND(?sel AS ?o) +} +UNION +{ + VALUES ?sel { %s } + BIND(?sel AS ?s) + VALUES ?p { rdfs:subClassOf } + ?s ?p ?o . +} +UNION +{ + VALUES ?sel { %s } + VALUES ?p { rdfs:subClassOf } + ?s ?p ?sel . + BIND(?sel AS ?o) +} +`, values, values, values, values)) + return fmt.Sprintf(` PREFIX rdf: PREFIX rdfs: @@ -33,40 +65,12 @@ PREFIX owl: SELECT DISTINCT ?s ?p ?o WHERE { - { - VALUES ?sel { %s } - BIND(?sel AS ?s) - VALUES ?p { rdf:type } - ?s ?p ?o . - ?o rdf:type owl:Class . - } - UNION - { - VALUES ?sel { %s } - VALUES ?p { rdf:type } - ?s ?p ?sel . - ?sel rdf:type owl:Class . - BIND(?sel AS ?o) - } - UNION - { - VALUES ?sel { %s } - BIND(?sel AS ?s) - VALUES ?p { rdfs:subClassOf } - ?s ?p ?o . - } - UNION - { - VALUES ?sel { %s } - VALUES ?p { rdfs:subClassOf } - ?s ?p ?sel . - BIND(?sel AS ?o) - } +%s FILTER(!isLiteral(?o)) FILTER(?s != ?o) %s } -`, values, values, values, values, bnodeFilter) +`, pattern, bnodeFilter) } func runNeighbors(ctx context.Context, q Querier, idx Index, selectedIDs []uint32, includeBNodes bool) (Result, error) { diff --git a/backend_go/selection_queries/subclasses.go b/backend_go/selection_queries/subclasses.go index fba0b1a..809f85e 100644 --- a/backend_go/selection_queries/subclasses.go +++ b/backend_go/selection_queries/subclasses.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "strings" + + "visualizador_instanciados/backend_go/queryscope" ) func subclassesQuery(selectedNodes []NodeRef, includeBNodes bool) string { @@ -26,20 +28,33 @@ func subclassesQuery(selectedNodes []NodeRef, includeBNodes bool) string { } values := strings.Join(valuesTerms, " ") + pattern := queryscope.NamedGraph(fmt.Sprintf(` +{ +VALUES ?sel { %s } +VALUES ?p { rdfs:subClassOf } +?s ?p ?sel . +BIND(?sel AS ?o) +} +UNION +{ +VALUES ?sel { %s } +VALUES ?p { } +?s ?p ?sel . +BIND(?sel AS ?o) +} +`, values, values)) + return fmt.Sprintf(` PREFIX rdfs: SELECT DISTINCT ?s ?p ?o WHERE { - VALUES ?sel { %s } - VALUES ?p { rdfs:subClassOf } - ?s ?p ?sel . - BIND(?sel AS ?o) +%s FILTER(!isLiteral(?o)) FILTER(?s != ?o) %s } -`, values, bnodeFilter) +`, pattern, bnodeFilter) } func runSubclasses(ctx context.Context, q Querier, idx Index, selectedIDs []uint32, includeBNodes bool) (Result, error) { diff --git a/backend_go/selection_queries/superclasses.go b/backend_go/selection_queries/superclasses.go index c4c2220..d9510d4 100644 --- a/backend_go/selection_queries/superclasses.go +++ b/backend_go/selection_queries/superclasses.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "strings" + + "visualizador_instanciados/backend_go/queryscope" ) func superclassesQuery(selectedNodes []NodeRef, includeBNodes bool) string { @@ -26,20 +28,33 @@ func superclassesQuery(selectedNodes []NodeRef, includeBNodes bool) string { } values := strings.Join(valuesTerms, " ") + pattern := queryscope.NamedGraph(fmt.Sprintf(` +{ +VALUES ?sel { %s } +BIND(?sel AS ?s) +VALUES ?p { rdfs:subClassOf } +?s ?p ?o . +} +UNION +{ +VALUES ?sel { %s } +BIND(?sel AS ?s) +VALUES ?p { } +?s ?p ?o . +} +`, values, values)) + return fmt.Sprintf(` PREFIX rdfs: SELECT DISTINCT ?s ?p ?o WHERE { - VALUES ?sel { %s } - BIND(?sel AS ?s) - VALUES ?p { rdfs:subClassOf } - ?s ?p ?o . +%s FILTER(!isLiteral(?o)) FILTER(?s != ?o) %s } -`, values, bnodeFilter) +`, pattern, bnodeFilter) } func runSuperclasses(ctx context.Context, q Querier, idx Index, selectedIDs []uint32, includeBNodes bool) (Result, error) { diff --git a/backend_go/snapshot_service.go b/backend_go/snapshot_service.go index 99e2896..64c4eba 100644 --- a/backend_go/snapshot_service.go +++ b/backend_go/snapshot_service.go @@ -2,6 +2,7 @@ package main import ( "context" + "log" "sync" ) @@ -24,6 +25,8 @@ type GraphSnapshotService struct { sparql *AnzoGraphClient cfg Config + fetchSnapshot func(context.Context, *AnzoGraphClient, Config, int, int, string) (GraphResponse, error) + mu sync.Mutex cache map[snapshotKey]GraphResponse inflight map[snapshotKey]*snapshotInflight @@ -31,10 +34,11 @@ type GraphSnapshotService struct { func NewGraphSnapshotService(sparql *AnzoGraphClient, cfg Config) *GraphSnapshotService { return &GraphSnapshotService{ - sparql: sparql, - cfg: cfg, - cache: make(map[snapshotKey]GraphResponse), - inflight: make(map[snapshotKey]*snapshotInflight), + sparql: sparql, + cfg: cfg, + fetchSnapshot: fetchGraphSnapshot, + cache: make(map[snapshotKey]GraphResponse), + inflight: make(map[snapshotKey]*snapshotInflight), } } @@ -69,7 +73,20 @@ func (s *GraphSnapshotService) Get(ctx context.Context, nodeLimit int, edgeLimit s.inflight[key] = inf s.mu.Unlock() - snap, err := fetchGraphSnapshot(ctx, s.sparql, s.cfg, nodeLimit, edgeLimit, graphQueryID) + log.Printf("[snapshot] build_start graph_query_id=%s node_limit=%d edge_limit=%d detached=true", graphQueryID, nodeLimit, edgeLimit) + go s.buildSnapshotInBackground(key, inf, nodeLimit, edgeLimit, graphQueryID) + + select { + case <-ctx.Done(): + log.Printf("[snapshot] requester_canceled graph_query_id=%s node_limit=%d edge_limit=%d err=%v build_continues=true", graphQueryID, nodeLimit, edgeLimit, ctx.Err()) + return GraphResponse{}, ctx.Err() + case <-inf.ready: + return inf.snapshot, inf.err + } +} + +func (s *GraphSnapshotService) buildSnapshotInBackground(key snapshotKey, inf *snapshotInflight, nodeLimit int, edgeLimit int, graphQueryID string) { + snap, err := s.fetchSnapshot(context.Background(), s.sparql, s.cfg, nodeLimit, edgeLimit, graphQueryID) s.mu.Lock() inf.snapshot = snap @@ -81,5 +98,9 @@ func (s *GraphSnapshotService) Get(ctx context.Context, nodeLimit int, edgeLimit close(inf.ready) s.mu.Unlock() - return snap, err + if err != nil { + log.Printf("[snapshot] build_done graph_query_id=%s node_limit=%d edge_limit=%d detached=true cached=false err=%v", graphQueryID, nodeLimit, edgeLimit, err) + return + } + log.Printf("[snapshot] build_done graph_query_id=%s node_limit=%d edge_limit=%d detached=true cached=true", graphQueryID, nodeLimit, edgeLimit) } diff --git a/backend_go/snapshot_service_test.go b/backend_go/snapshot_service_test.go new file mode 100644 index 0000000..e6ebd46 --- /dev/null +++ b/backend_go/snapshot_service_test.go @@ -0,0 +1,97 @@ +package main + +import ( + "context" + "errors" + "sync/atomic" + "testing" + "time" +) + +func TestSnapshotBuildContinuesAfterRequesterCancellation(t *testing.T) { + svc := NewGraphSnapshotService(nil, Config{}) + + var fetchCalls atomic.Int32 + started := make(chan struct{}) + release := make(chan struct{}) + expected := GraphResponse{ + Nodes: []Node{{ID: 1}}, + Edges: []Edge{{Source: 1, Target: 1, PredicateID: 0}}, + Meta: &GraphMeta{GraphQueryID: "default", Nodes: 1, Edges: 1}, + } + + svc.fetchSnapshot = func(ctx context.Context, _ *AnzoGraphClient, _ Config, nodeLimit int, edgeLimit int, graphQueryID string) (GraphResponse, error) { + fetchCalls.Add(1) + if nodeLimit != 10 || edgeLimit != 20 || graphQueryID != "default" { + t.Fatalf("unexpected fetch args nodeLimit=%d edgeLimit=%d graphQueryID=%s", nodeLimit, edgeLimit, graphQueryID) + } + close(started) + <-release + return expected, nil + } + + ctx1, cancel1 := context.WithCancel(context.Background()) + defer cancel1() + + firstErrCh := make(chan error, 1) + go func() { + _, err := svc.Get(ctx1, 10, 20, "default") + firstErrCh <- err + }() + + <-started + cancel1() + + select { + case err := <-firstErrCh: + if !errors.Is(err, context.Canceled) { + t.Fatalf("first Get error = %v, want context.Canceled", err) + } + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for first Get to return after cancellation") + } + + secondSnapCh := make(chan GraphResponse, 1) + secondErrCh := make(chan error, 1) + go func() { + snap, err := svc.Get(context.Background(), 10, 20, "default") + if err != nil { + secondErrCh <- err + return + } + secondSnapCh <- snap + }() + + time.Sleep(50 * time.Millisecond) + if got := fetchCalls.Load(); got != 1 { + t.Fatalf("fetchCalls after second waiter start = %d, want 1", got) + } + + close(release) + + select { + case err := <-secondErrCh: + t.Fatalf("second Get error = %v", err) + case snap := <-secondSnapCh: + if snap.Meta == nil || snap.Meta.Nodes != expected.Meta.Nodes || snap.Meta.Edges != expected.Meta.Edges { + t.Fatalf("second Get snapshot meta = %#v, want %#v", snap.Meta, expected.Meta) + } + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for second Get to return") + } + + if got := fetchCalls.Load(); got != 1 { + t.Fatalf("fetchCalls after background completion = %d, want 1", got) + } + + snap, err := svc.Get(context.Background(), 10, 20, "default") + if err != nil { + t.Fatalf("cached Get error = %v", err) + } + if snap.Meta == nil || snap.Meta.Nodes != expected.Meta.Nodes || snap.Meta.Edges != expected.Meta.Edges { + t.Fatalf("cached Get snapshot meta = %#v, want %#v", snap.Meta, expected.Meta) + } + if got := fetchCalls.Load(); got != 1 { + t.Fatalf("fetchCalls after cached Get = %d, want 1", got) + } +} diff --git a/backend_go/sparql.go b/backend_go/sparql.go index a51e8d7..2c3bcdf 100644 --- a/backend_go/sparql.go +++ b/backend_go/sparql.go @@ -5,50 +5,68 @@ import ( "encoding/base64" "fmt" "io" + "log" "net/http" - "net/url" "strings" "time" + + "visualizador_instanciados/backend_go/queryscope" ) type AnzoGraphClient struct { - cfg Config - endpoint string - authHeader string - client *http.Client + cfg Config + endpoint string + basicAuthHeader string + client *http.Client + tokenManager *keycloakTokenManager } func NewAnzoGraphClient(cfg Config) *AnzoGraphClient { endpoint := cfg.EffectiveSparqlEndpoint() - authHeader := "" - user := strings.TrimSpace(cfg.SparqlUser) - pass := strings.TrimSpace(cfg.SparqlPass) - if user != "" && pass != "" { - token := base64.StdEncoding.EncodeToString([]byte(user + ":" + pass)) - authHeader = "Basic " + token + client := &http.Client{} + basicAuthHeader := "" + if !cfg.UsesExternalSparql() { + user := strings.TrimSpace(cfg.SparqlUser) + pass := strings.TrimSpace(cfg.SparqlPass) + if user != "" && pass != "" { + token := base64.StdEncoding.EncodeToString([]byte(user + ":" + pass)) + basicAuthHeader = "Basic " + token + } } - return &AnzoGraphClient{ - cfg: cfg, - endpoint: endpoint, - authHeader: authHeader, - client: &http.Client{}, + agc := &AnzoGraphClient{ + cfg: cfg, + endpoint: endpoint, + basicAuthHeader: basicAuthHeader, + client: client, } + if cfg.UsesExternalSparql() { + agc.tokenManager = newKeycloakTokenManager(cfg, client) + } + return agc } func (c *AnzoGraphClient) Startup(ctx context.Context) error { + log.Printf( + "[sparql] startup source_mode=%s endpoint=%s auth_mode=%s load_on_start=%t", + c.cfg.SparqlSourceMode, + c.endpoint, + c.authMode(), + c.cfg.SparqlLoadOnStart, + ) + + if c.cfg.UsesExternalSparql() { + tokenCtx, cancel := context.WithTimeout(ctx, c.cfg.SparqlReadyTimeout) + defer cancel() + if _, err := c.refreshExternalToken(tokenCtx, "startup"); err != nil { + return fmt.Errorf("keycloak startup token fetch failed: %w", err) + } + } + if err := c.waitReady(ctx); err != nil { return err } - - if c.cfg.SparqlClearOnStart { - if err := c.update(ctx, "CLEAR ALL"); err != nil { - return err - } - if err := c.waitReady(ctx); err != nil { - return err - } - } + c.logNamedGraphDatasetProbe(ctx, "startup_initial") if c.cfg.SparqlLoadOnStart { df := strings.TrimSpace(c.cfg.SparqlDataFile) @@ -68,6 +86,7 @@ func (c *AnzoGraphClient) Startup(ctx context.Context) error { if err := c.waitReady(ctx); err != nil { return err } + c.logNamedGraphDatasetProbe(ctx, "startup_post_load") } return nil @@ -83,23 +102,7 @@ func (c *AnzoGraphClient) Query(ctx context.Context, query string) ([]byte, erro } func (c *AnzoGraphClient) queryWithTimeout(ctx context.Context, query string, timeout time.Duration) ([]byte, error) { - ctx2, cancel := context.WithTimeout(ctx, timeout) - defer cancel() - - form := url.Values{} - form.Set("query", query) - - req, err := http.NewRequestWithContext(ctx2, http.MethodPost, c.endpoint, strings.NewReader(form.Encode())) - if err != nil { - return nil, err - } - req.Header.Set("Content-Type", "application/x-www-form-urlencoded") - req.Header.Set("Accept", "application/sparql-results+json") - if c.authHeader != "" { - req.Header.Set("Authorization", c.authHeader) - } - - resp, err := c.client.Do(req) + resp, _, err := c.queryRequestWithTimeout(ctx, query, timeout) if err != nil { return nil, err } @@ -109,9 +112,6 @@ func (c *AnzoGraphClient) queryWithTimeout(ctx context.Context, query string, ti if err != nil { return nil, err } - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - return nil, fmt.Errorf("sparql query failed: %s: %s", resp.Status, strings.TrimSpace(string(body))) - } return body, nil } @@ -125,8 +125,12 @@ func (c *AnzoGraphClient) update(ctx context.Context, update string) error { } req.Header.Set("Content-Type", "application/sparql-update") req.Header.Set("Accept", "application/json") - if c.authHeader != "" { - req.Header.Set("Authorization", c.authHeader) + authHeader, err := c.authorizationHeader(ctx2, "sparql_update") + if err != nil { + return err + } + if authHeader != "" { + req.Header.Set("Authorization", authHeader) } resp, err := c.client.Do(req) @@ -144,6 +148,13 @@ func (c *AnzoGraphClient) update(ctx context.Context, update string) error { func (c *AnzoGraphClient) waitReady(ctx context.Context) error { var lastErr error + log.Printf( + "[sparql] readiness_wait_start endpoint=%s retries=%d timeout=%s delay=%s query_scope=named_graphs", + c.endpoint, + c.cfg.SparqlReadyRetries, + c.cfg.SparqlReadyTimeout, + c.cfg.SparqlReadyDelay, + ) for i := 0; i < c.cfg.SparqlReadyRetries; i++ { select { case <-ctx.Done(): @@ -154,16 +165,73 @@ func (c *AnzoGraphClient) waitReady(ctx context.Context) error { default: } - body, err := c.queryWithTimeout(ctx, "ASK WHERE { ?s ?p ?o }", c.cfg.SparqlReadyTimeout) + var ask sparqlBooleanResponse + _, err := c.queryJSONWithTimeout(ctx, namedGraphAnyTripleAskQuery(), c.cfg.SparqlReadyTimeout, &ask) if err == nil { - // Ensure it's JSON, not HTML/text during boot. - if strings.HasPrefix(strings.TrimSpace(string(body)), "{") { - return nil - } - err = fmt.Errorf("unexpected readiness response: %s", strings.TrimSpace(string(body))) + log.Printf("[sparql] readiness_wait_ok endpoint=%s attempt=%d/%d", c.endpoint, i+1, c.cfg.SparqlReadyRetries) + return nil } lastErr = err + log.Printf("[sparql] readiness_wait_retry endpoint=%s attempt=%d/%d err=%v", c.endpoint, i+1, c.cfg.SparqlReadyRetries, err) time.Sleep(c.cfg.SparqlReadyDelay) } return fmt.Errorf("anzograph not ready at %s: %w", c.endpoint, lastErr) } + +func namedGraphAnyTripleAskQuery() string { + return queryscope.AskAnyTripleQuery() +} + +func (c *AnzoGraphClient) authMode() string { + switch { + case c.cfg.UsesExternalSparql(): + return "bearer" + case strings.HasPrefix(c.basicAuthHeader, "Basic "): + return "basic" + default: + return "none" + } +} + +func (c *AnzoGraphClient) authorizationHeader(ctx context.Context, reason string) (string, error) { + if !c.cfg.UsesExternalSparql() { + return c.basicAuthHeader, nil + } + if c.tokenManager == nil { + return "", fmt.Errorf("external sparql mode is enabled but token manager is not configured") + } + token, err := c.tokenManager.EnsureToken(ctx, reason) + if err != nil { + return "", err + } + return "Bearer " + token, nil +} + +func (c *AnzoGraphClient) refreshExternalToken(ctx context.Context, reason string) (string, error) { + if !c.cfg.UsesExternalSparql() { + return "", nil + } + if c.tokenManager == nil { + return "", fmt.Errorf("external sparql mode is enabled but token manager is not configured") + } + return c.tokenManager.Refresh(ctx, reason) +} + +func (c *AnzoGraphClient) logNamedGraphDatasetProbe(ctx context.Context, stage string) { + var ask sparqlBooleanResponse + metrics, err := c.queryJSONWithTimeout(ctx, namedGraphAnyTripleAskQuery(), c.cfg.SparqlReadyTimeout, &ask) + if err != nil { + log.Printf("[sparql] dataset_probe_failed stage=%s endpoint=%s err=%v", stage, c.endpoint, err) + return + } + + log.Printf( + "[sparql] dataset_probe stage=%s endpoint=%s named_graph_has_triples=%t bytes=%d round_trip_time=%s decode_time=%s", + stage, + c.endpoint, + ask.Boolean, + metrics.ResponseBytes, + metrics.RoundTripTime.Truncate(time.Millisecond), + metrics.BodyDecodeTime.Truncate(time.Millisecond), + ) +} diff --git a/backend_go/sparql_decode.go b/backend_go/sparql_decode.go new file mode 100644 index 0000000..0ab87c3 --- /dev/null +++ b/backend_go/sparql_decode.go @@ -0,0 +1,341 @@ +package main + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "log" + "net/http" + "net/url" + "strings" + "time" +) + +type sparqlQueryMetrics struct { + ResponseBytes int64 + RoundTripTime time.Duration + BodyDecodeTime time.Duration + BindingCount int +} + +type countingReadCloser struct { + io.ReadCloser + bytesRead int64 +} + +func (c *countingReadCloser) Read(p []byte) (int, error) { + n, err := c.ReadCloser.Read(p) + c.bytesRead += int64(n) + return n, err +} + +type cancelOnCloseReadCloser struct { + io.ReadCloser + cancel context.CancelFunc +} + +func (c *cancelOnCloseReadCloser) Close() error { + err := c.ReadCloser.Close() + c.cancel() + return err +} + +type sparqlHTTPStatusError struct { + StatusCode int + Status string + Body string +} + +func (e *sparqlHTTPStatusError) Error() string { + return fmt.Sprintf("sparql query failed: %s: %s", e.Status, e.Body) +} + +func (c *AnzoGraphClient) queryRequestWithTimeout(ctx context.Context, query string, timeout time.Duration) (*http.Response, time.Duration, error) { + ctx2, cancel := context.WithTimeout(ctx, timeout) + resp, roundTripTime, err := c.queryRequest(ctx2, query, true) + if err != nil { + cancel() + return nil, roundTripTime, err + } + resp.Body = &cancelOnCloseReadCloser{ReadCloser: resp.Body, cancel: cancel} + return resp, roundTripTime, nil +} + +func (c *AnzoGraphClient) queryRequest(ctx context.Context, query string, allowRefresh bool) (*http.Response, time.Duration, error) { + resp, roundTripTime, err := c.queryRequestAttempt(ctx, query) + if err == nil { + return resp, roundTripTime, nil + } + + var statusErr *sparqlHTTPStatusError + if allowRefresh && errors.As(err, &statusErr) && c.shouldRefreshExpiredJWT(statusErr) { + log.Printf("[auth] sparql_token_retry endpoint=%s reason=jwt_expired", c.endpoint) + if _, refreshErr := c.refreshExternalToken(ctx, "sparql_jwt_expired"); refreshErr != nil { + return nil, roundTripTime, fmt.Errorf("%w (token refresh failed: %v)", statusErr, refreshErr) + } + + retryResp, retryRoundTripTime, retryErr := c.queryRequest(ctx, query, false) + return retryResp, roundTripTime + retryRoundTripTime, retryErr + } + + return nil, roundTripTime, err +} + +func (c *AnzoGraphClient) queryRequestAttempt(ctx context.Context, query string) (*http.Response, time.Duration, error) { + form := url.Values{} + form.Set("query", query) + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, c.endpoint, strings.NewReader(form.Encode())) + if err != nil { + return nil, 0, err + } + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + req.Header.Set("Accept", "application/sparql-results+json") + authHeader, err := c.authorizationHeader(ctx, "sparql_query") + if err != nil { + return nil, 0, err + } + if authHeader != "" { + req.Header.Set("Authorization", authHeader) + } + + start := time.Now() + resp, err := c.client.Do(req) + if err != nil { + return nil, 0, err + } + roundTripTime := time.Since(start) + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + defer resp.Body.Close() + body, readErr := io.ReadAll(resp.Body) + if readErr != nil { + return nil, roundTripTime, readErr + } + return nil, roundTripTime, &sparqlHTTPStatusError{ + StatusCode: resp.StatusCode, + Status: resp.Status, + Body: strings.TrimSpace(string(body)), + } + } + + return resp, roundTripTime, nil +} + +func (c *AnzoGraphClient) shouldRefreshExpiredJWT(err *sparqlHTTPStatusError) bool { + if err == nil || !c.cfg.UsesExternalSparql() { + return false + } + return err.StatusCode == http.StatusUnauthorized && strings.Contains(err.Body, "Jwt is expired") +} + +func (c *AnzoGraphClient) QueryJSON(ctx context.Context, query string, target any) (sparqlQueryMetrics, error) { + return c.queryJSONWithTimeout(ctx, query, c.cfg.SparqlTimeout, target) +} + +func (c *AnzoGraphClient) queryJSONWithTimeout(ctx context.Context, query string, timeout time.Duration, target any) (sparqlQueryMetrics, error) { + resp, roundTripTime, err := c.queryRequestWithTimeout(ctx, query, timeout) + if err != nil { + return sparqlQueryMetrics{}, err + } + counter := &countingReadCloser{ReadCloser: resp.Body} + defer counter.Close() + + decodeStart := time.Now() + if err := json.NewDecoder(counter).Decode(target); err != nil { + return sparqlQueryMetrics{ + ResponseBytes: counter.bytesRead, + RoundTripTime: roundTripTime, + BodyDecodeTime: time.Since(decodeStart), + }, wrapSparqlJSONDecodeError(err) + } + + return sparqlQueryMetrics{ + ResponseBytes: counter.bytesRead, + RoundTripTime: roundTripTime, + BodyDecodeTime: time.Since(decodeStart), + }, nil +} + +func (c *AnzoGraphClient) QueryTripleBindingsStream( + ctx context.Context, + query string, + visit func(binding sparqlTripleBinding) error, +) (sparqlQueryMetrics, error) { + return c.queryTripleBindingsStreamWithTimeout(ctx, query, c.cfg.SparqlTimeout, visit) +} + +func (c *AnzoGraphClient) queryTripleBindingsStreamWithTimeout( + ctx context.Context, + query string, + timeout time.Duration, + visit func(binding sparqlTripleBinding) error, +) (sparqlQueryMetrics, error) { + resp, roundTripTime, err := c.queryRequestWithTimeout(ctx, query, timeout) + if err != nil { + return sparqlQueryMetrics{}, err + } + counter := &countingReadCloser{ReadCloser: resp.Body} + defer counter.Close() + + decodeStart := time.Now() + bindingCount, err := decodeBindingsStream(json.NewDecoder(counter), visit) + if err != nil { + return sparqlQueryMetrics{ + ResponseBytes: counter.bytesRead, + RoundTripTime: roundTripTime, + BodyDecodeTime: time.Since(decodeStart), + BindingCount: bindingCount, + }, wrapSparqlJSONDecodeError(err) + } + + return sparqlQueryMetrics{ + ResponseBytes: counter.bytesRead, + RoundTripTime: roundTripTime, + BodyDecodeTime: time.Since(decodeStart), + BindingCount: bindingCount, + }, nil +} + +func decodeBindingsStream(dec *json.Decoder, visit func(binding sparqlTripleBinding) error) (int, error) { + tok, err := dec.Token() + if err != nil { + return 0, err + } + if delim, ok := tok.(json.Delim); !ok || delim != '{' { + return 0, fmt.Errorf("invalid SPARQL JSON: expected top-level object") + } + + foundResults := false + bindingCount := 0 + for dec.More() { + keyToken, err := dec.Token() + if err != nil { + return bindingCount, err + } + + key, ok := keyToken.(string) + if !ok { + return bindingCount, fmt.Errorf("invalid SPARQL JSON: expected top-level field name") + } + + switch key { + case "results": + foundResults = true + n, err := decodeTripleBindingsObject(dec, visit) + bindingCount += n + if err != nil { + return bindingCount, err + } + default: + if err := discardJSONValue(dec); err != nil { + return bindingCount, err + } + } + } + + tok, err = dec.Token() + if err != nil { + return bindingCount, err + } + if delim, ok := tok.(json.Delim); !ok || delim != '}' { + return bindingCount, fmt.Errorf("invalid SPARQL JSON: expected top-level object terminator") + } + + if !foundResults { + return 0, fmt.Errorf("invalid SPARQL JSON: missing results field") + } + + return bindingCount, nil +} + +func decodeTripleBindingsObject(dec *json.Decoder, visit func(binding sparqlTripleBinding) error) (int, error) { + tok, err := dec.Token() + if err != nil { + return 0, err + } + if delim, ok := tok.(json.Delim); !ok || delim != '{' { + return 0, fmt.Errorf("invalid SPARQL JSON: expected results object") + } + + bindingCount := 0 + for dec.More() { + keyToken, err := dec.Token() + if err != nil { + return bindingCount, err + } + + key, ok := keyToken.(string) + if !ok { + return bindingCount, fmt.Errorf("invalid SPARQL JSON: expected results field name") + } + + if key != "bindings" { + if err := discardJSONValue(dec); err != nil { + return bindingCount, err + } + continue + } + + tok, err := dec.Token() + if err != nil { + return bindingCount, err + } + if delim, ok := tok.(json.Delim); !ok || delim != '[' { + return bindingCount, fmt.Errorf("invalid SPARQL JSON: expected bindings array") + } + + for dec.More() { + var binding sparqlTripleBinding + if err := dec.Decode(&binding); err != nil { + return bindingCount, err + } + bindingCount++ + if err := visit(binding); err != nil { + return bindingCount, err + } + } + + tok, err = dec.Token() + if err != nil { + return bindingCount, err + } + if delim, ok := tok.(json.Delim); !ok || delim != ']' { + return bindingCount, fmt.Errorf("invalid SPARQL JSON: expected bindings array terminator") + } + } + + tok, err = dec.Token() + if err != nil { + return bindingCount, err + } + if delim, ok := tok.(json.Delim); !ok || delim != '}' { + return bindingCount, fmt.Errorf("invalid SPARQL JSON: expected results object terminator") + } + + return bindingCount, nil +} + +func discardJSONValue(dec *json.Decoder) error { + var discard json.RawMessage + return dec.Decode(&discard) +} + +func wrapSparqlJSONDecodeError(err error) error { + if err == nil { + return nil + } + if isTruncatedJSONError(err) { + return fmt.Errorf("truncated SPARQL JSON: %w", err) + } + return fmt.Errorf("failed to decode SPARQL JSON: %w", err) +} + +func isTruncatedJSONError(err error) bool { + return errors.Is(err, io.ErrUnexpectedEOF) || + errors.Is(err, io.EOF) || + strings.Contains(err.Error(), "unexpected end of JSON input") || + strings.Contains(err.Error(), "unexpected EOF") +} diff --git a/backend_go/sparql_decode_test.go b/backend_go/sparql_decode_test.go new file mode 100644 index 0000000..5d839c9 --- /dev/null +++ b/backend_go/sparql_decode_test.go @@ -0,0 +1,144 @@ +package main + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "strings" + "testing" + "time" +) + +func TestDecodeBindingsStreamStreamsTripleBindings(t *testing.T) { + payload := `{ + "head": {"vars": ["s", "p", "o"]}, + "results": { + "bindings": [ + { + "s": {"type": "uri", "value": "http://example.com/s1"}, + "p": {"type": "uri", "value": "http://example.com/p"}, + "o": {"type": "uri", "value": "http://example.com/o1"} + }, + { + "s": {"type": "uri", "value": "http://example.com/s2"}, + "p": {"type": "uri", "value": "http://example.com/p"}, + "o": {"type": "uri", "value": "http://example.com/o2"} + } + ] + } +}` + + var got []sparqlTripleBinding + count, err := decodeBindingsStream(json.NewDecoder(strings.NewReader(payload)), func(binding sparqlTripleBinding) error { + got = append(got, binding) + return nil + }) + if err != nil { + t.Fatalf("decodeBindingsStream returned error: %v", err) + } + if count != 2 { + t.Fatalf("expected 2 bindings, got %d", count) + } + if len(got) != 2 { + t.Fatalf("expected 2 streamed bindings, got %d", len(got)) + } + if got[0].S.Value != "http://example.com/s1" || got[1].O.Value != "http://example.com/o2" { + t.Fatalf("unexpected streamed bindings: %+v", got) + } +} + +func TestQueryJSONDecodesTypedBindings(t *testing.T) { + t.Run("predicates", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/sparql-results+json") + _, _ = w.Write([]byte(`{ + "head": {"vars": ["p"]}, + "results": { + "bindings": [ + {"p": {"type": "uri", "value": "http://example.com/p"}} + ] + } +}`)) + })) + defer server.Close() + + client := &AnzoGraphClient{ + cfg: Config{SparqlTimeout: 5 * time.Second}, + endpoint: server.URL, + client: server.Client(), + } + + var res sparqlBindingsResponse[sparqlPredicateBinding] + metrics, err := client.QueryJSON(context.Background(), "SELECT ?p WHERE { ?s ?p ?o }", &res) + if err != nil { + t.Fatalf("QueryJSON returned error: %v", err) + } + if len(res.Results.Bindings) != 1 || res.Results.Bindings[0].P.Value != "http://example.com/p" { + t.Fatalf("unexpected predicate bindings: %+v", res.Results.Bindings) + } + if metrics.ResponseBytes == 0 { + t.Fatalf("expected QueryJSON to record response bytes") + } + })) + + t.Run("labels", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/sparql-results+json") + _, _ = w.Write([]byte(`{ + "head": {"vars": ["s", "label"]}, + "results": { + "bindings": [ + { + "s": {"type": "uri", "value": "http://example.com/s"}, + "label": {"type": "literal", "xml:lang": "en", "value": "Example"} + } + ] + } +}`)) + })) + defer server.Close() + + client := &AnzoGraphClient{ + cfg: Config{SparqlTimeout: 5 * time.Second}, + endpoint: server.URL, + client: server.Client(), + } + + var res sparqlBindingsResponse[sparqlLabelBinding] + _, err := client.QueryJSON(context.Background(), "SELECT ?s ?label WHERE { ?s ?p ?label }", &res) + if err != nil { + t.Fatalf("QueryJSON returned error: %v", err) + } + if len(res.Results.Bindings) != 1 { + t.Fatalf("expected 1 label binding, got %d", len(res.Results.Bindings)) + } + if res.Results.Bindings[0].Label.Value != "Example" || res.Results.Bindings[0].Label.Lang != "en" { + t.Fatalf("unexpected label binding: %+v", res.Results.Bindings[0]) + } + })) +} + +func TestQueryTripleBindingsStreamReportsTruncatedJSON(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/sparql-results+json") + _, _ = w.Write([]byte(`{"results":{"bindings":[{"s":{"type":"uri","value":"http://example.com/s"}`)) + })) + defer server.Close() + + client := &AnzoGraphClient{ + cfg: Config{SparqlTimeout: 5 * time.Second}, + endpoint: server.URL, + client: server.Client(), + } + + _, err := client.QueryTripleBindingsStream(context.Background(), "SELECT ?s ?p ?o WHERE { ?s ?p ?o }", func(binding sparqlTripleBinding) error { + return nil + }) + if err == nil { + t.Fatalf("expected truncated JSON error") + } + if !strings.Contains(err.Error(), "truncated SPARQL JSON") { + t.Fatalf("expected truncated JSON error, got %v", err) + } +} diff --git a/backend_go/sparql_named_graph_test.go b/backend_go/sparql_named_graph_test.go new file mode 100644 index 0000000..0cfbc71 --- /dev/null +++ b/backend_go/sparql_named_graph_test.go @@ -0,0 +1,20 @@ +package main + +import ( + "strings" + "testing" +) + +func TestNamedGraphAnyTripleAskQueryUsesGraphVariable(t *testing.T) { + query := namedGraphAnyTripleAskQuery() + + if !strings.Contains(query, "ASK WHERE") { + t.Fatalf("readiness query should be an ASK query:\n%s", query) + } + if !strings.Contains(query, "GRAPH ?g") { + t.Fatalf("readiness query should probe named graphs:\n%s", query) + } + if strings.Contains(query, "ASK WHERE { ?s ?p ?o }") { + t.Fatalf("readiness query should no longer probe only the default graph:\n%s", query) + } +} diff --git a/backend_go/sparql_types.go b/backend_go/sparql_types.go index 12683d6..9ad182c 100644 --- a/backend_go/sparql_types.go +++ b/backend_go/sparql_types.go @@ -6,8 +6,27 @@ type sparqlTerm struct { Lang string `json:"xml:lang,omitempty"` } -type sparqlResponse struct { +type sparqlBooleanResponse struct { + Boolean bool `json:"boolean"` +} + +type sparqlTripleBinding struct { + S sparqlTerm `json:"s"` + P sparqlTerm `json:"p"` + O sparqlTerm `json:"o"` +} + +type sparqlPredicateBinding struct { + P sparqlTerm `json:"p"` +} + +type sparqlLabelBinding struct { + S sparqlTerm `json:"s"` + Label sparqlTerm `json:"label"` +} + +type sparqlBindingsResponse[T any] struct { Results struct { - Bindings []map[string]sparqlTerm `json:"bindings"` + Bindings []T `json:"bindings"` } `json:"results"` } diff --git a/docker-compose.yml b/docker-compose.yml index 18c8aa4..af02fb5 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -36,14 +36,21 @@ services: - MAX_EDGE_LIMIT=${MAX_EDGE_LIMIT:-20000000} - INCLUDE_BNODES=${INCLUDE_BNODES:-false} - CORS_ORIGINS=${CORS_ORIGINS:-http://localhost:5173} + - SPARQL_SOURCE_MODE=${SPARQL_SOURCE_MODE:-local} - SPARQL_HOST=${SPARQL_HOST:-http://anzograph:8080} - SPARQL_ENDPOINT + - EXTERNAL_SPARQL_ENDPOINT + - KEYCLOAK_TOKEN_ENDPOINT + - KEYCLOAK_CLIENT_ID + - KEYCLOAK_USERNAME + - KEYCLOAK_PASSWORD + - KEYCLOAK_SCOPE=${KEYCLOAK_SCOPE:-openid} + - ACCESS_TOKEN - SPARQL_USER=${SPARQL_USER:-admin} - SPARQL_PASS=${SPARQL_PASS:-Passw0rd1} - SPARQL_DATA_FILE=${SPARQL_DATA_FILE:-file:///opt/shared-files/o3po.ttl} - SPARQL_GRAPH_IRI - SPARQL_LOAD_ON_START=${SPARQL_LOAD_ON_START:-false} - - SPARQL_CLEAR_ON_START=${SPARQL_CLEAR_ON_START:-false} - SPARQL_TIMEOUT_S=${SPARQL_TIMEOUT_S:-300} - SPARQL_READY_RETRIES=${SPARQL_READY_RETRIES:-30} - SPARQL_READY_DELAY_S=${SPARQL_READY_DELAY_S:-4}