graph: stream snapshots over Arrow transport

This commit is contained in:
Oxy8
2026-04-06 13:36:23 -03:00
parent 44c1d3eaa6
commit ca715d7c3c
10 changed files with 1067 additions and 134 deletions

View File

@@ -4,77 +4,21 @@ import { fetchGraphQueries } from "./graph_queries";
import type { GraphQueryMeta } from "./graph_queries";
import { fetchSelectionQueries, runSelectionQuery, runSelectionTripleQuery } from "./selection_queries";
import { cosmosRuntimeConfig } from "./cosmos_config";
import type { GraphMeta, GraphRoutePoint, GraphRouteSegment, SelectionQueryMeta, SelectionTriple } from "./selection_queries";
import type { GraphMeta, SelectionQueryMeta, SelectionTriple } from "./selection_queries";
import { TripleGraphView } from "./TripleGraphView";
import { buildTripleGraphModel, type TripleGraphModel } from "./triple_graph";
import { readGraphArrow } from "./graph_arrow";
function sleep(ms: number): Promise<void> {
return new Promise((r) => setTimeout(r, ms));
}
type GraphNodeMeta = {
id?: number;
iri?: string;
label?: string;
x?: number;
y?: number;
type GraphNodeLookup = {
vertexIds: Uint32Array;
labels: (string | undefined)[];
iris: string[];
};
function graphRoutePoint(value: unknown): GraphRoutePoint | null {
if (!value || typeof value !== "object") return null;
const record = value as Record<string, unknown>;
if (typeof record.x !== "number" || typeof record.y !== "number") return null;
return {
x: record.x,
y: record.y,
};
}
function graphRouteSegmentArray(value: unknown): GraphRouteSegment[] {
if (!Array.isArray(value)) return [];
const out: GraphRouteSegment[] = [];
for (const item of value) {
if (!item || typeof item !== "object") continue;
const record = item as Record<string, unknown>;
if (typeof record.edge_index !== "number" || typeof record.kind !== "string") continue;
if (!Array.isArray(record.points)) continue;
const points: GraphRoutePoint[] = [];
for (const point of record.points) {
const parsed = graphRoutePoint(point);
if (!parsed) continue;
points.push(parsed);
}
if (points.length < 2) continue;
out.push({
edge_index: record.edge_index,
kind: record.kind,
points,
});
}
return out;
}
function buildRouteLineVertices(routeSegments: GraphRouteSegment[]): Float32Array {
let lineCount = 0;
for (const route of routeSegments) {
lineCount += Math.max(0, route.points.length - 1);
}
const out = new Float32Array(lineCount * 4);
let offset = 0;
for (const route of routeSegments) {
for (let i = 1; i < route.points.length; i++) {
const previous = route.points[i - 1];
const current = route.points[i];
out[offset++] = previous.x;
out[offset++] = previous.y;
out[offset++] = current.x;
out[offset++] = current.y;
}
}
return out;
}
type TripleResultState = {
status: "idle" | "loading" | "ready" | "error";
queryId: string;
@@ -92,6 +36,37 @@ function idleTripleResult(queryId: string): TripleResultState {
};
}
function formatBytes(bytes: number): string {
if (!Number.isFinite(bytes) || bytes <= 0) return "0 B";
const units = ["B", "KB", "MB", "GB", "TB"];
let value = bytes;
let unitIndex = 0;
while (value >= 1024 && unitIndex < units.length - 1) {
value /= 1024;
unitIndex++;
}
return `${value.toFixed(value >= 100 || unitIndex === 0 ? 0 : 1)} ${units[unitIndex]}`;
}
function estimateFrontendTypedArrayBytes(nodeCount: number, edgeCount: number, routeLineFloatCount: number): { app: number; renderer: number; total: number } {
const app =
nodeCount * Float32Array.BYTES_PER_ELEMENT * 2 +
nodeCount * Uint32Array.BYTES_PER_ELEMENT +
edgeCount * Uint32Array.BYTES_PER_ELEMENT * 2 +
routeLineFloatCount * Float32Array.BYTES_PER_ELEMENT;
const renderer =
nodeCount * Float32Array.BYTES_PER_ELEMENT * 2 +
nodeCount * Uint32Array.BYTES_PER_ELEMENT * 4 +
edgeCount * Uint32Array.BYTES_PER_ELEMENT * 4;
return {
app,
renderer,
total: app + renderer,
};
}
export default function App() {
const canvasRef = useRef<HTMLCanvasElement>(null);
const rendererRef = useRef<Renderer | null>(null);
@@ -121,14 +96,34 @@ export default function App() {
// Store mouse position in a ref so it can be accessed in render loop without re-renders
const mousePos = useRef({ x: 0, y: 0 });
const nodesRef = useRef<GraphNodeMeta[]>([]);
const nodeLookupRef = useRef<GraphNodeLookup | null>(null);
async function loadGraph(graphQueryId: string, signal: AbortSignal): Promise<void> {
const renderer = rendererRef.current;
if (!renderer) return;
setStatus("Fetching graph…");
const loadStartedAt = performance.now();
const logPhase = (phase: string, extra?: Record<string, unknown>) => {
console.log(`[graph-load] ${phase}`, {
elapsed_ms: Math.round(performance.now() - loadStartedAt),
graph_query_id: graphQueryId,
...(extra || {}),
});
};
const updateStatus = async (nextStatus: string, extra?: Record<string, unknown>): Promise<void> => {
setStatus(nextStatus);
logPhase(nextStatus, extra);
await sleep(0);
};
await updateStatus("Fetching graph…");
const graphRes = await fetch(`/api/graph?graph_query_id=${encodeURIComponent(graphQueryId)}`, { signal });
logPhase("graph response headers received", {
status: graphRes.status,
content_type: graphRes.headers.get("content-type"),
content_length: graphRes.headers.get("content-length"),
});
if (!graphRes.ok) {
let detail = "";
try {
@@ -141,44 +136,57 @@ export default function App() {
}
throw new Error(`Failed to fetch graph: ${graphRes.status}${detail ? ` (${detail})` : ""}`);
}
const graph = await graphRes.json();
await updateStatus("Streaming Arrow graph…");
await updateStatus("Decoding Arrow batches…");
const decodeStartedAt = performance.now();
let graph: Awaited<ReturnType<typeof readGraphArrow>>;
try {
graph = await readGraphArrow(graphRes, logPhase);
} catch (e) {
logPhase("arrow graph decode failed", {
error: e instanceof Error ? e.message : String(e),
});
throw e;
}
logPhase("arrow graph decoded", {
decode_ms: Math.round(performance.now() - decodeStartedAt),
});
if (signal.aborted) return;
const nodes = Array.isArray(graph.nodes) ? graph.nodes : [];
const edges = Array.isArray(graph.edges) ? graph.edges : [];
const routeSegments = graphRouteSegmentArray(graph.route_segments);
const meta = graph.meta || null;
const count = nodes.length;
const vertexIds = graph.vertexIds;
const xs = graph.xs;
const ys = graph.ys;
const edgeData = graph.edgeData;
const routeLineVertices = graph.routeLineVertices;
const labels = graph.labels;
const iris = graph.iris;
const count = vertexIds.length;
const edgeCount = edgeData.length / 2;
logPhase("graph payload ready", {
nodes: count,
edges: edgeCount,
route_line_segments: routeLineVertices.length / 4,
backend_nodes: meta && typeof meta.nodes === "number" ? meta.nodes : undefined,
backend_edges: meta && typeof meta.edges === "number" ? meta.edges : undefined,
});
nodesRef.current = nodes;
nodeLookupRef.current = { vertexIds, labels, iris };
graphMetaRef.current = meta && typeof meta === "object" ? (meta as GraphMeta) : null;
setTripleResult(idleTripleResult(activeSelectionQueryId));
// Build positions from backend-provided node coordinates.
setStatus("Preparing buffers…");
const xs = new Float32Array(count);
const ys = new Float32Array(count);
for (let i = 0; i < count; i++) {
const nx = nodes[i]?.x;
const ny = nodes[i]?.y;
xs[i] = typeof nx === "number" ? nx : 0;
ys[i] = typeof ny === "number" ? ny : 0;
}
const vertexIds = new Uint32Array(count);
for (let i = 0; i < count; i++) {
const id = nodes[i]?.id;
vertexIds[i] = typeof id === "number" ? id >>> 0 : i;
}
// Build edges as vertex-id pairs.
const edgeData = new Uint32Array(edges.length * 2);
for (let i = 0; i < edges.length; i++) {
const s = edges[i]?.source;
const t = edges[i]?.target;
edgeData[i * 2] = typeof s === "number" ? s >>> 0 : 0;
edgeData[i * 2 + 1] = typeof t === "number" ? t >>> 0 : 0;
}
const routeLineVertices = buildRouteLineVertices(routeSegments);
await updateStatus("Preparing buffers…", {
nodes: count,
edges: edgeCount,
});
const bufferPrepStartedAt = performance.now();
const typedArrayBytes = estimateFrontendTypedArrayBytes(count, edgeCount, routeLineVertices.length);
logPhase("buffer prep done", {
buffer_prep_ms: Math.round(performance.now() - bufferPrepStartedAt),
app_typed_arrays: formatBytes(typedArrayBytes.app),
renderer_typed_arrays_estimate: formatBytes(typedArrayBytes.renderer),
total_typed_arrays_estimate: formatBytes(typedArrayBytes.total),
});
// Use /api/graph meta; don't do a second expensive backend call.
if (meta && typeof meta.nodes === "number" && typeof meta.edges === "number") {
@@ -188,33 +196,48 @@ export default function App() {
backend: typeof meta.backend === "string" ? meta.backend : undefined,
});
} else {
setBackendStats({ nodes: nodes.length, edges: edges.length });
setBackendStats({ nodes: count, edges: edgeCount });
}
setStatus("Building spatial index…");
await new Promise((r) => setTimeout(r, 0));
await updateStatus("Building spatial index…", {
nodes: count,
edges: edgeCount,
});
if (signal.aborted) return;
const buildMs = renderer.init(
xs,
ys,
vertexIds,
edgeData,
routeLineVertices.length > 0 ? routeLineVertices : null
);
let buildMs: number;
try {
buildMs = renderer.init(
xs,
ys,
vertexIds,
edgeData,
routeLineVertices.length > 0 ? routeLineVertices : null
);
} catch (e) {
logPhase("renderer.init failed", {
error: e instanceof Error ? e.message : String(e),
});
throw e;
}
setNodeCount(renderer.getNodeCount());
setSelectedNodes(new Set());
setStatus("");
console.log(`Init complete: ${count.toLocaleString()} nodes, ${edges.length.toLocaleString()} edges in ${buildMs.toFixed(0)}ms`);
logPhase("init complete", {
renderer_init_ms: Math.round(buildMs),
nodes: count,
edges: edgeCount,
});
}
function getSelectedIds(renderer: Renderer, selected: Set<number>): number[] {
const lookup = nodeLookupRef.current;
if (!lookup) return [];
const selectedIds: number[] = [];
for (const sortedIdx of selected) {
const origIdx = renderer.sortedIndexToOriginalIndex(sortedIdx);
if (origIdx === null) continue;
const node = nodesRef.current?.[origIdx];
const nodeId = node?.id;
const nodeId = lookup.vertexIds[origIdx];
if (typeof nodeId !== "number") continue;
selectedIds.push(nodeId);
}
@@ -370,14 +393,16 @@ export default function App() {
const hit = renderer.findNodeIndexAt(mousePos.current.x, mousePos.current.y);
if (hit) {
const origIdx = renderer.sortedIndexToOriginalIndex(hit.index);
const meta = origIdx === null ? null : nodesRef.current[origIdx];
const lookup = nodeLookupRef.current;
const label = origIdx === null || !lookup ? undefined : lookup.labels[origIdx];
const iri = origIdx === null || !lookup ? undefined : lookup.iris[origIdx];
setHoveredNode({
x: hit.x,
y: hit.y,
screenX: mousePos.current.x,
screenY: mousePos.current.y,
label: meta && typeof meta.label === "string" ? meta.label : undefined,
iri: meta && typeof meta.iri === "string" ? meta.iri : undefined,
label: typeof label === "string" ? label : undefined,
iri: typeof iri === "string" ? iri : undefined,
});
} else {
setHoveredNode(null);

301
frontend/src/graph_arrow.ts Normal file
View File

@@ -0,0 +1,301 @@
import { RecordBatchReader } from "apache-arrow";
import type { GraphMeta } from "./selection_queries";
export type ArrowGraphLoadResult = {
meta: GraphMeta | null;
vertexIds: Uint32Array;
xs: Float32Array;
ys: Float32Array;
edgeData: Uint32Array;
routeLineVertices: Float32Array;
labels: (string | undefined)[];
iris: string[];
};
type ArrowBatchLog = (phase: string, extra?: Record<string, unknown>) => void;
type ArrowLikeVector = {
data?: Array<{ valueOffsets?: ArrayLike<number | bigint> }>;
getChildAt?: (index: number) => ArrowLikeVector | null;
get?: (index: number) => unknown;
toArray?: () => ArrayLike<number>;
};
const graphTransportVersion = "1";
export async function readGraphArrow(response: Response, logPhase?: ArrowBatchLog): Promise<ArrowGraphLoadResult> {
validateArrowResponse(response);
const reader = await openArrowReader(response);
let meta: GraphMeta | null = null;
let vertexIds: Uint32Array | null = null;
let xs: Float32Array | null = null;
let ys: Float32Array | null = null;
let edgeData: Uint32Array | null = null;
let routeLineVertices: Float32Array | null = null;
let labels: (string | undefined)[] | null = null;
let iris: string[] | null = null;
let batchIndex = 0;
for await (const batch of reader) {
const batchStartedAt = performance.now();
switch (batchIndex) {
case 0: {
meta = decodeMetaBatch(batch);
const nodeCount = typeof meta.nodes === "number" ? meta.nodes : 0;
const edgeCount = typeof meta.edges === "number" ? meta.edges : 0;
const routeLineSegments = readScalarNumber(batch, "meta_route_line_segments") ?? 0;
vertexIds = new Uint32Array(nodeCount);
xs = new Float32Array(nodeCount);
ys = new Float32Array(nodeCount);
edgeData = new Uint32Array(edgeCount * 2);
routeLineVertices = new Float32Array(routeLineSegments * 4);
labels = new Array<string | undefined>(nodeCount);
iris = new Array<string>(nodeCount);
logPhase?.("arrow_meta_batch_decoded", {
batch_index: batchIndex,
rows: batch.numRows,
node_count: nodeCount,
edge_count: edgeCount,
route_line_segments: routeLineSegments,
decode_ms: Math.round(performance.now() - batchStartedAt),
});
break;
}
case 1: {
ensureAllocated(meta, vertexIds, xs, ys, edgeData, routeLineVertices, labels, iris);
const nodeIDs = readUint32List(batch, "node_id");
const nodeXs = readFloat32List(batch, "node_x");
const nodeYs = readFloat32List(batch, "node_y");
const nodeIRIs = readStringList(batch, "node_iri");
const nodeLabels = readNullableStringList(batch, "node_label");
if (nodeIDs.length !== vertexIds.length || nodeXs.length !== xs.length || nodeYs.length !== ys.length) {
throw new Error("Arrow node batch length mismatch");
}
if (nodeIRIs.length !== iris.length || nodeLabels.length !== labels.length) {
throw new Error("Arrow node metadata batch length mismatch");
}
vertexIds.set(nodeIDs);
xs.set(nodeXs);
ys.set(nodeYs);
for (let i = 0; i < iris.length; i++) {
iris[i] = nodeIRIs[i] ?? "";
labels[i] = nodeLabels[i];
}
logPhase?.("arrow_node_batch_decoded", {
batch_index: batchIndex,
rows: batch.numRows,
nodes: vertexIds.length,
decode_ms: Math.round(performance.now() - batchStartedAt),
});
break;
}
case 2: {
ensureAllocated(meta, vertexIds, xs, ys, edgeData, routeLineVertices, labels, iris);
const edgeSources = readUint32List(batch, "edge_source");
const edgeTargets = readUint32List(batch, "edge_target");
if (edgeSources.length !== edgeTargets.length) {
throw new Error("Arrow edge batch source/target length mismatch");
}
if (edgeData.length !== edgeSources.length * 2) {
throw new Error("Arrow edge batch size mismatch");
}
for (let i = 0; i < edgeSources.length; i++) {
edgeData[i * 2] = edgeSources[i];
edgeData[i * 2 + 1] = edgeTargets[i];
}
logPhase?.("arrow_edge_batch_decoded", {
batch_index: batchIndex,
rows: batch.numRows,
edges: edgeSources.length,
decode_ms: Math.round(performance.now() - batchStartedAt),
});
break;
}
case 3: {
ensureAllocated(meta, vertexIds, xs, ys, edgeData, routeLineVertices, labels, iris);
const routeX1 = readFloat32List(batch, "route_x1");
const routeY1 = readFloat32List(batch, "route_y1");
const routeX2 = readFloat32List(batch, "route_x2");
const routeY2 = readFloat32List(batch, "route_y2");
if (
routeX1.length !== routeY1.length ||
routeX1.length !== routeX2.length ||
routeX1.length !== routeY2.length
) {
throw new Error("Arrow route batch axis length mismatch");
}
if (routeLineVertices.length !== routeX1.length * 4) {
throw new Error("Arrow route batch size mismatch");
}
for (let i = 0; i < routeX1.length; i++) {
routeLineVertices[i * 4] = routeX1[i];
routeLineVertices[i * 4 + 1] = routeY1[i];
routeLineVertices[i * 4 + 2] = routeX2[i];
routeLineVertices[i * 4 + 3] = routeY2[i];
}
logPhase?.("arrow_route_batch_decoded", {
batch_index: batchIndex,
rows: batch.numRows,
route_line_segments: routeX1.length,
decode_ms: Math.round(performance.now() - batchStartedAt),
});
break;
}
default:
throw new Error(`Unexpected Arrow batch index ${batchIndex}`);
}
batchIndex++;
}
if (batchIndex !== 4) {
throw new Error(`Expected 4 Arrow batches, received ${batchIndex}`);
}
ensureAllocated(meta, vertexIds, xs, ys, edgeData, routeLineVertices, labels, iris);
return {
meta,
vertexIds,
xs,
ys,
edgeData,
routeLineVertices,
labels,
iris,
};
}
async function openArrowReader(response: Response) {
if (response.body) {
return RecordBatchReader.from(response.body);
}
return RecordBatchReader.from(await response.arrayBuffer());
}
function validateArrowResponse(response: Response): void {
const contentType = response.headers.get("content-type") || "";
if (!contentType.includes("application/vnd.apache.arrow.stream")) {
throw new Error(`Unexpected graph content type: ${contentType || "(missing)"}`);
}
const version = response.headers.get("X-Graph-Transport-Version");
if (version !== graphTransportVersion) {
throw new Error(`Unexpected graph transport version: ${version || "(missing)"}`);
}
}
function decodeMetaBatch(batch: any): GraphMeta {
return {
backend: readScalarString(batch, "meta_backend"),
graph_query_id: readScalarString(batch, "meta_graph_query_id"),
node_limit: readScalarNumber(batch, "meta_node_limit"),
edge_limit: readScalarNumber(batch, "meta_edge_limit"),
nodes: readScalarNumber(batch, "meta_nodes"),
edges: readScalarNumber(batch, "meta_edges"),
};
}
function ensureAllocated(
meta: GraphMeta | null,
vertexIds: Uint32Array | null,
xs: Float32Array | null,
ys: Float32Array | null,
edgeData: Uint32Array | null,
routeLineVertices: Float32Array | null,
labels: (string | undefined)[] | null,
iris: string[] | null
): asserts meta is GraphMeta & {} & {
nodes?: number;
edges?: number;
} {
if (!meta || !vertexIds || !xs || !ys || !edgeData || !routeLineVertices || !labels || !iris) {
throw new Error("Arrow graph stream is missing the meta batch");
}
}
function readScalarString(batch: any, name: string): string | undefined {
const vector = batch.getChild(name);
if (!vector || batch.numRows < 1) return undefined;
const value = vector.get(0);
return typeof value === "string" ? value : undefined;
}
function readScalarNumber(batch: any, name: string): number | undefined {
const vector = batch.getChild(name);
if (!vector || batch.numRows < 1) return undefined;
const value = vector.get(0);
return typeof value === "number" && Number.isFinite(value) ? value : undefined;
}
function readUint32List(batch: any, name: string): Uint32Array {
return readPrimitiveList(batch, name, Uint32Array);
}
function readFloat32List(batch: any, name: string): Float32Array {
return readPrimitiveList(batch, name, Float32Array);
}
function readPrimitiveList<T extends Uint32Array | Float32Array>(
batch: any,
name: string,
ctor: { new(length: number): T }
): T {
const vector = batch.getChild(name) as ArrowLikeVector | null;
if (!vector) return new ctor(0);
const [begin, end] = getListBounds(vector);
const child = vector.getChildAt?.(0);
if (!child || typeof child.toArray !== "function") {
return new ctor(0);
}
const values = child.toArray();
if (typeof (values as any).subarray === "function") {
return (values as T).subarray(begin, end) as T;
}
const out = new ctor(end - begin);
for (let i = begin; i < end; i++) {
out[i - begin] = Number((values as ArrayLike<number>)[i]) as T[number];
}
return out;
}
function readStringList(batch: any, name: string): string[] {
return readNullableStringList(batch, name).map((value) => value ?? "");
}
function readNullableStringList(batch: any, name: string): (string | undefined)[] {
const vector = batch.getChild(name) as ArrowLikeVector | null;
if (!vector) return [];
const [begin, end] = getListBounds(vector);
const child = vector.getChildAt?.(0);
if (!child || typeof child.get !== "function") {
return [];
}
const out = new Array<string | undefined>(Math.max(0, end - begin));
for (let i = begin; i < end; i++) {
const value = child.get(i);
out[i - begin] = typeof value === "string" ? value : undefined;
}
return out;
}
function getListBounds(vector: ArrowLikeVector): [number, number] {
const offsets = vector.data?.[0]?.valueOffsets;
if (!offsets || offsets.length < 2) return [0, 0];
return [offsetToNumber(offsets[0]), offsetToNumber(offsets[1])];
}
function offsetToNumber(value: number | bigint | undefined): number {
if (typeof value === "bigint") return Number(value);
return typeof value === "number" ? value : 0;
}

View File

@@ -1,8 +1,6 @@
export { fetchSelectionQueries, runSelectionQuery, runSelectionTripleQuery } from "./api";
export type {
GraphMeta,
GraphRoutePoint,
GraphRouteSegment,
SelectionQueryMeta,
SelectionTriple,
SelectionTripleResult,

View File

@@ -1,26 +1,10 @@
export type GraphMeta = {
backend?: string;
ttl_path?: string | null;
sparql_endpoint?: string | null;
include_bnodes?: boolean;
graph_query_id?: string;
node_limit?: number;
edge_limit?: number;
nodes?: number;
edges?: number;
layout_engine?: string;
layout_root_iri?: string | null;
};
export type GraphRoutePoint = {
x: number;
y: number;
};
export type GraphRouteSegment = {
edge_index: number;
kind: string;
points: GraphRoutePoint[];
};
export type SelectionQueryMeta = {