107 lines
2.8 KiB
Go
107 lines
2.8 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"log"
|
|
"sync"
|
|
)
|
|
|
|
// snapshotKey is the comparable key under which snapshots are stored in the
// service's cache and in-flight maps. Two requests map to the same entry only
// when every field below is equal.
type snapshotKey struct {
	// Limits passed by the caller of Get.
	NodeLimit, EdgeLimit int
	// Copied from the service configuration when the key is built.
	IncludeBNodes bool
	// GraphQueryID is the caller-supplied query identifier; LayoutEngine and
	// LayoutRootIRI are copied from the service configuration.
	GraphQueryID, LayoutEngine, LayoutRootIRI string
}
|
|
|
|
type snapshotInflight struct {
|
|
ready chan struct{}
|
|
snapshot GraphResponse
|
|
err error
|
|
}
|
|
|
|
type GraphSnapshotService struct {
|
|
sparql *AnzoGraphClient
|
|
cfg Config
|
|
|
|
fetchSnapshot func(context.Context, *AnzoGraphClient, Config, int, int, string) (GraphResponse, error)
|
|
|
|
mu sync.Mutex
|
|
cache map[snapshotKey]GraphResponse
|
|
inflight map[snapshotKey]*snapshotInflight
|
|
}
|
|
|
|
func NewGraphSnapshotService(sparql *AnzoGraphClient, cfg Config) *GraphSnapshotService {
|
|
return &GraphSnapshotService{
|
|
sparql: sparql,
|
|
cfg: cfg,
|
|
fetchSnapshot: fetchGraphSnapshot,
|
|
cache: make(map[snapshotKey]GraphResponse),
|
|
inflight: make(map[snapshotKey]*snapshotInflight),
|
|
}
|
|
}
|
|
|
|
func (s *GraphSnapshotService) Get(ctx context.Context, nodeLimit int, edgeLimit int, graphQueryID string) (GraphResponse, error) {
|
|
key := snapshotKey{
|
|
NodeLimit: nodeLimit,
|
|
EdgeLimit: edgeLimit,
|
|
IncludeBNodes: s.cfg.IncludeBNodes,
|
|
GraphQueryID: graphQueryID,
|
|
LayoutEngine: s.cfg.HierarchyLayoutEngine,
|
|
LayoutRootIRI: s.cfg.HierarchyLayoutRootIRI,
|
|
}
|
|
|
|
s.mu.Lock()
|
|
if snap, ok := s.cache[key]; ok {
|
|
s.mu.Unlock()
|
|
return snap, nil
|
|
}
|
|
|
|
if inf, ok := s.inflight[key]; ok {
|
|
ready := inf.ready
|
|
s.mu.Unlock()
|
|
select {
|
|
case <-ctx.Done():
|
|
return GraphResponse{}, ctx.Err()
|
|
case <-ready:
|
|
return inf.snapshot, inf.err
|
|
}
|
|
}
|
|
|
|
inf := &snapshotInflight{ready: make(chan struct{})}
|
|
s.inflight[key] = inf
|
|
s.mu.Unlock()
|
|
|
|
log.Printf("[snapshot] build_start graph_query_id=%s node_limit=%d edge_limit=%d detached=true", graphQueryID, nodeLimit, edgeLimit)
|
|
go s.buildSnapshotInBackground(key, inf, nodeLimit, edgeLimit, graphQueryID)
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
log.Printf("[snapshot] requester_canceled graph_query_id=%s node_limit=%d edge_limit=%d err=%v build_continues=true", graphQueryID, nodeLimit, edgeLimit, ctx.Err())
|
|
return GraphResponse{}, ctx.Err()
|
|
case <-inf.ready:
|
|
return inf.snapshot, inf.err
|
|
}
|
|
}
|
|
|
|
func (s *GraphSnapshotService) buildSnapshotInBackground(key snapshotKey, inf *snapshotInflight, nodeLimit int, edgeLimit int, graphQueryID string) {
|
|
snap, err := s.fetchSnapshot(context.Background(), s.sparql, s.cfg, nodeLimit, edgeLimit, graphQueryID)
|
|
|
|
s.mu.Lock()
|
|
inf.snapshot = snap
|
|
inf.err = err
|
|
delete(s.inflight, key)
|
|
if err == nil {
|
|
s.cache[key] = snap
|
|
}
|
|
close(inf.ready)
|
|
s.mu.Unlock()
|
|
|
|
if err != nil {
|
|
log.Printf("[snapshot] build_done graph_query_id=%s node_limit=%d edge_limit=%d detached=true cached=false err=%v", graphQueryID, nodeLimit, edgeLimit, err)
|
|
return
|
|
}
|
|
log.Printf("[snapshot] build_done graph_query_id=%s node_limit=%d edge_limit=%d detached=true cached=true", graphQueryID, nodeLimit, edgeLimit)
|
|
}
|