Visualizando todo grafo com anzograph
This commit is contained in:
@@ -4,13 +4,19 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log"
|
||||
"runtime"
|
||||
"runtime/debug"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
graphqueries "visualizador_instanciados/backend_go/graph_queries"
|
||||
)
|
||||
|
||||
// IRIs of well-known RDF vocabulary terms used when building the graph snapshot.
const (
	// rdfsLabelIRI identifies the rdfs:label property, queried via
	// fetchRDFSLabels to obtain human-readable labels for URI nodes.
	rdfsLabelIRI = "http://www.w3.org/2000/01/rdf-schema#label"
)
|
||||
|
||||
func fetchGraphSnapshot(
|
||||
ctx context.Context,
|
||||
@@ -20,22 +26,148 @@ func fetchGraphSnapshot(
|
||||
edgeLimit int,
|
||||
graphQueryID string,
|
||||
) (GraphResponse, error) {
|
||||
start := time.Now()
|
||||
logStats := func(stage string) {
|
||||
if !cfg.LogSnapshotTimings {
|
||||
return
|
||||
}
|
||||
var ms runtime.MemStats
|
||||
runtime.ReadMemStats(&ms)
|
||||
log.Printf(
|
||||
"[snapshot] %s graph_query_id=%s node_limit=%d edge_limit=%d elapsed=%s alloc=%dMB heap_inuse=%dMB sys=%dMB numgc=%d",
|
||||
stage,
|
||||
graphQueryID,
|
||||
nodeLimit,
|
||||
edgeLimit,
|
||||
time.Since(start).Truncate(time.Millisecond),
|
||||
ms.Alloc/1024/1024,
|
||||
ms.HeapInuse/1024/1024,
|
||||
ms.Sys/1024/1024,
|
||||
ms.NumGC,
|
||||
)
|
||||
}
|
||||
|
||||
def, ok := graphqueries.Get(graphQueryID)
|
||||
if !ok {
|
||||
return GraphResponse{}, fmt.Errorf("unknown graph_query_id: %s", graphQueryID)
|
||||
}
|
||||
edgesQ := def.EdgeQuery(edgeLimit, cfg.IncludeBNodes)
|
||||
raw, err := sparql.Query(ctx, edgesQ)
|
||||
|
||||
// Build predicate dictionary (predicate IRI -> uint32 ID) before fetching edges.
|
||||
preds, err := func() (*PredicateDict, error) {
|
||||
logStats("predicates_query_start")
|
||||
predQ := def.PredicateQuery(cfg.IncludeBNodes)
|
||||
t0 := time.Now()
|
||||
rawPred, err := sparql.Query(ctx, predQ)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("predicates query failed: %w", err)
|
||||
}
|
||||
if cfg.LogSnapshotTimings {
|
||||
log.Printf("[snapshot] predicates_query_returned bytes=%d query_time=%s", len(rawPred), time.Since(t0).Truncate(time.Millisecond))
|
||||
}
|
||||
var predRes sparqlResponse
|
||||
t1 := time.Now()
|
||||
if err := json.Unmarshal(rawPred, &predRes); err != nil {
|
||||
return nil, fmt.Errorf("predicates unmarshal failed: %w", err)
|
||||
}
|
||||
if cfg.LogSnapshotTimings {
|
||||
log.Printf("[snapshot] predicates_unmarshal_done bindings=%d unmarshal_time=%s", len(predRes.Results.Bindings), time.Since(t1).Truncate(time.Millisecond))
|
||||
}
|
||||
predicateIRIs := make([]string, 0, len(predRes.Results.Bindings))
|
||||
for _, b := range predRes.Results.Bindings {
|
||||
pTerm, ok := b["p"]
|
||||
if !ok || pTerm.Type != "uri" || pTerm.Value == "" {
|
||||
continue
|
||||
}
|
||||
predicateIRIs = append(predicateIRIs, pTerm.Value)
|
||||
}
|
||||
logStats("predicates_dict_built")
|
||||
return NewPredicateDict(predicateIRIs), nil
|
||||
}()
|
||||
if err != nil {
|
||||
return GraphResponse{}, err
|
||||
}
|
||||
|
||||
var res sparqlResponse
|
||||
if err := json.Unmarshal(raw, &res); err != nil {
|
||||
return GraphResponse{}, fmt.Errorf("failed to parse SPARQL JSON: %w", err)
|
||||
}
|
||||
// Fetch edges in batches to avoid decoding a single huge SPARQL JSON response.
|
||||
logStats("edges_batched_start")
|
||||
batchSize := cfg.EdgeBatchSize
|
||||
acc := newGraphAccumulator(nodeLimit, cfg.IncludeBNodes, min(edgeLimit, batchSize), preds)
|
||||
|
||||
nodes, edges := graphFromSparqlBindings(res.Results.Bindings, nodeLimit, cfg.IncludeBNodes)
|
||||
totalBindings := 0
|
||||
convAllT0 := time.Now()
|
||||
for batch, offset := 0, 0; offset < edgeLimit; batch, offset = batch+1, offset+batchSize {
|
||||
limit := batchSize
|
||||
remaining := edgeLimit - offset
|
||||
if remaining < limit {
|
||||
limit = remaining
|
||||
}
|
||||
|
||||
logStats(fmt.Sprintf("edges_batch_start batch=%d offset=%d limit=%d", batch, offset, limit))
|
||||
bindings, err := func() ([]map[string]sparqlTerm, error) {
|
||||
edgesQ := def.EdgeQuery(limit, offset, cfg.IncludeBNodes)
|
||||
t0 := time.Now()
|
||||
raw, err := sparql.Query(ctx, edgesQ)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("edges query failed: %w", err)
|
||||
}
|
||||
if cfg.LogSnapshotTimings {
|
||||
log.Printf("[snapshot] edges_batch_query_returned batch=%d offset=%d limit=%d bytes=%d query_time=%s", batch, offset, limit, len(raw), time.Since(t0).Truncate(time.Millisecond))
|
||||
}
|
||||
|
||||
var res sparqlResponse
|
||||
t1 := time.Now()
|
||||
if err := json.Unmarshal(raw, &res); err != nil {
|
||||
return nil, fmt.Errorf("edges unmarshal failed: %w", err)
|
||||
}
|
||||
if cfg.LogSnapshotTimings {
|
||||
log.Printf("[snapshot] edges_batch_unmarshal_done batch=%d bindings=%d unmarshal_time=%s", batch, len(res.Results.Bindings), time.Since(t1).Truncate(time.Millisecond))
|
||||
}
|
||||
return res.Results.Bindings, nil
|
||||
}()
|
||||
if err != nil {
|
||||
return GraphResponse{}, fmt.Errorf("edges batch=%d offset=%d limit=%d: %w", batch, offset, limit, err)
|
||||
}
|
||||
|
||||
got := len(bindings)
|
||||
totalBindings += got
|
||||
if got == 0 {
|
||||
bindings = nil
|
||||
logStats(fmt.Sprintf("edges_batch_done_empty batch=%d offset=%d", batch, offset))
|
||||
break
|
||||
}
|
||||
|
||||
convT0 := time.Now()
|
||||
acc.addBindings(bindings)
|
||||
if cfg.LogSnapshotTimings {
|
||||
log.Printf(
|
||||
"[snapshot] edges_batch_convert_done batch=%d got_bindings=%d total_bindings=%d nodes=%d edges=%d convert_time=%s",
|
||||
batch,
|
||||
got,
|
||||
totalBindings,
|
||||
len(acc.nodes),
|
||||
len(acc.edges),
|
||||
time.Since(convT0).Truncate(time.Millisecond),
|
||||
)
|
||||
}
|
||||
|
||||
// Make the batch eligible for GC.
|
||||
bindings = nil
|
||||
logStats(fmt.Sprintf("edges_batch_done batch=%d offset=%d", batch, offset))
|
||||
if cfg.FreeOSMemoryAfterSnapshot {
|
||||
debug.FreeOSMemory()
|
||||
logStats(fmt.Sprintf("edges_batch_free_os_memory_done batch=%d offset=%d", batch, offset))
|
||||
}
|
||||
|
||||
if got < limit {
|
||||
break
|
||||
}
|
||||
}
|
||||
if cfg.LogSnapshotTimings {
|
||||
log.Printf("[snapshot] convert_batches_done total_bindings=%d total_time=%s", totalBindings, time.Since(convAllT0).Truncate(time.Millisecond))
|
||||
}
|
||||
logStats("edges_batched_done")
|
||||
|
||||
nodes := acc.nodes
|
||||
edges := acc.edges
|
||||
|
||||
// Layout: invert edges for hierarchy (target -> source).
|
||||
hierEdges := make([][2]int, 0, len(edges))
|
||||
@@ -82,7 +214,7 @@ func fetchGraphSnapshot(
|
||||
if len(iris) > 0 {
|
||||
labelByIRI, err := fetchRDFSLabels(ctx, sparql, iris, 500)
|
||||
if err != nil {
|
||||
return GraphResponse{}, err
|
||||
return GraphResponse{}, fmt.Errorf("fetch rdfs:label failed: %w", err)
|
||||
}
|
||||
for i := range nodes {
|
||||
if nodes[i].TermType != "uri" {
|
||||
@@ -103,6 +235,7 @@ func fetchGraphSnapshot(
|
||||
SparqlEndpoint: cfg.EffectiveSparqlEndpoint(),
|
||||
IncludeBNodes: cfg.IncludeBNodes,
|
||||
GraphQueryID: graphQueryID,
|
||||
Predicates: preds.IRIs(),
|
||||
NodeLimit: nodeLimit,
|
||||
EdgeLimit: edgeLimit,
|
||||
Nodes: len(nodes),
|
||||
|
||||
Reference in New Issue
Block a user