342 lines
8.5 KiB
Go
342 lines
8.5 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"runtime"
|
|
"runtime/debug"
|
|
"sort"
|
|
"strings"
|
|
"time"
|
|
|
|
graphqueries "visualizador_instanciados/backend_go/graph_queries"
|
|
)
|
|
|
|
const (
	// rdfsLabelIRI is the predicate IRI used by fetchRDFSLabels to look up
	// human-readable labels for URI nodes.
	rdfsLabelIRI = "http://www.w3.org/2000/01/rdf-schema#label"
)
|
|
|
|
// fetchGraphSnapshot assembles a complete graph snapshot for one registered
// graph query. Stages, in order:
//
//  1. Resolve the query definition from the graphqueries registry.
//  2. Build the predicate dictionary (predicate IRI -> ID) from a predicates
//     query.
//  3. Fetch edges in batches of cfg.EdgeBatchSize, feeding each batch into a
//     graph accumulator (batching avoids decoding one huge SPARQL JSON
//     response in memory).
//  4. Layout: compute Kahn layers over inverted edges and place nodes on
//     radial positions; fails with the cycle error if the graph is cyclic.
//  5. Attach rdfs:label values for URI nodes.
//
// Parameters:
//   - ctx: cancellation/deadline propagated to every SPARQL round-trip.
//   - sparql: client used for all queries against the AnzoGraph endpoint.
//   - cfg: runtime configuration (batch size, bnode inclusion, timing logs,
//     FreeOSMemory behavior).
//   - nodeLimit, edgeLimit: caps passed to the accumulator and edge queries.
//   - graphQueryID: registry key; an unknown ID returns an error.
//
// Returns the populated GraphResponse (nodes with positions and labels,
// edges, and metadata), or an error from any stage.
func fetchGraphSnapshot(
	ctx context.Context,
	sparql *AnzoGraphClient,
	cfg Config,
	nodeLimit int,
	edgeLimit int,
	graphQueryID string,
) (GraphResponse, error) {
	start := time.Now()
	// logStats logs memory statistics and elapsed time for a named stage.
	// It is a no-op unless cfg.LogSnapshotTimings is set.
	logStats := func(stage string) {
		if !cfg.LogSnapshotTimings {
			return
		}
		var ms runtime.MemStats
		runtime.ReadMemStats(&ms)
		log.Printf(
			"[snapshot] %s graph_query_id=%s node_limit=%d edge_limit=%d elapsed=%s alloc=%dMB heap_inuse=%dMB sys=%dMB numgc=%d",
			stage,
			graphQueryID,
			nodeLimit,
			edgeLimit,
			time.Since(start).Truncate(time.Millisecond),
			ms.Alloc/1024/1024,
			ms.HeapInuse/1024/1024,
			ms.Sys/1024/1024,
			ms.NumGC,
		)
	}

	def, ok := graphqueries.Get(graphQueryID)
	if !ok {
		return GraphResponse{}, fmt.Errorf("unknown graph_query_id: %s", graphQueryID)
	}

	// Build predicate dictionary (predicate IRI -> uint32 ID) before fetching edges.
	// Done in a closure so the raw response and decoded bindings can be
	// garbage-collected as soon as the dictionary is built.
	preds, err := func() (*PredicateDict, error) {
		logStats("predicates_query_start")
		predQ := def.PredicateQuery(cfg.IncludeBNodes)
		t0 := time.Now()
		rawPred, err := sparql.Query(ctx, predQ)
		if err != nil {
			return nil, fmt.Errorf("predicates query failed: %w", err)
		}
		if cfg.LogSnapshotTimings {
			log.Printf("[snapshot] predicates_query_returned bytes=%d query_time=%s", len(rawPred), time.Since(t0).Truncate(time.Millisecond))
		}
		var predRes sparqlResponse
		t1 := time.Now()
		if err := json.Unmarshal(rawPred, &predRes); err != nil {
			return nil, fmt.Errorf("predicates unmarshal failed: %w", err)
		}
		if cfg.LogSnapshotTimings {
			log.Printf("[snapshot] predicates_unmarshal_done bindings=%d unmarshal_time=%s", len(predRes.Results.Bindings), time.Since(t1).Truncate(time.Millisecond))
		}
		predicateIRIs := make([]string, 0, len(predRes.Results.Bindings))
		for _, b := range predRes.Results.Bindings {
			pTerm, ok := b["p"]
			// Only non-empty URI terms are accepted as predicates.
			if !ok || pTerm.Type != "uri" || pTerm.Value == "" {
				continue
			}
			predicateIRIs = append(predicateIRIs, pTerm.Value)
		}
		logStats("predicates_dict_built")
		return NewPredicateDict(predicateIRIs), nil
	}()
	if err != nil {
		return GraphResponse{}, err
	}

	// Fetch edges in batches to avoid decoding a single huge SPARQL JSON response.
	logStats("edges_batched_start")
	batchSize := cfg.EdgeBatchSize
	acc := newGraphAccumulator(nodeLimit, cfg.IncludeBNodes, min(edgeLimit, batchSize), preds)

	totalBindings := 0
	convAllT0 := time.Now()
	for batch, offset := 0, 0; offset < edgeLimit; batch, offset = batch+1, offset+batchSize {
		// Clamp the final batch so the total never exceeds edgeLimit.
		limit := batchSize
		remaining := edgeLimit - offset
		if remaining < limit {
			limit = remaining
		}

		logStats(fmt.Sprintf("edges_batch_start batch=%d offset=%d limit=%d", batch, offset, limit))
		// Run each batch in a closure so the raw bytes and decoded response
		// fall out of scope (and become GC-eligible) per iteration.
		bindings, err := func() ([]map[string]sparqlTerm, error) {
			edgesQ := def.EdgeQuery(limit, offset, cfg.IncludeBNodes)
			t0 := time.Now()
			raw, err := sparql.Query(ctx, edgesQ)
			if err != nil {
				return nil, fmt.Errorf("edges query failed: %w", err)
			}
			if cfg.LogSnapshotTimings {
				log.Printf("[snapshot] edges_batch_query_returned batch=%d offset=%d limit=%d bytes=%d query_time=%s", batch, offset, limit, len(raw), time.Since(t0).Truncate(time.Millisecond))
			}

			var res sparqlResponse
			t1 := time.Now()
			if err := json.Unmarshal(raw, &res); err != nil {
				return nil, fmt.Errorf("edges unmarshal failed: %w", err)
			}
			if cfg.LogSnapshotTimings {
				log.Printf("[snapshot] edges_batch_unmarshal_done batch=%d bindings=%d unmarshal_time=%s", batch, len(res.Results.Bindings), time.Since(t1).Truncate(time.Millisecond))
			}
			return res.Results.Bindings, nil
		}()
		if err != nil {
			return GraphResponse{}, fmt.Errorf("edges batch=%d offset=%d limit=%d: %w", batch, offset, limit, err)
		}

		got := len(bindings)
		totalBindings += got
		if got == 0 {
			// No more rows on the server; stop paging.
			bindings = nil
			logStats(fmt.Sprintf("edges_batch_done_empty batch=%d offset=%d", batch, offset))
			break
		}

		convT0 := time.Now()
		acc.addBindings(bindings)
		if cfg.LogSnapshotTimings {
			log.Printf(
				"[snapshot] edges_batch_convert_done batch=%d got_bindings=%d total_bindings=%d nodes=%d edges=%d convert_time=%s",
				batch,
				got,
				totalBindings,
				len(acc.nodes),
				len(acc.edges),
				time.Since(convT0).Truncate(time.Millisecond),
			)
		}

		// Make the batch eligible for GC.
		bindings = nil
		logStats(fmt.Sprintf("edges_batch_done batch=%d offset=%d", batch, offset))
		if cfg.FreeOSMemoryAfterSnapshot {
			debug.FreeOSMemory()
			logStats(fmt.Sprintf("edges_batch_free_os_memory_done batch=%d offset=%d", batch, offset))
		}

		// A short batch means the result set is exhausted; stop early.
		if got < limit {
			break
		}
	}
	if cfg.LogSnapshotTimings {
		log.Printf("[snapshot] convert_batches_done total_bindings=%d total_time=%s", totalBindings, time.Since(convAllT0).Truncate(time.Millisecond))
	}
	logStats("edges_batched_done")

	nodes := acc.nodes
	edges := acc.edges

	// Layout: invert edges for hierarchy (target -> source).
	hierEdges := make([][2]int, 0, len(edges))
	for _, e := range edges {
		hierEdges = append(hierEdges, [2]int{int(e.Target), int(e.Source)})
	}

	layers, cycleErr := levelSynchronousKahnLayers(len(nodes), hierEdges)
	if cycleErr != nil {
		// Attach up to 20 sample IRIs from the cycle remainder to make the
		// error actionable.
		sample := make([]string, 0, 20)
		for _, nid := range cycleErr.RemainingNodeIDs {
			if len(sample) >= 20 {
				break
			}
			if nid >= 0 && nid < len(nodes) {
				sample = append(sample, nodes[nid].IRI)
			}
		}
		cycleErr.RemainingIRISample = sample
		return GraphResponse{}, cycleErr
	}

	// Sort each layer by node IRI for deterministic layout.
	idToIRI := make([]string, len(nodes))
	for i := range nodes {
		idToIRI[i] = nodes[i].IRI
	}
	for _, layer := range layers {
		sortLayerByIRI(layer, idToIRI)
	}

	xs, ys := radialPositionsFromLayers(len(nodes), layers, 5000.0)
	for i := range nodes {
		nodes[i].X = xs[i]
		nodes[i].Y = ys[i]
	}

	// Attach labels for URI nodes.
	iris := make([]string, 0)
	for _, n := range nodes {
		if n.TermType == "uri" && n.IRI != "" {
			iris = append(iris, n.IRI)
		}
	}
	if len(iris) > 0 {
		labelByIRI, err := fetchRDFSLabels(ctx, sparql, iris, 500)
		if err != nil {
			return GraphResponse{}, fmt.Errorf("fetch rdfs:label failed: %w", err)
		}
		for i := range nodes {
			if nodes[i].TermType != "uri" {
				continue
			}
			lbl, ok := labelByIRI[nodes[i].IRI]
			if !ok {
				continue
			}
			// Copy into a fresh variable so each node gets its own pointer.
			val := lbl
			nodes[i].Label = &val
		}
	}

	meta := &GraphMeta{
		Backend:        "anzograph",
		TTLPath:        nil,
		SparqlEndpoint: cfg.EffectiveSparqlEndpoint(),
		IncludeBNodes:  cfg.IncludeBNodes,
		GraphQueryID:   graphQueryID,
		Predicates:     preds.IRIs(),
		NodeLimit:      nodeLimit,
		EdgeLimit:      edgeLimit,
		Nodes:          len(nodes),
		Edges:          len(edges),
	}

	return GraphResponse{Nodes: nodes, Edges: edges, Meta: meta}, nil
}
|
|
|
|
// bestLabel records the highest-scoring rdfs:label seen so far for one IRI,
// where the score is derived from the literal's language tag (see labelScore).
type bestLabel struct {
	score int    // labelScore of the label's language tag
	value string // the label literal text
}
|
|
|
|
func fetchRDFSLabels(
|
|
ctx context.Context,
|
|
sparql *AnzoGraphClient,
|
|
iris []string,
|
|
batchSize int,
|
|
) (map[string]string, error) {
|
|
best := make(map[string]bestLabel)
|
|
|
|
for i := 0; i < len(iris); i += batchSize {
|
|
end := i + batchSize
|
|
if end > len(iris) {
|
|
end = len(iris)
|
|
}
|
|
batch := iris[i:end]
|
|
|
|
values := make([]string, 0, len(batch))
|
|
for _, u := range batch {
|
|
values = append(values, "<"+u+">")
|
|
}
|
|
|
|
q := fmt.Sprintf(`
|
|
SELECT ?s ?label
|
|
WHERE {
|
|
VALUES ?s { %s }
|
|
?s <%s> ?label .
|
|
}
|
|
`, strings.Join(values, " "), rdfsLabelIRI)
|
|
|
|
raw, err := sparql.Query(ctx, q)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var res sparqlResponse
|
|
if err := json.Unmarshal(raw, &res); err != nil {
|
|
return nil, fmt.Errorf("failed to parse SPARQL JSON: %w", err)
|
|
}
|
|
|
|
for _, b := range res.Results.Bindings {
|
|
sTerm, ok := b["s"]
|
|
if !ok || sTerm.Value == "" {
|
|
continue
|
|
}
|
|
lblTerm, ok := b["label"]
|
|
if !ok || lblTerm.Type != "literal" || lblTerm.Value == "" {
|
|
continue
|
|
}
|
|
|
|
score := labelScore(lblTerm.Lang)
|
|
prev, ok := best[sTerm.Value]
|
|
if !ok || score > prev.score {
|
|
best[sTerm.Value] = bestLabel{score: score, value: lblTerm.Value}
|
|
}
|
|
}
|
|
}
|
|
|
|
out := make(map[string]string, len(best))
|
|
for iri, v := range best {
|
|
out[iri] = v.value
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
// labelScore ranks a label literal's language tag for best-label selection:
// English ("en", case-insensitive, surrounding whitespace ignored) scores 3,
// an untagged literal scores 2, and any other language scores 1.
func labelScore(lang string) int {
	switch strings.ToLower(strings.TrimSpace(lang)) {
	case "en":
		return 3
	case "":
		return 2
	default:
		return 1
	}
}
|
|
|
|
// sortIntsUnique sorts xs in place in ascending order and removes duplicate
// values, returning the deduplicated prefix of the same backing array.
// A nil or empty slice is returned unchanged.
func sortIntsUnique(xs []int) []int {
	if len(xs) == 0 {
		return xs
	}
	sort.Ints(xs)
	// slices.Compact collapses runs of equal adjacent values in place,
	// replacing the hand-rolled dedupe loop of the original.
	return slices.Compact(xs)
}
|