Add filter, add READMES

This commit is contained in:
Oxy8
2026-03-06 15:35:04 -03:00
parent b44867abfa
commit 3c487d088b
56 changed files with 2495 additions and 1424 deletions

23
backend_go/Dockerfile Normal file
View File

@@ -0,0 +1,23 @@
# Build stage: compile a static Linux binary with the Go toolchain.
FROM golang:1.22-alpine AS builder
WORKDIR /src
# Copy go.mod first so the dependency download layer is cached
# independently of source changes. (No go.sum copied: the module
# declares no external dependencies.)
COPY go.mod /src/go.mod
RUN go mod download
COPY . /src
# CGO_ENABLED=0 produces a fully static binary that runs on plain Alpine;
# -trimpath and -ldflags "-s -w" strip build paths and debug info.
RUN CGO_ENABLED=0 GOOS=linux go build -trimpath -ldflags="-s -w" -o /out/backend ./
# Runtime stage: minimal Alpine plus CA certificates (outbound HTTPS)
# and curl (container healthchecks).
FROM alpine:3.20
RUN apk add --no-cache ca-certificates curl
WORKDIR /app
COPY --from=builder /out/backend /app/backend
# Default listen address is :8000 (overridable via LISTEN_ADDR).
EXPOSE 8000
CMD ["/app/backend"]

98
backend_go/README.md Normal file
View File

@@ -0,0 +1,98 @@
# Backend (Go) Graph + Selection API
This service exposes a small HTTP API for:
- Building and caching a “graph snapshot” from AnzoGraph via SPARQL (`/api/graph`)
- Returning available “graph query” and “selection query” modes
- Running selection queries for the currently selected node IDs
- (Optionally) issuing raw SPARQL passthrough for debugging
## Run
Via Docker Compose (recommended):
```bash
docker compose up --build backend
```
The backend listens on `:8000` (configurable via `LISTEN_ADDR`).
## Configuration (env)
See `backend_go/config.go` for the full set.
Important variables:
- Snapshot limits:
- `DEFAULT_NODE_LIMIT`, `DEFAULT_EDGE_LIMIT`
- `MAX_NODE_LIMIT`, `MAX_EDGE_LIMIT`
- SPARQL connectivity:
- `SPARQL_HOST` (default `http://anzograph:8080`) or `SPARQL_ENDPOINT`
- `SPARQL_USER`, `SPARQL_PASS`
- Startup behavior:
- `SPARQL_LOAD_ON_START`, `SPARQL_CLEAR_ON_START`
- `SPARQL_DATA_FILE` (typically `file:///opt/shared-files/<file>.ttl`)
- Other:
- `INCLUDE_BNODES` (include blank nodes in snapshots)
- `CORS_ORIGINS`
## Endpoints
- `GET /api/health`
- `GET /api/stats`
- `GET /api/graph?node_limit=&edge_limit=&graph_query_id=`
- `GET /api/graph_queries`
- `GET /api/selection_queries`
- `POST /api/selection_query`
- Body: `{"query_id":"neighbors","selected_ids":[1,2,3],"node_limit":...,"edge_limit":...,"graph_query_id":"default"}`
- `POST /api/sparql` (raw passthrough)
- `POST /api/neighbors` (legacy alias of `query_id="neighbors"`)
## Graph snapshots
Snapshots are built by:
1) Running a SPARQL edge query (controlled by `graph_query_id`)
2) Converting SPARQL bindings into dense integer node IDs + edge list
3) Computing a layout and fetching optional `rdfs:label`
Snapshots are cached in-memory keyed by:
- `node_limit`, `edge_limit`, `INCLUDE_BNODES`, `graph_query_id`
## Query registries
### Graph query modes (`graph_query_id`)
Stored under `backend_go/graph_queries/` and listed by `GET /api/graph_queries`.
Built-in modes:
- `default` — `rdf:type` (to `owl:Class`) + `rdfs:subClassOf`
- `hierarchy` — `rdfs:subClassOf` only
- `types` — `rdf:type` (to `owl:Class`) only
To add a new mode:
1) Add a new file under `backend_go/graph_queries/` that returns a SPARQL query selecting `?s ?p ?o`.
2) Register it in `backend_go/graph_queries/registry.go`.
### Selection query modes (`query_id`)
Stored under `backend_go/selection_queries/` and listed by `GET /api/selection_queries`.
Built-in modes:
- `neighbors` — type + subclass neighbors (both directions)
- `superclasses` — `?sel rdfs:subClassOf ?nbr`
- `subclasses` — `?nbr rdfs:subClassOf ?sel`
To add a new mode:
1) Add a new file under `backend_go/selection_queries/` that returns neighbor node IDs.
2) Register it in `backend_go/selection_queries/registry.go`.
## Performance notes
- Memory usage is dominated by the cached snapshot (`[]Node`, `[]Edge`) and the temporary SPARQL JSON unmarshalling step.
- Tune `DEFAULT_NODE_LIMIT`/`DEFAULT_EDGE_LIMIT` first if memory is too high.

176
backend_go/config.go Normal file
View File

@@ -0,0 +1,176 @@
package main
import (
"fmt"
"os"
"strconv"
"strings"
"time"
)
// Config holds every runtime setting for the backend. All values are
// sourced from environment variables; see LoadConfig for the variable
// names and their defaults.
type Config struct {
	IncludeBNodes      bool          // include blank nodes in graph snapshots
	CorsOrigins        string        // comma-separated origin allowlist, or "*" for any
	DefaultNodeLimit   int           // node cap used when a request omits node_limit
	DefaultEdgeLimit   int           // edge cap used when a request omits edge_limit
	MaxNodeLimit       int           // largest node_limit a request may ask for
	MaxEdgeLimit       int           // largest edge_limit a request may ask for
	SparqlHost         string        // base URL; "/sparql" is appended unless SparqlEndpoint is set
	SparqlEndpoint     string        // full endpoint URL; overrides SparqlHost when non-blank
	SparqlUser         string        // basic-auth user (empty = no auth)
	SparqlPass         string        // basic-auth password
	SparqlInsecureTLS  bool          // skip TLS certificate verification
	SparqlDataFile     string        // data file IRI loaded at startup when SparqlLoadOnStart
	SparqlGraphIRI     string        // target named graph for startup load (empty = default graph)
	SparqlLoadOnStart  bool          // load SparqlDataFile during Startup
	SparqlClearOnStart bool          // clear the store during Startup
	SparqlTimeout      time.Duration // per-query HTTP timeout
	SparqlReadyRetries int           // readiness probe attempts at startup
	SparqlReadyDelay   time.Duration // delay between readiness probes
	SparqlReadyTimeout time.Duration // timeout of a single readiness probe
	ListenAddr         string        // HTTP listen address, e.g. ":8000"
}

// LoadConfig reads all settings from the environment, applies
// defaults, and validates cross-field invariants: every limit must be
// positive, defaults may not exceed their maximums, and a data file
// must be configured when startup loading is requested. On any
// validation failure the zero Config and an error are returned.
func LoadConfig() (Config, error) {
	cfg := Config{
		IncludeBNodes:      envBool("INCLUDE_BNODES", false),
		CorsOrigins:        envString("CORS_ORIGINS", "*"),
		DefaultNodeLimit:   envInt("DEFAULT_NODE_LIMIT", 800_000),
		DefaultEdgeLimit:   envInt("DEFAULT_EDGE_LIMIT", 2_000_000),
		MaxNodeLimit:       envInt("MAX_NODE_LIMIT", 10_000_000),
		MaxEdgeLimit:       envInt("MAX_EDGE_LIMIT", 20_000_000),
		SparqlHost:         envString("SPARQL_HOST", "http://anzograph:8080"),
		SparqlEndpoint:     envString("SPARQL_ENDPOINT", ""),
		SparqlUser:         envString("SPARQL_USER", ""),
		SparqlPass:         envString("SPARQL_PASS", ""),
		SparqlInsecureTLS:  envBool("SPARQL_INSECURE_TLS", false),
		SparqlDataFile:     envString("SPARQL_DATA_FILE", ""),
		SparqlGraphIRI:     envString("SPARQL_GRAPH_IRI", ""),
		SparqlLoadOnStart:  envBool("SPARQL_LOAD_ON_START", false),
		SparqlClearOnStart: envBool("SPARQL_CLEAR_ON_START", false),
		SparqlReadyRetries: envInt("SPARQL_READY_RETRIES", 30),
		ListenAddr:         envString("LISTEN_ADDR", ":8000"),
	}
	// Duration fields are parsed separately because envSeconds can fail
	// (malformed numbers are an error here, unlike envInt's silent default).
	var err error
	cfg.SparqlTimeout, err = envSeconds("SPARQL_TIMEOUT_S", 300)
	if err != nil {
		return Config{}, err
	}
	cfg.SparqlReadyDelay, err = envSeconds("SPARQL_READY_DELAY_S", 4)
	if err != nil {
		return Config{}, err
	}
	cfg.SparqlReadyTimeout, err = envSeconds("SPARQL_READY_TIMEOUT_S", 10)
	if err != nil {
		return Config{}, err
	}
	if cfg.SparqlLoadOnStart && strings.TrimSpace(cfg.SparqlDataFile) == "" {
		return Config{}, fmt.Errorf("SPARQL_LOAD_ON_START=true but SPARQL_DATA_FILE is not set")
	}
	if cfg.DefaultNodeLimit < 1 {
		return Config{}, fmt.Errorf("DEFAULT_NODE_LIMIT must be >= 1")
	}
	if cfg.DefaultEdgeLimit < 1 {
		return Config{}, fmt.Errorf("DEFAULT_EDGE_LIMIT must be >= 1")
	}
	if cfg.MaxNodeLimit < 1 {
		return Config{}, fmt.Errorf("MAX_NODE_LIMIT must be >= 1")
	}
	if cfg.MaxEdgeLimit < 1 {
		return Config{}, fmt.Errorf("MAX_EDGE_LIMIT must be >= 1")
	}
	if cfg.DefaultNodeLimit > cfg.MaxNodeLimit {
		return Config{}, fmt.Errorf("DEFAULT_NODE_LIMIT must be <= MAX_NODE_LIMIT")
	}
	if cfg.DefaultEdgeLimit > cfg.MaxEdgeLimit {
		return Config{}, fmt.Errorf("DEFAULT_EDGE_LIMIT must be <= MAX_EDGE_LIMIT")
	}
	return cfg, nil
}
// EffectiveSparqlEndpoint returns the SPARQL endpoint URL to query:
// an explicitly configured SPARQL_ENDPOINT wins; otherwise the URL is
// derived from SPARQL_HOST by appending "/sparql".
func (c Config) EffectiveSparqlEndpoint() string {
	if ep := strings.TrimSpace(c.SparqlEndpoint); ep != "" {
		return ep
	}
	return strings.TrimRight(c.SparqlHost, "/") + "/sparql"
}

// corsOriginList parses CORS_ORIGINS into a list of allowed origins.
// An empty value, "*", or a list that reduces to nothing after
// trimming all mean "allow any origin" and yield the single entry "*".
func (c Config) corsOriginList() []string {
	trimmed := strings.TrimSpace(c.CorsOrigins)
	if trimmed == "" || trimmed == "*" {
		return []string{"*"}
	}
	var origins []string
	for _, piece := range strings.Split(trimmed, ",") {
		if origin := strings.TrimSpace(piece); origin != "" {
			origins = append(origins, origin)
		}
	}
	if len(origins) == 0 {
		return []string{"*"}
	}
	return origins
}
// envString returns the value of environment variable name, or def
// when the variable is unset or contains only whitespace. A non-blank
// value is returned as-is (it is not trimmed).
func envString(name, def string) string {
	raw := os.Getenv(name)
	if strings.TrimSpace(raw) != "" {
		return raw
	}
	return def
}
// envBool reads a boolean environment variable. Truthy values are
// 1/true/yes/y/on and falsy values 0/false/no/n/off (case-insensitive);
// unset, blank, or unrecognized values yield def.
func envBool(name string, def bool) bool {
	raw := strings.TrimSpace(os.Getenv(name))
	if raw == "" {
		return def
	}
	switch strings.ToLower(raw) {
	case "1", "true", "yes", "y", "on":
		return true
	case "0", "false", "no", "n", "off":
		return false
	}
	// Unrecognized token: fall back rather than guessing.
	return def
}
// envInt reads an integer environment variable. Underscores may be
// used as digit separators (e.g. "2_000_000"). Unset, blank, or
// unparseable values silently fall back to def.
func envInt(name string, def int) int {
	raw := strings.TrimSpace(os.Getenv(name))
	if raw == "" {
		return def
	}
	if n, err := strconv.Atoi(strings.ReplaceAll(raw, "_", "")); err == nil {
		return n
	}
	return def
}
// envSeconds reads a duration expressed in (possibly fractional)
// seconds. An unset or blank variable yields the default; a value
// that does not parse as a number is reported as an error rather than
// silently ignored.
func envSeconds(name string, def float64) (time.Duration, error) {
	raw := strings.TrimSpace(os.Getenv(name))
	if raw == "" {
		return time.Duration(def * float64(time.Second)), nil
	}
	secs, err := strconv.ParseFloat(raw, 64)
	if err != nil {
		return 0, fmt.Errorf("%s must be a number (seconds): %w", name, err)
	}
	return time.Duration(secs * float64(time.Second)), nil
}

3
backend_go/go.mod Normal file
View File

@@ -0,0 +1,3 @@
module visualizador_instanciados/backend_go
go 1.22

View File

@@ -0,0 +1,98 @@
package main
// termKey uniquely identifies a node within one snapshot build:
// term type ("uri" or "bnode") plus the raw SPARQL value.
type termKey struct {
	termType string
	key      string
}

// termMeta is what gets materialized into a Node: the term type and
// the display IRI (blank nodes are rendered as "_:label").
type termMeta struct {
	termType string
	iri      string
}

// graphFromSparqlBindings converts ?s ?p ?o SPARQL bindings into a
// dense-ID node list and edge list.
//
// Node IDs are assigned in first-seen order while scanning bindings
// (subject before object within each binding), so the ID assignment
// is deterministic for a given binding order. Literal terms never
// become nodes; blank nodes only when includeBNodes is set. Once
// nodeLimit nodes exist, bindings that would need a new node are
// dropped (their edge is skipped), but note the subject may already
// have been admitted before the object is rejected.
func graphFromSparqlBindings(
	bindings []map[string]sparqlTerm,
	nodeLimit int,
	includeBNodes bool,
) (nodes []Node, edges []Edge) {
	nodeIDByKey := map[termKey]int{}
	// Pre-size conservatively: snapshots are often far smaller than nodeLimit.
	nodeMeta := make([]termMeta, 0, min(nodeLimit, 4096))
	// getOrAdd resolves a term to a node ID, creating the node on first
	// sight. Returns false for literals, excluded bnodes, empty terms,
	// or when the node limit has been reached.
	getOrAdd := func(term sparqlTerm) (int, bool) {
		if term.Type == "" || term.Value == "" {
			return 0, false
		}
		if term.Type == "literal" {
			return 0, false
		}
		var key termKey
		var meta termMeta
		if term.Type == "bnode" {
			if !includeBNodes {
				return 0, false
			}
			key = termKey{termType: "bnode", key: term.Value}
			meta = termMeta{termType: "bnode", iri: "_:" + term.Value}
		} else {
			key = termKey{termType: "uri", key: term.Value}
			meta = termMeta{termType: "uri", iri: term.Value}
		}
		if existing, ok := nodeIDByKey[key]; ok {
			return existing, true
		}
		if len(nodeMeta) >= nodeLimit {
			return 0, false
		}
		nid := len(nodeMeta)
		nodeIDByKey[key] = nid
		nodeMeta = append(nodeMeta, meta)
		return nid, true
	}
	for _, b := range bindings {
		sTerm := b["s"]
		oTerm := b["o"]
		pTerm := b["p"]
		sid, okS := getOrAdd(sTerm)
		oid, okO := getOrAdd(oTerm)
		if !okS || !okO {
			continue
		}
		pred := pTerm.Value
		if pred == "" {
			// An edge without a predicate IRI is meaningless; drop it.
			continue
		}
		edges = append(edges, Edge{
			Source: sid,
			Target: oid,
			Predicate: pred,
		})
	}
	// Materialize nodes; positions are filled in later by the layout
	// step, labels by the label-fetch step.
	nodes = make([]Node, len(nodeMeta))
	for i, m := range nodeMeta {
		nodes[i] = Node{
			ID: i,
			TermType: m.termType,
			IRI: m.iri,
			Label: nil,
			X: 0,
			Y: 0,
		}
	}
	return nodes, edges
}

// min returns the smaller of a and b.
// NOTE(review): go.mod declares go 1.22, where min is a builtin; this
// package-level definition shadows it and could be removed once all
// call sites are confirmed.
func min(a, b int) int {
	if a < b {
		return a
	}
	return b
}

View File

@@ -0,0 +1,34 @@
package graph_queries
import "fmt"
// defaultEdgeQuery builds the SPARQL query for the "default" graph
// mode: rdf:type edges that point at an owl:Class, plus all
// rdfs:subClassOf edges. Literal objects are always filtered out;
// blank nodes are filtered unless includeBNodes is set. The result is
// capped at edgeLimit rows.
func defaultEdgeQuery(edgeLimit int, includeBNodes bool) string {
	filter := "FILTER(!isBlank(?s) && !isBlank(?o))"
	if includeBNodes {
		filter = ""
	}
	return fmt.Sprintf(`
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX owl: <http://www.w3.org/2002/07/owl#>
SELECT ?s ?p ?o
WHERE {
{
VALUES ?p { rdf:type }
?s ?p ?o .
?o rdf:type owl:Class .
}
UNION
{
VALUES ?p { rdfs:subClassOf }
?s ?p ?o .
}
FILTER(!isLiteral(?o))
%s
}
LIMIT %d
`, filter, edgeLimit)
}

View File

@@ -0,0 +1,24 @@
package graph_queries
import "fmt"
// hierarchyEdgeQuery builds the SPARQL query for the "hierarchy"
// graph mode: rdfs:subClassOf edges only. Literal objects are always
// excluded; blank nodes are excluded unless includeBNodes is set.
// The result is capped at edgeLimit rows.
func hierarchyEdgeQuery(edgeLimit int, includeBNodes bool) string {
	filter := "FILTER(!isBlank(?s) && !isBlank(?o))"
	if includeBNodes {
		filter = ""
	}
	return fmt.Sprintf(`
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
SELECT ?s ?p ?o
WHERE {
VALUES ?p { rdfs:subClassOf }
?s ?p ?o .
FILTER(!isLiteral(?o))
%s
}
LIMIT %d
`, filter, edgeLimit)
}

View File

@@ -0,0 +1,27 @@
package graph_queries
// DefaultID is the graph query mode used when a request does not
// specify graph_query_id.
const DefaultID = "default"

// definitions is the registry of built-in graph query modes, in the
// order they are reported by List. New modes are added here.
var definitions = []Definition{
	{Meta: Meta{ID: DefaultID, Label: "Default"}, EdgeQuery: defaultEdgeQuery},
	{Meta: Meta{ID: "hierarchy", Label: "Hierarchy"}, EdgeQuery: hierarchyEdgeQuery},
	{Meta: Meta{ID: "types", Label: "Types"}, EdgeQuery: typesOnlyEdgeQuery},
}

// List returns the metadata of every registered graph query mode, in
// registration order.
func List() []Meta {
	out := make([]Meta, 0, len(definitions))
	for _, d := range definitions {
		out = append(out, d.Meta)
	}
	return out
}

// Get looks up a graph query mode by ID; the boolean reports whether
// it exists. A linear scan is fine for this handful of entries.
func Get(id string) (Definition, bool) {
	for _, d := range definitions {
		if d.Meta.ID == id {
			return d, true
		}
	}
	return Definition{}, false
}

View File

@@ -0,0 +1,12 @@
package graph_queries
// Meta is the JSON-serializable description of a graph query mode,
// as returned by GET /api/graph_queries.
type Meta struct {
	ID    string `json:"id"`
	Label string `json:"label"`
}

// Definition couples a mode's metadata with the function that builds
// its SPARQL edge query (selecting ?s ?p ?o) for a given edge limit
// and blank-node policy.
type Definition struct {
	Meta      Meta
	EdgeQuery func(edgeLimit int, includeBNodes bool) string
}

View File

@@ -0,0 +1,26 @@
package graph_queries
import "fmt"
// typesOnlyEdgeQuery builds the SPARQL query for the "types" graph
// mode: rdf:type edges whose object is an owl:Class, and nothing
// else. Literal objects are always excluded; blank nodes are excluded
// unless includeBNodes is set. The result is capped at edgeLimit rows.
func typesOnlyEdgeQuery(edgeLimit int, includeBNodes bool) string {
	filter := "FILTER(!isBlank(?s) && !isBlank(?o))"
	if includeBNodes {
		filter = ""
	}
	return fmt.Sprintf(`
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX owl: <http://www.w3.org/2002/07/owl#>
SELECT ?s ?p ?o
WHERE {
VALUES ?p { rdf:type }
?s ?p ?o .
?o rdf:type owl:Class .
FILTER(!isLiteral(?o))
%s
}
LIMIT %d
`, filter, edgeLimit)
}

View File

@@ -0,0 +1,208 @@
package main
import (
"context"
"encoding/json"
"fmt"
"sort"
"strings"
graphqueries "visualizador_instanciados/backend_go/graph_queries"
)
// rdfsLabelIRI is the full IRI of rdfs:label, used when attaching
// human-readable labels to snapshot nodes.
const rdfsLabelIRI = "http://www.w3.org/2000/01/rdf-schema#label"

// fetchGraphSnapshot builds a complete graph snapshot:
//  1. run the edge query selected by graphQueryID,
//  2. convert bindings into dense node IDs and an edge list,
//  3. compute a layered radial layout (Kahn layers over inverted
//     edges, so hierarchy roots end up in the inner rings),
//  4. fetch rdfs:label values for URI nodes.
//
// A cycle in the (inverted) edge graph aborts the build: the returned
// *CycleError is decorated with a sample of up to 20 remaining IRIs.
func fetchGraphSnapshot(
	ctx context.Context,
	sparql *AnzoGraphClient,
	cfg Config,
	nodeLimit int,
	edgeLimit int,
	graphQueryID string,
) (GraphResponse, error) {
	def, ok := graphqueries.Get(graphQueryID)
	if !ok {
		return GraphResponse{}, fmt.Errorf("unknown graph_query_id: %s", graphQueryID)
	}
	edgesQ := def.EdgeQuery(edgeLimit, cfg.IncludeBNodes)
	raw, err := sparql.Query(ctx, edgesQ)
	if err != nil {
		return GraphResponse{}, err
	}
	var res sparqlResponse
	if err := json.Unmarshal(raw, &res); err != nil {
		return GraphResponse{}, fmt.Errorf("failed to parse SPARQL JSON: %w", err)
	}
	nodes, edges := graphFromSparqlBindings(res.Results.Bindings, nodeLimit, cfg.IncludeBNodes)
	// Layout: invert edges for hierarchy (target -> source).
	hierEdges := make([][2]int, 0, len(edges))
	for _, e := range edges {
		hierEdges = append(hierEdges, [2]int{e.Target, e.Source})
	}
	layers, cycleErr := levelSynchronousKahnLayers(len(nodes), hierEdges)
	if cycleErr != nil {
		// Enrich the error with example IRIs so the client can point
		// at concrete offending nodes (capped at 20).
		sample := make([]string, 0, 20)
		for _, nid := range cycleErr.RemainingNodeIDs {
			if len(sample) >= 20 {
				break
			}
			if nid >= 0 && nid < len(nodes) {
				sample = append(sample, nodes[nid].IRI)
			}
		}
		cycleErr.RemainingIRISample = sample
		return GraphResponse{}, cycleErr
	}
	// Sort each layer by IRI so the angular placement is deterministic.
	idToIRI := make([]string, len(nodes))
	for i := range nodes {
		idToIRI[i] = nodes[i].IRI
	}
	for _, layer := range layers {
		sortLayerByIRI(layer, idToIRI)
	}
	xs, ys := radialPositionsFromLayers(len(nodes), layers, 5000.0)
	for i := range nodes {
		nodes[i].X = xs[i]
		nodes[i].Y = ys[i]
	}
	// Attach labels for URI nodes.
	iris := make([]string, 0)
	for _, n := range nodes {
		if n.TermType == "uri" && n.IRI != "" {
			iris = append(iris, n.IRI)
		}
	}
	if len(iris) > 0 {
		labelByIRI, err := fetchRDFSLabels(ctx, sparql, iris, 500)
		if err != nil {
			return GraphResponse{}, err
		}
		for i := range nodes {
			if nodes[i].TermType != "uri" {
				continue
			}
			lbl, ok := labelByIRI[nodes[i].IRI]
			if !ok {
				continue
			}
			// Copy before taking the address: lbl is reused each iteration.
			val := lbl
			nodes[i].Label = &val
		}
	}
	meta := &GraphMeta{
		Backend: "anzograph",
		TTLPath: nil,
		SparqlEndpoint: cfg.EffectiveSparqlEndpoint(),
		IncludeBNodes: cfg.IncludeBNodes,
		GraphQueryID: graphQueryID,
		NodeLimit: nodeLimit,
		EdgeLimit: edgeLimit,
		Nodes: len(nodes),
		Edges: len(edges),
	}
	return GraphResponse{Nodes: nodes, Edges: edges, Meta: meta}, nil
}

// bestLabel tracks the highest-ranked label seen so far for one IRI
// (ranking per labelScore: "en" > untagged > other languages).
type bestLabel struct {
	score int
	value string
}

// fetchRDFSLabels retrieves rdfs:label values for the given IRIs in
// VALUES batches of batchSize, keeping one label per IRI chosen by
// labelScore (ties keep the first seen). Returns a map from IRI to
// the chosen label text.
func fetchRDFSLabels(
	ctx context.Context,
	sparql *AnzoGraphClient,
	iris []string,
	batchSize int,
) (map[string]string, error) {
	best := make(map[string]bestLabel)
	for i := 0; i < len(iris); i += batchSize {
		end := i + batchSize
		if end > len(iris) {
			end = len(iris)
		}
		batch := iris[i:end]
		values := make([]string, 0, len(batch))
		for _, u := range batch {
			values = append(values, "<"+u+">")
		}
		q := fmt.Sprintf(`
SELECT ?s ?label
WHERE {
VALUES ?s { %s }
?s <%s> ?label .
}
`, strings.Join(values, " "), rdfsLabelIRI)
		raw, err := sparql.Query(ctx, q)
		if err != nil {
			return nil, err
		}
		var res sparqlResponse
		if err := json.Unmarshal(raw, &res); err != nil {
			return nil, fmt.Errorf("failed to parse SPARQL JSON: %w", err)
		}
		for _, b := range res.Results.Bindings {
			sTerm, ok := b["s"]
			if !ok || sTerm.Value == "" {
				continue
			}
			lblTerm, ok := b["label"]
			if !ok || lblTerm.Type != "literal" || lblTerm.Value == "" {
				continue
			}
			score := labelScore(lblTerm.Lang)
			prev, ok := best[sTerm.Value]
			if !ok || score > prev.score {
				best[sTerm.Value] = bestLabel{score: score, value: lblTerm.Value}
			}
		}
	}
	out := make(map[string]string, len(best))
	for iri, v := range best {
		out[iri] = v.value
	}
	return out, nil
}
// labelScore ranks an rdfs:label language tag when choosing the single
// label to attach to a node: English beats an untagged literal, which
// beats any other language.
func labelScore(lang string) int {
	switch strings.ToLower(strings.TrimSpace(lang)) {
	case "en":
		return 3
	case "":
		return 2
	default:
		return 1
	}
}
// sortIntsUnique sorts xs in ascending order and drops duplicates.
// The input is modified in place and the result aliases its backing
// array.
func sortIntsUnique(xs []int) []int {
	if len(xs) == 0 {
		return xs
	}
	sort.Ints(xs)
	// Two-finger compaction: w is the write cursor, r scans ahead.
	w := 1
	for r := 1; r < len(xs); r++ {
		if xs[r] != xs[r-1] {
			xs[w] = xs[r]
			w++
		}
	}
	return xs[:w]
}

View File

@@ -0,0 +1,23 @@
package main
import (
"encoding/json"
"io"
"net/http"
)
// writeJSON serializes v as a JSON response with the given status
// code. Encoding errors are deliberately dropped: the status line has
// already been written, so there is no way to report them to the
// client anymore.
func writeJSON(w http.ResponseWriter, status int, v any) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	_ = json.NewEncoder(w).Encode(v)
}

// writeError sends a JSON error payload ({"detail": msg}) with the
// given status code.
func writeError(w http.ResponseWriter, status int, msg string) {
	writeJSON(w, status, ErrorResponse{Detail: msg})
}

// decodeJSON decodes a single JSON value from r into dst.
func decodeJSON(r io.Reader, dst any) error {
	return json.NewDecoder(r).Decode(dst)
}

148
backend_go/layout.go Normal file
View File

@@ -0,0 +1,148 @@
package main
import (
	"fmt"
	"math"
	"sort"
	"strings"
)
// CycleError reports that the (inverted) subClassOf graph contains a
// cycle, so a layered topological layout cannot be produced.
type CycleError struct {
	Processed          int      // nodes successfully assigned to layers
	Total              int      // total nodes in the graph
	RemainingNodeIDs   []int    // nodes left with a positive in-degree
	RemainingIRISample []string // example IRIs of remaining nodes, filled by the caller
}

// Error implements the error interface with a short human-readable
// summary, including example node IRIs when the caller supplied them.
func (e *CycleError) Error() string {
	msg := fmt.Sprintf("Cycle detected in subClassOf graph (processed %d/%d nodes).", e.Processed, e.Total)
	if len(e.RemainingIRISample) > 0 {
		msg += " Example nodes: " + stringsJoin(e.RemainingIRISample, ", ")
	}
	return msg
}

// levelSynchronousKahnLayers runs Kahn's topological sort, collecting
// nodes level by level: layer 0 holds every node with in-degree zero,
// and layer k+1 the nodes whose final incoming edge came from layer k.
// Self-loops and edges with out-of-range endpoints are ignored. If any
// nodes remain unprocessed the graph is cyclic and a *CycleError is
// returned (with RemainingIRISample left empty for the caller to fill).
func levelSynchronousKahnLayers(nodeCount int, edges [][2]int) ([][]int, *CycleError) {
	n := nodeCount
	if n <= 0 {
		return [][]int{}, nil
	}
	adj := make([][]int, n)
	indeg := make([]int, n)
	for _, e := range edges {
		u, v := e[0], e[1]
		if u == v {
			// A self-loop would make every topological order impossible; drop it.
			continue
		}
		if u < 0 || u >= n || v < 0 || v >= n {
			continue
		}
		adj[u] = append(adj[u], v)
		indeg[v]++
	}
	q := make([]int, 0, n)
	for i, d := range indeg {
		if d == 0 {
			q = append(q, i)
		}
	}
	layers := make([][]int, 0)
	processed := 0
	for len(q) > 0 {
		// Snapshot the current frontier as one layer, then release its
		// outgoing edges to build the next frontier.
		layer := append([]int(nil), q...)
		q = q[:0]
		layers = append(layers, layer)
		for _, u := range layer {
			processed++
			for _, v := range adj[u] {
				indeg[v]--
				if indeg[v] == 0 {
					q = append(q, v)
				}
			}
		}
	}
	if processed != n {
		// The unprocessed nodes are exactly those still holding a
		// positive in-degree: every one of them sits on or behind a cycle.
		remaining := make([]int, 0)
		for i, d := range indeg {
			if d > 0 {
				remaining = append(remaining, i)
			}
		}
		return nil, &CycleError{Processed: processed, Total: n, RemainingNodeIDs: remaining}
	}
	return layers, nil
}

// radialPositionsFromLayers places each layer on a concentric circle:
// layer i sits at radius (i+1)/(layerCount+1)*maxR, its members spread
// evenly around the circle, with a golden-angle offset per layer so
// consecutive rings do not line up. Nodes absent from every layer stay
// at the origin. Returns parallel x and y coordinate slices of length
// nodeCount.
func radialPositionsFromLayers(nodeCount int, layers [][]int, maxR float64) (xs []float64, ys []float64) {
	n := nodeCount
	if n <= 0 {
		return []float64{}, []float64{}
	}
	xs = make([]float64, n)
	ys = make([]float64, n)
	if len(layers) == 0 {
		return xs, ys
	}
	twoPi := 2.0 * math.Pi
	golden := math.Pi * (3.0 - math.Sqrt(5.0)) // golden angle, ~2.3999 rad
	layerCount := float64(len(layers))
	denom := layerCount + 1.0
	for li, layer := range layers {
		m := len(layer)
		if m == 0 {
			continue
		}
		r := (float64(li+1) / denom) * maxR
		offset := math.Mod(float64(li)*golden, twoPi)
		if m == 1 {
			// Single node: place it at the layer's offset angle.
			nid := layer[0]
			if nid >= 0 && nid < n {
				xs[nid] = r * math.Cos(offset)
				ys[nid] = r * math.Sin(offset)
			}
			continue
		}
		step := twoPi / float64(m)
		for j, nid := range layer {
			if nid < 0 || nid >= n {
				continue
			}
			t := offset + step*float64(j)
			xs[nid] = r * math.Cos(t)
			ys[nid] = r * math.Sin(t)
		}
	}
	return xs, ys
}

// sortLayerByIRI sorts one layer's node IDs in place by the IRI each
// ID maps to, making angular placement deterministic.
func sortLayerByIRI(layer []int, idToIRI []string) {
	sort.Slice(layer, func(i, j int) bool {
		return idToIRI[layer[i]] < idToIRI[layer[j]]
	})
}

// stringsJoin concatenates parts with sep between elements. It now
// delegates to strings.Join (the previous hand-rolled += loop was
// quadratic); the wrapper is kept so existing call sites keep working.
func stringsJoin(parts []string, sep string) string {
	return strings.Join(parts, sep)
}

35
backend_go/main.go Normal file
View File

@@ -0,0 +1,35 @@
package main

import (
	"context"
	"log"
	"net/http"
	"time"
)

// main wires the backend together: load configuration from the
// environment, run the SPARQL client's startup sequence, build the
// API server with its snapshot cache, and serve HTTP until the
// process dies. Any startup failure is fatal.
func main() {
	cfg, err := LoadConfig()
	if err != nil {
		log.Fatal(err)
	}
	sparql := NewAnzoGraphClient(cfg)
	// Startup performs the configured readiness/clear/load steps
	// before we accept any traffic.
	if err := sparql.Startup(context.Background()); err != nil {
		log.Fatal(err)
	}
	api := &APIServer{
		cfg: cfg,
		sparql: sparql,
		snapshots: NewGraphSnapshotService(sparql, cfg),
	}
	srv := &http.Server{
		Addr: cfg.ListenAddr,
		Handler: api.handler(),
		// Guard against slow-header (slowloris-style) clients.
		ReadHeaderTimeout: 5 * time.Second,
	}
	log.Printf("backend listening on %s", cfg.ListenAddr)
	// ListenAndServe only returns on error, which is fatal here.
	log.Fatal(srv.ListenAndServe())
}

81
backend_go/models.go Normal file
View File

@@ -0,0 +1,81 @@
package main
// ErrorResponse is the JSON body returned for every API error.
type ErrorResponse struct {
	Detail string `json:"detail"`
}

// HealthResponse is the body of GET /api/health.
type HealthResponse struct {
	Status string `json:"status"`
}

// Node is one vertex of a graph snapshot with its precomputed layout
// position. ID is a dense integer index into the snapshot's node list.
type Node struct {
	ID       int     `json:"id"`
	TermType string  `json:"termType"` // "uri" | "bnode"
	IRI      string  `json:"iri"`
	Label    *string `json:"label"` // rdfs:label if one was found, else null
	X        float64 `json:"x"`
	Y        float64 `json:"y"`
}

// Edge is a directed predicate edge between two snapshot node IDs.
type Edge struct {
	Source    int    `json:"source"`
	Target    int    `json:"target"`
	Predicate string `json:"predicate"`
}

// GraphMeta describes how a snapshot was built (backend, limits,
// query mode) plus its resulting node/edge counts.
type GraphMeta struct {
	Backend        string  `json:"backend"`
	TTLPath        *string `json:"ttl_path"`
	SparqlEndpoint string  `json:"sparql_endpoint"`
	IncludeBNodes  bool    `json:"include_bnodes"`
	GraphQueryID   string  `json:"graph_query_id"`
	NodeLimit      int     `json:"node_limit"`
	EdgeLimit      int     `json:"edge_limit"`
	Nodes          int     `json:"nodes"`
	Edges          int     `json:"edges"`
}

// GraphResponse is the body of GET /api/graph: the full snapshot.
type GraphResponse struct {
	Nodes []Node     `json:"nodes"`
	Edges []Edge     `json:"edges"`
	Meta  *GraphMeta `json:"meta"`
}

// StatsResponse is the body of GET /api/stats.
type StatsResponse struct {
	Backend        string  `json:"backend"`
	TTLPath        *string `json:"ttl_path"`
	SparqlEndpoint *string `json:"sparql_endpoint"`
	ParsedTriples  int     `json:"parsed_triples"`
	Nodes          int     `json:"nodes"`
	Edges          int     `json:"edges"`
}

// SparqlQueryRequest is the body of POST /api/sparql (raw passthrough).
type SparqlQueryRequest struct {
	Query string `json:"query"`
}

// NeighborsRequest is the body of the legacy POST /api/neighbors
// endpoint. Optional fields fall back to the server defaults.
type NeighborsRequest struct {
	SelectedIDs  []int   `json:"selected_ids"`
	NodeLimit    *int    `json:"node_limit,omitempty"`
	EdgeLimit    *int    `json:"edge_limit,omitempty"`
	GraphQueryID *string `json:"graph_query_id,omitempty"`
}

// NeighborsResponse echoes the selection plus the neighbor IDs found.
type NeighborsResponse struct {
	SelectedIDs []int `json:"selected_ids"`
	NeighborIDs []int `json:"neighbor_ids"`
}

// SelectionQueryRequest is the body of POST /api/selection_query:
// a selection query mode plus the currently selected node IDs.
type SelectionQueryRequest struct {
	QueryID      string  `json:"query_id"`
	SelectedIDs  []int   `json:"selected_ids"`
	NodeLimit    *int    `json:"node_limit,omitempty"`
	EdgeLimit    *int    `json:"edge_limit,omitempty"`
	GraphQueryID *string `json:"graph_query_id,omitempty"`
}

// SelectionQueryResponse echoes the query mode and selection plus the
// resulting neighbor IDs.
type SelectionQueryResponse struct {
	QueryID     string `json:"query_id"`
	SelectedIDs []int  `json:"selected_ids"`
	NeighborIDs []int  `json:"neighbor_ids"`
}

View File

@@ -0,0 +1,102 @@
package selection_queries
import (
"encoding/json"
"fmt"
"sort"
"strings"
)
// nodeKey builds the map key identifying a node by term type ("uri"
// or "bnode") and IRI, joined by a NUL byte that cannot occur in
// either component.
func nodeKey(termType, iri string) string {
	const sep = "\x00"
	return termType + sep + iri
}
// valuesTerm renders a node reference as a SPARQL VALUES term:
// <IRI> for URIs and _:label for blank nodes (adding the "_:" prefix
// when missing). Empty IRIs and unknown term types yield "" so the
// caller can skip them.
func valuesTerm(n NodeRef) string {
	if n.IRI == "" {
		return ""
	}
	switch n.TermType {
	case "uri":
		return "<" + n.IRI + ">"
	case "bnode":
		if strings.HasPrefix(n.IRI, "_:") {
			return n.IRI
		}
		return "_:" + n.IRI
	}
	return ""
}
// termKeyFromSparqlTerm converts a SPARQL binding term into the
// package's node-key form. Literals, empty terms, and unknown term
// types are rejected, as are blank nodes when includeBNodes is false.
// Blank-node keys carry the "_:" prefix to match snapshot node IRIs.
func termKeyFromSparqlTerm(term sparqlTerm, includeBNodes bool) (string, bool) {
	if term.Value == "" {
		return "", false
	}
	switch term.Type {
	case "uri":
		return nodeKey("uri", term.Value), true
	case "bnode":
		if includeBNodes {
			return nodeKey("bnode", "_:"+term.Value), true
		}
	}
	// Literals, empty types, and anything unrecognized never map to a node.
	return "", false
}
// selectedNodesFromIDs resolves snapshot node IDs into NodeRefs,
// silently dropping IDs unknown to the index and (when includeBNodes
// is false) blank nodes. It returns the surviving nodes together with
// the set of their IDs, which callers use to exclude the selection
// itself from neighbor results.
func selectedNodesFromIDs(idx Index, selectedIDs []int, includeBNodes bool) ([]NodeRef, map[int]struct{}) {
	nodes := make([]NodeRef, 0, len(selectedIDs))
	kept := make(map[int]struct{}, len(selectedIDs))
	for _, id := range selectedIDs {
		ref, found := idx.IDToNode[id]
		if !found {
			continue
		}
		if !includeBNodes && ref.TermType == "bnode" {
			continue
		}
		nodes = append(nodes, ref)
		kept[id] = struct{}{}
	}
	return nodes, kept
}
// idsFromBindings parses a raw SPARQL JSON response and maps the
// bindings of varName back onto snapshot node IDs via the index.
// Terms that are literals, excluded blank nodes, unknown to the
// snapshot, or part of the selection itself are dropped. The result
// is deduplicated and sorted ascending.
func idsFromBindings(raw []byte, varName string, idx Index, selectedSet map[int]struct{}, includeBNodes bool) ([]int, error) {
	var res sparqlResponse
	if err := json.Unmarshal(raw, &res); err != nil {
		return nil, fmt.Errorf("failed to parse SPARQL JSON: %w", err)
	}
	// Collect into a set first: the endpoint may return duplicates.
	neighborSet := make(map[int]struct{})
	for _, b := range res.Results.Bindings {
		term, ok := b[varName]
		if !ok {
			continue
		}
		key, ok := termKeyFromSparqlTerm(term, includeBNodes)
		if !ok {
			continue
		}
		nid, ok := idx.KeyToID[key]
		if !ok {
			// The term exists in the store but not in this snapshot.
			continue
		}
		if _, sel := selectedSet[nid]; sel {
			// Never report a selected node as its own neighbor.
			continue
		}
		neighborSet[nid] = struct{}{}
	}
	ids := make([]int, 0, len(neighborSet))
	for nid := range neighborSet {
		ids = append(ids, nid)
	}
	// Sorted output keeps responses deterministic for clients and tests.
	sort.Ints(ids)
	return ids, nil
}

View File

@@ -0,0 +1,77 @@
package selection_queries
import (
"context"
"fmt"
"strings"
)
// neighborsQuery builds a SPARQL query returning the distinct
// neighbors (?nbr) of the selected nodes across four patterns:
// outgoing rdf:type (to an owl:Class), incoming rdf:type (when the
// selected node is itself an owl:Class), and rdfs:subClassOf in both
// directions. Literals and the selection itself are filtered out, and
// blank nodes too unless includeBNodes is set. With no usable
// selected nodes a query matching nothing is returned.
func neighborsQuery(selectedNodes []NodeRef, includeBNodes bool) string {
	terms := make([]string, 0, len(selectedNodes))
	for _, node := range selectedNodes {
		if t := valuesTerm(node); t != "" {
			terms = append(terms, t)
		}
	}
	if len(terms) == 0 {
		return "SELECT ?nbr WHERE { FILTER(false) }"
	}
	filter := "FILTER(!isBlank(?nbr))"
	if includeBNodes {
		filter = ""
	}
	return fmt.Sprintf(`
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX owl: <http://www.w3.org/2002/07/owl#>
SELECT DISTINCT ?nbr
WHERE {
VALUES ?sel { %s }
{
?sel rdf:type ?o .
?o rdf:type owl:Class .
BIND(?o AS ?nbr)
}
UNION
{
?s rdf:type ?sel .
?sel rdf:type owl:Class .
BIND(?s AS ?nbr)
}
UNION
{
?sel rdfs:subClassOf ?o .
BIND(?o AS ?nbr)
}
UNION
{
?s rdfs:subClassOf ?sel .
BIND(?s AS ?nbr)
}
FILTER(!isLiteral(?nbr))
FILTER(?nbr != ?sel)
%s
}
`, strings.Join(terms, " "), filter)
}
// runNeighbors executes the "neighbors" selection query against the
// endpoint and maps the results back onto snapshot node IDs,
// excluding the selection itself. An empty (or fully filtered)
// selection short-circuits to an empty result without a query.
func runNeighbors(ctx context.Context, q Querier, idx Index, selectedIDs []int, includeBNodes bool) ([]int, error) {
	nodes, selected := selectedNodesFromIDs(idx, selectedIDs, includeBNodes)
	if len(nodes) == 0 {
		return []int{}, nil
	}
	raw, err := q.Query(ctx, neighborsQuery(nodes, includeBNodes))
	if err != nil {
		return nil, err
	}
	return idsFromBindings(raw, "nbr", idx, selected, includeBNodes)
}

View File

@@ -0,0 +1,33 @@
package selection_queries
// definitions is the registry of built-in selection query modes, in
// the order they are reported by List. New modes are added here.
var definitions = []Definition{
	{
		Meta: Meta{ID: "neighbors", Label: "Neighbors"},
		Run: runNeighbors,
	},
	{
		Meta: Meta{ID: "superclasses", Label: "Superclasses"},
		Run: runSuperclasses,
	},
	{
		Meta: Meta{ID: "subclasses", Label: "Subclasses"},
		Run: runSubclasses,
	},
}

// List returns the metadata of every registered selection query mode,
// in registration order.
func List() []Meta {
	out := make([]Meta, 0, len(definitions))
	for _, d := range definitions {
		out = append(out, d.Meta)
	}
	return out
}

// Get looks up a selection query mode by ID; the boolean reports
// whether it exists. A linear scan is fine for this handful of entries.
func Get(id string) (Definition, bool) {
	for _, d := range definitions {
		if d.Meta.ID == id {
			return d, true
		}
	}
	return Definition{}, false
}

View File

@@ -0,0 +1,14 @@
package selection_queries
// sparqlTerm is one bound term in a SPARQL JSON results binding.
// Only the fields this package reads are declared; the "datatype"
// member of the wire format is intentionally not captured.
type sparqlTerm struct {
	Type  string `json:"type"`          // "uri" | "bnode" | "literal"
	Value string `json:"value"`
	Lang  string `json:"xml:lang,omitempty"` // language tag of a literal, if any
}

// sparqlResponse mirrors just the results.bindings portion of a
// SPARQL 1.1 JSON query response.
type sparqlResponse struct {
	Results struct {
		Bindings []map[string]sparqlTerm `json:"bindings"`
	} `json:"results"`
}

View File

@@ -0,0 +1,55 @@
package selection_queries
import (
"context"
"fmt"
"strings"
)
// subclassesQuery builds a SPARQL query returning the distinct direct
// subclasses (?nbr rdfs:subClassOf ?sel) of the selected nodes.
// Literals and the selection itself are filtered out, and blank nodes
// too unless includeBNodes is set. With no usable selected nodes a
// query matching nothing is returned.
func subclassesQuery(selectedNodes []NodeRef, includeBNodes bool) string {
	terms := make([]string, 0, len(selectedNodes))
	for _, node := range selectedNodes {
		if t := valuesTerm(node); t != "" {
			terms = append(terms, t)
		}
	}
	if len(terms) == 0 {
		return "SELECT ?nbr WHERE { FILTER(false) }"
	}
	filter := "FILTER(!isBlank(?nbr))"
	if includeBNodes {
		filter = ""
	}
	return fmt.Sprintf(`
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
SELECT DISTINCT ?nbr
WHERE {
VALUES ?sel { %s }
?nbr rdfs:subClassOf ?sel .
FILTER(!isLiteral(?nbr))
FILTER(?nbr != ?sel)
%s
}
`, strings.Join(terms, " "), filter)
}
// runSubclasses executes the "subclasses" selection query and maps
// results back onto snapshot node IDs. An empty (or fully filtered)
// selection short-circuits to an empty result without a query.
func runSubclasses(ctx context.Context, q Querier, idx Index, selectedIDs []int, includeBNodes bool) ([]int, error) {
	nodes, selected := selectedNodesFromIDs(idx, selectedIDs, includeBNodes)
	if len(nodes) == 0 {
		return []int{}, nil
	}
	raw, err := q.Query(ctx, subclassesQuery(nodes, includeBNodes))
	if err != nil {
		return nil, err
	}
	return idsFromBindings(raw, "nbr", idx, selected, includeBNodes)
}

View File

@@ -0,0 +1,55 @@
package selection_queries
import (
"context"
"fmt"
"strings"
)
// superclassesQuery builds a SPARQL query returning the distinct
// direct superclasses (?sel rdfs:subClassOf ?nbr) of the selected
// nodes. Literals and the selection itself are filtered out, and
// blank nodes too unless includeBNodes is set. With no usable
// selected nodes a query matching nothing is returned.
func superclassesQuery(selectedNodes []NodeRef, includeBNodes bool) string {
	terms := make([]string, 0, len(selectedNodes))
	for _, node := range selectedNodes {
		if t := valuesTerm(node); t != "" {
			terms = append(terms, t)
		}
	}
	if len(terms) == 0 {
		return "SELECT ?nbr WHERE { FILTER(false) }"
	}
	filter := "FILTER(!isBlank(?nbr))"
	if includeBNodes {
		filter = ""
	}
	return fmt.Sprintf(`
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
SELECT DISTINCT ?nbr
WHERE {
VALUES ?sel { %s }
?sel rdfs:subClassOf ?nbr .
FILTER(!isLiteral(?nbr))
FILTER(?nbr != ?sel)
%s
}
`, strings.Join(terms, " "), filter)
}
// runSuperclasses executes the "superclasses" selection query and
// maps results back onto snapshot node IDs. An empty (or fully
// filtered) selection short-circuits to an empty result without a
// query.
func runSuperclasses(ctx context.Context, q Querier, idx Index, selectedIDs []int, includeBNodes bool) ([]int, error) {
	nodes, selected := selectedNodesFromIDs(idx, selectedIDs, includeBNodes)
	if len(nodes) == 0 {
		return []int{}, nil
	}
	raw, err := q.Query(ctx, superclassesQuery(nodes, includeBNodes))
	if err != nil {
		return nil, err
	}
	return idsFromBindings(raw, "nbr", idx, selected, includeBNodes)
}

View File

@@ -0,0 +1,29 @@
package selection_queries
import "context"
// Querier is the minimal SPARQL client surface this package needs;
// it is satisfied by the main package's AnzoGraph client.
type Querier interface {
	Query(ctx context.Context, query string) ([]byte, error)
}

// NodeRef is a lightweight reference to one snapshot node.
type NodeRef struct {
	ID       int
	TermType string // "uri" | "bnode"
	IRI      string
}

// Index provides both directions of the snapshot's node mapping:
// dense ID to node, and node key (see nodeKey) back to ID.
type Index struct {
	IDToNode map[int]NodeRef
	KeyToID  map[string]int
}

// Meta is the JSON-serializable description of a selection query
// mode, as returned by GET /api/selection_queries.
type Meta struct {
	ID    string `json:"id"`
	Label string `json:"label"`
}

// Definition couples a mode's metadata with the function that
// executes it against a querier for a set of selected node IDs.
type Definition struct {
	Meta Meta
	Run  func(ctx context.Context, q Querier, idx Index, selectedIDs []int, includeBNodes bool) ([]int, error)
}

View File

@@ -0,0 +1,33 @@
package main
import (
"context"
"fmt"
selectionqueries "visualizador_instanciados/backend_go/selection_queries"
)
// runSelectionQuery executes the selection query mode queryID for the
// given snapshot and selected node IDs, returning the neighbor IDs it
// produced. It builds the ID<->node index the selection_queries
// package needs from the snapshot's node list.
func runSelectionQuery(
	ctx context.Context,
	sparql *AnzoGraphClient,
	snapshot GraphResponse,
	queryID string,
	selectedIDs []int,
	includeBNodes bool,
) ([]int, error) {
	def, ok := selectionqueries.Get(queryID)
	if !ok {
		return nil, fmt.Errorf("unknown query_id: %s", queryID)
	}
	idToNode := make(map[int]selectionqueries.NodeRef, len(snapshot.Nodes))
	keyToID := make(map[string]int, len(snapshot.Nodes))
	for _, n := range snapshot.Nodes {
		nr := selectionqueries.NodeRef{ID: n.ID, TermType: n.TermType, IRI: n.IRI}
		idToNode[n.ID] = nr
		// Key format (termType + NUL + IRI) must stay in sync with the
		// unexported nodeKey helper in the selection_queries package.
		keyToID[n.TermType+"\x00"+n.IRI] = n.ID
	}
	return def.Run(ctx, sparql, selectionqueries.Index{IDToNode: idToNode, KeyToID: keyToID}, selectedIDs, includeBNodes)
}

301
backend_go/server.go Normal file
View File

@@ -0,0 +1,301 @@
package main
import (
"fmt"
"net/http"
"strconv"
"strings"
graphqueries "visualizador_instanciados/backend_go/graph_queries"
selectionqueries "visualizador_instanciados/backend_go/selection_queries"
)
// APIServer holds the HTTP layer's dependencies: the loaded
// configuration, the SPARQL client, and the snapshot cache.
type APIServer struct {
	cfg       Config
	sparql    *AnzoGraphClient
	snapshots *GraphSnapshotService
}

// handler builds the route table and wraps it in the CORS middleware.
// All routes live under /api; method checks happen in each handler.
func (s *APIServer) handler() http.Handler {
	mux := http.NewServeMux()
	mux.HandleFunc("/api/health", s.handleHealth)
	mux.HandleFunc("/api/stats", s.handleStats)
	mux.HandleFunc("/api/sparql", s.handleSparql)
	mux.HandleFunc("/api/graph", s.handleGraph)
	mux.HandleFunc("/api/graph_queries", s.handleGraphQueries)
	mux.HandleFunc("/api/selection_queries", s.handleSelectionQueries)
	mux.HandleFunc("/api/selection_query", s.handleSelectionQuery)
	mux.HandleFunc("/api/neighbors", s.handleNeighbors)
	return s.corsMiddleware(mux)
}
// corsMiddleware applies the configured CORS policy to every request
// and answers preflight (OPTIONS) requests directly. With
// CORS_ORIGINS="*" any origin is allowed; otherwise only exact
// matches from the configured allowlist are echoed back.
func (s *APIServer) corsMiddleware(next http.Handler) http.Handler {
	origins := s.cfg.corsOriginList()
	allowAll := len(origins) == 1 && origins[0] == "*"
	allowed := make(map[string]struct{}, len(origins))
	for _, o := range origins {
		allowed[o] = struct{}{}
	}
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if !allowAll {
			// When an allowlist is in effect the response depends on the
			// Origin header even for rejected or absent origins, so caches
			// must always be told to vary on it (previously Vary was only
			// set on the allowed branch, letting shared caches serve a
			// response with the wrong CORS headers).
			w.Header().Add("Vary", "Origin")
		}
		origin := r.Header.Get("Origin")
		if origin != "" {
			if allowAll {
				w.Header().Set("Access-Control-Allow-Origin", "*")
			} else if _, ok := allowed[origin]; ok {
				w.Header().Set("Access-Control-Allow-Origin", origin)
			}
			w.Header().Set("Access-Control-Allow-Methods", "GET,POST,OPTIONS")
			w.Header().Set("Access-Control-Allow-Headers", "*")
		}
		// Preflight requests are fully answered here; no body needed.
		if r.Method == http.MethodOptions {
			w.WriteHeader(http.StatusNoContent)
			return
		}
		next.ServeHTTP(w, r)
	})
}
func (s *APIServer) handleHealth(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
writeJSON(w, http.StatusOK, HealthResponse{Status: "ok"})
}
func (s *APIServer) handleStats(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
ctx := r.Context()
snap, err := s.snapshots.Get(ctx, s.cfg.DefaultNodeLimit, s.cfg.DefaultEdgeLimit, graphqueries.DefaultID)
if err != nil {
writeError(w, http.StatusInternalServerError, err.Error())
return
}
endpoint := snap.Meta.SparqlEndpoint
writeJSON(w, http.StatusOK, StatsResponse{
Backend: snap.Meta.Backend,
TTLPath: snap.Meta.TTLPath,
SparqlEndpoint: &endpoint,
ParsedTriples: len(snap.Edges),
Nodes: len(snap.Nodes),
Edges: len(snap.Edges),
})
}
// handleSparql is a raw SPARQL passthrough (debug use); POST only.
// The upstream response body is relayed verbatim with a JSON content type.
func (s *APIServer) handleSparql(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
	var req SparqlQueryRequest
	decodeErr := decodeJSON(r.Body, &req)
	if decodeErr != nil || strings.TrimSpace(req.Query) == "" {
		writeError(w, http.StatusUnprocessableEntity, "invalid request body")
		return
	}
	raw, queryErr := s.sparql.Query(r.Context(), req.Query)
	if queryErr != nil {
		writeError(w, http.StatusBadGateway, queryErr.Error())
		return
	}
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(http.StatusOK)
	_, _ = w.Write(raw) // best effort; client may have gone away
}
// handleGraph returns a (possibly cached) graph snapshot; GET only.
// Query parameters: node_limit and edge_limit (defaulted and bounded by
// config) and graph_query_id (defaulted to the built-in default mode).
func (s *APIServer) handleGraph(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
	nodeLimit, nodeErr := intQuery(r, "node_limit", s.cfg.DefaultNodeLimit)
	if nodeErr != nil || nodeLimit < 1 || nodeLimit > s.cfg.MaxNodeLimit {
		writeError(w, http.StatusUnprocessableEntity, fmt.Sprintf("node_limit must be between 1 and %d", s.cfg.MaxNodeLimit))
		return
	}
	edgeLimit, edgeErr := intQuery(r, "edge_limit", s.cfg.DefaultEdgeLimit)
	if edgeErr != nil || edgeLimit < 1 || edgeLimit > s.cfg.MaxEdgeLimit {
		writeError(w, http.StatusUnprocessableEntity, fmt.Sprintf("edge_limit must be between 1 and %d", s.cfg.MaxEdgeLimit))
		return
	}
	graphQueryID := strings.TrimSpace(r.URL.Query().Get("graph_query_id"))
	if graphQueryID == "" {
		graphQueryID = graphqueries.DefaultID
	}
	if _, known := graphqueries.Get(graphQueryID); !known {
		writeError(w, http.StatusUnprocessableEntity, "unknown graph_query_id")
		return
	}
	snap, err := s.snapshots.Get(r.Context(), nodeLimit, edgeLimit, graphQueryID)
	if err != nil {
		// A cycle in the graph query definition is a client error (422);
		// everything else is a server-side failure.
		status := http.StatusInternalServerError
		if _, isCycle := err.(*CycleError); isCycle {
			status = http.StatusUnprocessableEntity
		}
		writeError(w, status, err.Error())
		return
	}
	writeJSON(w, http.StatusOK, snap)
}
// handleGraphQueries lists the available graph query modes; GET only.
func (s *APIServer) handleGraphQueries(w http.ResponseWriter, r *http.Request) {
	if r.Method == http.MethodGet {
		writeJSON(w, http.StatusOK, graphqueries.List())
		return
	}
	w.WriteHeader(http.StatusMethodNotAllowed)
}
// handleSelectionQueries lists the available selection query modes; GET only.
func (s *APIServer) handleSelectionQueries(w http.ResponseWriter, r *http.Request) {
	if r.Method == http.MethodGet {
		writeJSON(w, http.StatusOK, selectionqueries.List())
		return
	}
	w.WriteHeader(http.StatusMethodNotAllowed)
}
// handleSelectionQuery runs a named selection query for the posted node
// IDs against the snapshot identified by the request's limits and graph
// query; POST only. An empty selection short-circuits to an empty result.
func (s *APIServer) handleSelectionQuery(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
	var req SelectionQueryRequest
	if err := decodeJSON(r.Body, &req); err != nil || strings.TrimSpace(req.QueryID) == "" {
		writeError(w, http.StatusUnprocessableEntity, "invalid request body")
		return
	}
	if _, known := selectionqueries.Get(req.QueryID); !known {
		writeError(w, http.StatusUnprocessableEntity, "unknown query_id")
		return
	}
	// Nothing selected: answer without touching the snapshot or SPARQL.
	if len(req.SelectedIDs) == 0 {
		writeJSON(w, http.StatusOK, SelectionQueryResponse{
			QueryID:     req.QueryID,
			SelectedIDs: req.SelectedIDs,
			NeighborIDs: []int{},
		})
		return
	}
	gqID := graphqueries.DefaultID
	if req.GraphQueryID != nil {
		if trimmed := strings.TrimSpace(*req.GraphQueryID); trimmed != "" {
			gqID = trimmed
		}
	}
	if _, known := graphqueries.Get(gqID); !known {
		writeError(w, http.StatusUnprocessableEntity, "unknown graph_query_id")
		return
	}
	// Limits default from config and may be overridden per request.
	nodes, edges := s.cfg.DefaultNodeLimit, s.cfg.DefaultEdgeLimit
	if req.NodeLimit != nil {
		nodes = *req.NodeLimit
	}
	if req.EdgeLimit != nil {
		edges = *req.EdgeLimit
	}
	if nodes < 1 || nodes > s.cfg.MaxNodeLimit || edges < 1 || edges > s.cfg.MaxEdgeLimit {
		writeError(w, http.StatusUnprocessableEntity, "invalid node_limit/edge_limit")
		return
	}
	snap, err := s.snapshots.Get(r.Context(), nodes, edges, gqID)
	if err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}
	ids, err := runSelectionQuery(r.Context(), s.sparql, snap, req.QueryID, req.SelectedIDs, s.cfg.IncludeBNodes)
	if err != nil {
		writeError(w, http.StatusBadGateway, err.Error())
		return
	}
	writeJSON(w, http.StatusOK, SelectionQueryResponse{
		QueryID:     req.QueryID,
		SelectedIDs: req.SelectedIDs,
		NeighborIDs: ids,
	})
}
// handleNeighbors runs the built-in "neighbors" selection query for the
// posted node IDs; POST only. An empty selection short-circuits to an
// empty result without building a snapshot.
func (s *APIServer) handleNeighbors(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		w.WriteHeader(http.StatusMethodNotAllowed)
		return
	}
	var req NeighborsRequest
	if err := decodeJSON(r.Body, &req); err != nil {
		writeError(w, http.StatusUnprocessableEntity, "invalid request body")
		return
	}
	if len(req.SelectedIDs) == 0 {
		writeJSON(w, http.StatusOK, NeighborsResponse{SelectedIDs: req.SelectedIDs, NeighborIDs: []int{}})
		return
	}
	gqID := graphqueries.DefaultID
	if req.GraphQueryID != nil {
		if trimmed := strings.TrimSpace(*req.GraphQueryID); trimmed != "" {
			gqID = trimmed
		}
	}
	if _, known := graphqueries.Get(gqID); !known {
		writeError(w, http.StatusUnprocessableEntity, "unknown graph_query_id")
		return
	}
	// Limits default from config and may be overridden per request.
	nodes, edges := s.cfg.DefaultNodeLimit, s.cfg.DefaultEdgeLimit
	if req.NodeLimit != nil {
		nodes = *req.NodeLimit
	}
	if req.EdgeLimit != nil {
		edges = *req.EdgeLimit
	}
	if nodes < 1 || nodes > s.cfg.MaxNodeLimit || edges < 1 || edges > s.cfg.MaxEdgeLimit {
		writeError(w, http.StatusUnprocessableEntity, "invalid node_limit/edge_limit")
		return
	}
	snap, err := s.snapshots.Get(r.Context(), nodes, edges, gqID)
	if err != nil {
		writeError(w, http.StatusInternalServerError, err.Error())
		return
	}
	nbrs, err := runSelectionQuery(r.Context(), s.sparql, snap, "neighbors", req.SelectedIDs, s.cfg.IncludeBNodes)
	if err != nil {
		writeError(w, http.StatusBadGateway, err.Error())
		return
	}
	writeJSON(w, http.StatusOK, NeighborsResponse{SelectedIDs: req.SelectedIDs, NeighborIDs: nbrs})
}
func intQuery(r *http.Request, name string, def int) (int, error) {
raw := strings.TrimSpace(r.URL.Query().Get(name))
if raw == "" {
return def, nil
}
n, err := strconv.Atoi(raw)
if err != nil {
return 0, err
}
return n, nil
}

View File

@@ -0,0 +1,76 @@
package main
import (
"context"
"sync"
)
// snapshotKey identifies one cached graph snapshot. IncludeBNodes is part
// of the key even though it comes from config, so differently-configured
// snapshots can never collide in the cache.
type snapshotKey struct {
	NodeLimit int
	EdgeLimit int
	IncludeBNodes bool
	GraphQueryID string
}
// snapshotInflight tracks a snapshot fetch in progress so concurrent
// requests for the same key share one upstream query. snapshot and err
// are written before ready is closed, which gives waiters a safe
// happens-before for reading them without a lock.
type snapshotInflight struct {
	ready chan struct{}
	snapshot GraphResponse
	err error
}
// GraphSnapshotService builds graph snapshots via SPARQL and caches the
// successful results per (limits, bnode flag, graph query) key. No
// eviction happens in this type; entries live for the service lifetime.
type GraphSnapshotService struct {
	sparql *AnzoGraphClient
	cfg Config
	mu sync.Mutex // guards cache and inflight
	cache map[snapshotKey]GraphResponse
	inflight map[snapshotKey]*snapshotInflight
}
// NewGraphSnapshotService returns a snapshot service backed by the given
// SPARQL client and config, with empty cache and inflight tables.
func NewGraphSnapshotService(sparql *AnzoGraphClient, cfg Config) *GraphSnapshotService {
	svc := &GraphSnapshotService{
		sparql:   sparql,
		cfg:      cfg,
		cache:    map[snapshotKey]GraphResponse{},
		inflight: map[snapshotKey]*snapshotInflight{},
	}
	return svc
}
// Get returns the snapshot for the given limits and graph query, fetching
// it via SPARQL on a cache miss. Concurrent misses for the same key are
// coalesced (single-flight): one caller performs the fetch while the
// others wait on its ready channel. Only successful results are cached.
func (s *GraphSnapshotService) Get(ctx context.Context, nodeLimit int, edgeLimit int, graphQueryID string) (GraphResponse, error) {
	key := snapshotKey{NodeLimit: nodeLimit, EdgeLimit: edgeLimit, IncludeBNodes: s.cfg.IncludeBNodes, GraphQueryID: graphQueryID}
	s.mu.Lock()
	// Fast path: snapshot already cached.
	if snap, ok := s.cache[key]; ok {
		s.mu.Unlock()
		return snap, nil
	}
	if inf, ok := s.inflight[key]; ok {
		// Another goroutine is already fetching this key; wait for it or
		// for our own context to be canceled.
		ready := inf.ready
		s.mu.Unlock()
		select {
		case <-ctx.Done():
			return GraphResponse{}, ctx.Err()
		case <-ready:
			// Safe without the lock: both fields are written before ready
			// is closed.
			return inf.snapshot, inf.err
		}
	}
	// We are the leader for this key: register the inflight entry, then
	// fetch outside the lock so other keys are not blocked.
	inf := &snapshotInflight{ready: make(chan struct{})}
	s.inflight[key] = inf
	s.mu.Unlock()
	// NOTE(review): waiters inherit the leader's result, so if this ctx is
	// canceled mid-fetch every waiter observes the cancellation error.
	snap, err := fetchGraphSnapshot(ctx, s.sparql, s.cfg, nodeLimit, edgeLimit, graphQueryID)
	s.mu.Lock()
	inf.snapshot = snap
	inf.err = err
	delete(s.inflight, key)
	if err == nil {
		s.cache[key] = snap
	}
	// Close only after the fields are populated (see snapshotInflight).
	close(inf.ready)
	s.mu.Unlock()
	return snap, err
}

169
backend_go/sparql.go Normal file
View File

@@ -0,0 +1,169 @@
package main
import (
"context"
"encoding/base64"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
)
// AnzoGraphClient issues SPARQL queries and updates against a single HTTP
// endpoint, optionally sending a precomputed Basic auth header.
type AnzoGraphClient struct {
	cfg Config
	endpoint string // resolved SPARQL endpoint URL
	authHeader string // "Basic ..." value, or "" when no credentials configured
	client *http.Client
}
// NewAnzoGraphClient builds a client for the configured SPARQL endpoint.
// Basic auth is enabled only when both user and password are non-blank;
// the Authorization header value is computed once up front.
func NewAnzoGraphClient(cfg Config) *AnzoGraphClient {
	c := &AnzoGraphClient{
		cfg:      cfg,
		endpoint: cfg.EffectiveSparqlEndpoint(),
		client:   &http.Client{},
	}
	user, pass := strings.TrimSpace(cfg.SparqlUser), strings.TrimSpace(cfg.SparqlPass)
	if user != "" && pass != "" {
		creds := base64.StdEncoding.EncodeToString([]byte(user + ":" + pass))
		c.authHeader = "Basic " + creds
	}
	return c
}
// Startup waits for the endpoint to answer queries, then optionally
// clears the store and/or loads the configured data file, re-checking
// readiness after each mutating step.
func (c *AnzoGraphClient) Startup(ctx context.Context) error {
	if err := c.waitReady(ctx); err != nil {
		return err
	}
	if c.cfg.SparqlClearOnStart {
		if err := c.update(ctx, "CLEAR ALL"); err != nil {
			return err
		}
		if err := c.waitReady(ctx); err != nil {
			return err
		}
	}
	if !c.cfg.SparqlLoadOnStart {
		return nil
	}
	dataFile := strings.TrimSpace(c.cfg.SparqlDataFile)
	if dataFile == "" {
		return fmt.Errorf("SPARQL_LOAD_ON_START=true but SPARQL_DATA_FILE is not set")
	}
	// Load into a named graph when one is configured, otherwise into the
	// default graph.
	stmt := fmt.Sprintf("LOAD <%s>", dataFile)
	if graphIRI := strings.TrimSpace(c.cfg.SparqlGraphIRI); graphIRI != "" {
		stmt = fmt.Sprintf("LOAD <%s> INTO GRAPH <%s>", dataFile, graphIRI)
	}
	if err := c.update(ctx, stmt); err != nil {
		return err
	}
	return c.waitReady(ctx)
}
// Shutdown is a no-op; the client holds no resources needing teardown.
func (c *AnzoGraphClient) Shutdown(_ context.Context) error {
	return nil
}
// Query runs a SPARQL query using the configured default timeout and
// returns the raw response body.
func (c *AnzoGraphClient) Query(ctx context.Context, query string) ([]byte, error) {
	body, err := c.queryWithTimeout(ctx, query, c.cfg.SparqlTimeout)
	return body, err
}
// queryWithTimeout POSTs the query as form data (SPARQL protocol style)
// under a per-request deadline and returns the body on any 2xx status;
// any other status becomes an error carrying the response text.
func (c *AnzoGraphClient) queryWithTimeout(ctx context.Context, query string, timeout time.Duration) ([]byte, error) {
	reqCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	params := url.Values{"query": []string{query}}
	req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, c.endpoint, strings.NewReader(params.Encode()))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
	req.Header.Set("Accept", "application/sparql-results+json")
	if c.authHeader != "" {
		req.Header.Set("Authorization", c.authHeader)
	}
	resp, err := c.client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	body, readErr := io.ReadAll(resp.Body)
	if readErr != nil {
		return nil, readErr
	}
	if resp.StatusCode/100 != 2 {
		return nil, fmt.Errorf("sparql query failed: %s: %s", resp.Status, strings.TrimSpace(string(body)))
	}
	return body, nil
}
// update POSTs a SPARQL Update statement (application/sparql-update body)
// under the configured timeout; any non-2xx status becomes an error
// carrying the response text.
func (c *AnzoGraphClient) update(ctx context.Context, update string) error {
	reqCtx, cancel := context.WithTimeout(ctx, c.cfg.SparqlTimeout)
	defer cancel()
	req, err := http.NewRequestWithContext(reqCtx, http.MethodPost, c.endpoint, strings.NewReader(update))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/sparql-update")
	req.Header.Set("Accept", "application/json")
	if c.authHeader != "" {
		req.Header.Set("Authorization", c.authHeader)
	}
	resp, err := c.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode/100 == 2 {
		return nil
	}
	body, _ := io.ReadAll(resp.Body) // best effort: body is only for the error message
	return fmt.Errorf("sparql update failed: %s: %s", resp.Status, strings.TrimSpace(string(body)))
}
// waitReady polls the endpoint with a cheap ASK query until it answers
// with JSON (the server may serve HTML/plain text while booting), making
// up to SparqlReadyRetries attempts spaced SparqlReadyDelay apart.
// The retry delay is context-aware and is skipped after the last attempt.
func (c *AnzoGraphClient) waitReady(ctx context.Context) error {
	var lastErr error
	for i := 0; i < c.cfg.SparqlReadyRetries; i++ {
		if ctxErr := ctx.Err(); ctxErr != nil {
			if lastErr != nil {
				return fmt.Errorf("anzograph not ready at %s: %w", c.endpoint, lastErr)
			}
			return ctxErr
		}
		body, err := c.queryWithTimeout(ctx, "ASK WHERE { ?s ?p ?o }", c.cfg.SparqlReadyTimeout)
		if err == nil {
			// Ensure it's JSON, not HTML/text during boot.
			if strings.HasPrefix(strings.TrimSpace(string(body)), "{") {
				return nil
			}
			err = fmt.Errorf("unexpected readiness response: %s", strings.TrimSpace(string(body)))
		}
		lastErr = err
		if i == c.cfg.SparqlReadyRetries-1 {
			break // no point delaying after the final attempt
		}
		// Sleep between attempts, but wake immediately on cancellation —
		// time.Sleep would block through a canceled context.
		select {
		case <-ctx.Done():
			return fmt.Errorf("anzograph not ready at %s: %w", c.endpoint, lastErr)
		case <-time.After(c.cfg.SparqlReadyDelay):
		}
	}
	return fmt.Errorf("anzograph not ready at %s: %w", c.endpoint, lastErr)
}

View File

@@ -0,0 +1,13 @@
package main
// sparqlTerm is one RDF term binding from a SPARQL JSON results document,
// with its term type (e.g. "uri", "literal"), lexical value, and an
// optional language tag.
type sparqlTerm struct {
	Type string `json:"type"`
	Value string `json:"value"`
	Lang string `json:"xml:lang,omitempty"`
}
// sparqlResponse is the subset of the SPARQL JSON results format this
// service reads: just the result bindings (the "head" section is ignored).
type sparqlResponse struct {
	Results struct {
		Bindings []map[string]sparqlTerm `json:"bindings"`
	} `json:"results"`
}