package main import ( "context" "log" "sync" ) type snapshotKey struct { NodeLimit int EdgeLimit int IncludeBNodes bool GraphQueryID string LayoutEngine string LayoutRootIRI string } type snapshotInflight struct { ready chan struct{} snapshot GraphResponse err error } type GraphSnapshotService struct { sparql *AnzoGraphClient cfg Config fetchSnapshot func(context.Context, *AnzoGraphClient, Config, int, int, string) (GraphResponse, error) mu sync.Mutex cache map[snapshotKey]GraphResponse inflight map[snapshotKey]*snapshotInflight } func NewGraphSnapshotService(sparql *AnzoGraphClient, cfg Config) *GraphSnapshotService { return &GraphSnapshotService{ sparql: sparql, cfg: cfg, fetchSnapshot: fetchGraphSnapshot, cache: make(map[snapshotKey]GraphResponse), inflight: make(map[snapshotKey]*snapshotInflight), } } func (s *GraphSnapshotService) Get(ctx context.Context, nodeLimit int, edgeLimit int, graphQueryID string) (GraphResponse, error) { key := snapshotKey{ NodeLimit: nodeLimit, EdgeLimit: edgeLimit, IncludeBNodes: s.cfg.IncludeBNodes, GraphQueryID: graphQueryID, LayoutEngine: s.cfg.HierarchyLayoutEngine, LayoutRootIRI: s.cfg.HierarchyLayoutRootIRI, } s.mu.Lock() if snap, ok := s.cache[key]; ok { s.mu.Unlock() return snap, nil } if inf, ok := s.inflight[key]; ok { ready := inf.ready s.mu.Unlock() select { case <-ctx.Done(): return GraphResponse{}, ctx.Err() case <-ready: return inf.snapshot, inf.err } } inf := &snapshotInflight{ready: make(chan struct{})} s.inflight[key] = inf s.mu.Unlock() log.Printf("[snapshot] build_start graph_query_id=%s node_limit=%d edge_limit=%d detached=true", graphQueryID, nodeLimit, edgeLimit) go s.buildSnapshotInBackground(key, inf, nodeLimit, edgeLimit, graphQueryID) select { case <-ctx.Done(): log.Printf("[snapshot] requester_canceled graph_query_id=%s node_limit=%d edge_limit=%d err=%v build_continues=true", graphQueryID, nodeLimit, edgeLimit, ctx.Err()) return GraphResponse{}, ctx.Err() case <-inf.ready: return inf.snapshot, inf.err } } func (s *GraphSnapshotService) buildSnapshotInBackground(key snapshotKey, inf *snapshotInflight, nodeLimit int, edgeLimit int, graphQueryID string) { snap, err := s.fetchSnapshot(context.Background(), s.sparql, s.cfg, nodeLimit, edgeLimit, graphQueryID) s.mu.Lock() inf.snapshot = snap inf.err = err delete(s.inflight, key) if err == nil { s.cache[key] = snap } close(inf.ready) s.mu.Unlock() if err != nil { log.Printf("[snapshot] build_done graph_query_id=%s node_limit=%d edge_limit=%d detached=true cached=false err=%v", graphQueryID, nodeLimit, edgeLimit, err) return } log.Printf("[snapshot] build_done graph_query_id=%s node_limit=%d edge_limit=%d detached=true cached=true", graphQueryID, nodeLimit, edgeLimit) }