package main import ( "fmt" "log" "net/http" "time" "github.com/apache/arrow/go/v17/arrow" "github.com/apache/arrow/go/v17/arrow/array" "github.com/apache/arrow/go/v17/arrow/ipc" "github.com/apache/arrow/go/v17/arrow/memory" ) const graphTransportVersion = "1" const ( graphArrowMetaBackendField = iota graphArrowMetaGraphQueryIDField graphArrowMetaNodeLimitField graphArrowMetaEdgeLimitField graphArrowMetaNodesField graphArrowMetaEdgesField graphArrowMetaRouteLineSegmentsField graphArrowNodeIDField graphArrowNodeXField graphArrowNodeYField graphArrowNodeIRIField graphArrowNodeLabelField graphArrowEdgeSourceField graphArrowEdgeTargetField graphArrowRouteX1Field graphArrowRouteY1Field graphArrowRouteX2Field graphArrowRouteY2Field ) var graphArrowSchema = arrow.NewSchema([]arrow.Field{ {Name: "meta_backend", Type: arrow.BinaryTypes.String, Nullable: true}, {Name: "meta_graph_query_id", Type: arrow.BinaryTypes.String, Nullable: true}, {Name: "meta_node_limit", Type: arrow.PrimitiveTypes.Int32, Nullable: true}, {Name: "meta_edge_limit", Type: arrow.PrimitiveTypes.Int32, Nullable: true}, {Name: "meta_nodes", Type: arrow.PrimitiveTypes.Int32, Nullable: true}, {Name: "meta_edges", Type: arrow.PrimitiveTypes.Int32, Nullable: true}, {Name: "meta_route_line_segments", Type: arrow.PrimitiveTypes.Int32, Nullable: true}, {Name: "node_id", Type: arrow.ListOf(arrow.PrimitiveTypes.Uint32), Nullable: true}, {Name: "node_x", Type: arrow.ListOf(arrow.PrimitiveTypes.Float32), Nullable: true}, {Name: "node_y", Type: arrow.ListOf(arrow.PrimitiveTypes.Float32), Nullable: true}, {Name: "node_iri", Type: arrow.ListOf(arrow.BinaryTypes.String), Nullable: true}, {Name: "node_label", Type: arrow.ListOf(arrow.BinaryTypes.String), Nullable: true}, {Name: "edge_source", Type: arrow.ListOf(arrow.PrimitiveTypes.Uint32), Nullable: true}, {Name: "edge_target", Type: arrow.ListOf(arrow.PrimitiveTypes.Uint32), Nullable: true}, {Name: "route_x1", Type: arrow.ListOf(arrow.PrimitiveTypes.Float32), Nullable: true}, {Name: "route_y1", Type: arrow.ListOf(arrow.PrimitiveTypes.Float32), Nullable: true}, {Name: "route_x2", Type: arrow.ListOf(arrow.PrimitiveTypes.Float32), Nullable: true}, {Name: "route_y2", Type: arrow.ListOf(arrow.PrimitiveTypes.Float32), Nullable: true}, }, nil) type graphArrowRouteLines struct { X1 []float32 Y1 []float32 X2 []float32 Y2 []float32 } func writeGraphArrow(w http.ResponseWriter, snap GraphResponse) error { start := time.Now() routes := flattenGraphRouteLines(snap.RouteSegments) meta := snap.Meta if meta == nil { meta = &GraphMeta{ GraphQueryID: "default", NodeLimit: len(snap.Nodes), EdgeLimit: len(snap.Edges), Nodes: len(snap.Nodes), Edges: len(snap.Edges), } } alloc := memory.NewGoAllocator() w.Header().Set("Content-Type", "application/vnd.apache.arrow.stream") w.Header().Set("X-Graph-Transport-Version", graphTransportVersion) w.WriteHeader(http.StatusOK) writer := ipc.NewWriter(w, ipc.WithSchema(graphArrowSchema)) closed := false defer func() { if !closed { _ = writer.Close() } }() if err := writeGraphArrowMetaBatch(writer, alloc, meta, len(routes.X1)); err != nil { return fmt.Errorf("write meta batch failed: %w", err) } if err := writeGraphArrowNodeBatch(writer, alloc, snap.Nodes); err != nil { return fmt.Errorf("write node batch failed: %w", err) } if err := writeGraphArrowEdgeBatch(writer, alloc, snap.Edges); err != nil { return fmt.Errorf("write edge batch failed: %w", err) } if err := writeGraphArrowRouteBatch(writer, alloc, routes); err != nil { return fmt.Errorf("write route batch failed: %w", err) } if err := writer.Close(); err != nil { return fmt.Errorf("close arrow writer failed: %w", err) } closed = true log.Printf( "[graph-arrow] encode_done graph_query_id=%s nodes=%d edges=%d route_line_segments=%d encode_time=%s", meta.GraphQueryID, len(snap.Nodes), len(snap.Edges), len(routes.X1), time.Since(start).Truncate(time.Millisecond), ) return nil } func writeGraphArrowMetaBatch(writer *ipc.Writer, alloc memory.Allocator, meta *GraphMeta, routeLineSegments int) error { builder := array.NewRecordBuilder(alloc, graphArrowSchema) defer builder.Release() appendStringBuilderValue(builder.Field(graphArrowMetaBackendField), meta.Backend) appendStringBuilderValue(builder.Field(graphArrowMetaGraphQueryIDField), meta.GraphQueryID) appendInt32BuilderValue(builder.Field(graphArrowMetaNodeLimitField), int32(meta.NodeLimit)) appendInt32BuilderValue(builder.Field(graphArrowMetaEdgeLimitField), int32(meta.EdgeLimit)) appendInt32BuilderValue(builder.Field(graphArrowMetaNodesField), int32(meta.Nodes)) appendInt32BuilderValue(builder.Field(graphArrowMetaEdgesField), int32(meta.Edges)) appendInt32BuilderValue(builder.Field(graphArrowMetaRouteLineSegmentsField), int32(routeLineSegments)) appendNullTopLevel(builder.Field(graphArrowNodeIDField)) appendNullTopLevel(builder.Field(graphArrowNodeXField)) appendNullTopLevel(builder.Field(graphArrowNodeYField)) appendNullTopLevel(builder.Field(graphArrowNodeIRIField)) appendNullTopLevel(builder.Field(graphArrowNodeLabelField)) appendNullTopLevel(builder.Field(graphArrowEdgeSourceField)) appendNullTopLevel(builder.Field(graphArrowEdgeTargetField)) appendNullTopLevel(builder.Field(graphArrowRouteX1Field)) appendNullTopLevel(builder.Field(graphArrowRouteY1Field)) appendNullTopLevel(builder.Field(graphArrowRouteX2Field)) appendNullTopLevel(builder.Field(graphArrowRouteY2Field)) return writeGraphArrowRecord(writer, builder) } func writeGraphArrowNodeBatch(writer *ipc.Writer, alloc memory.Allocator, nodes []Node) error { builder := array.NewRecordBuilder(alloc, graphArrowSchema) defer builder.Release() appendNullTopLevel(builder.Field(graphArrowMetaBackendField)) appendNullTopLevel(builder.Field(graphArrowMetaGraphQueryIDField)) appendNullTopLevel(builder.Field(graphArrowMetaNodeLimitField)) appendNullTopLevel(builder.Field(graphArrowMetaEdgeLimitField)) appendNullTopLevel(builder.Field(graphArrowMetaNodesField)) appendNullTopLevel(builder.Field(graphArrowMetaEdgesField)) appendNullTopLevel(builder.Field(graphArrowMetaRouteLineSegmentsField)) appendUint32List(builder.Field(graphArrowNodeIDField), func(valueBuilder *array.Uint32Builder) { for _, node := range nodes { valueBuilder.Append(node.ID) } }) appendFloat32List(builder.Field(graphArrowNodeXField), func(valueBuilder *array.Float32Builder) { for _, node := range nodes { valueBuilder.Append(float32(node.X)) } }) appendFloat32List(builder.Field(graphArrowNodeYField), func(valueBuilder *array.Float32Builder) { for _, node := range nodes { valueBuilder.Append(float32(node.Y)) } }) appendStringList(builder.Field(graphArrowNodeIRIField), func(valueBuilder *array.StringBuilder) { for _, node := range nodes { valueBuilder.Append(node.IRI) } }) appendNullableStringList(builder.Field(graphArrowNodeLabelField), func(valueBuilder *array.StringBuilder) { for _, node := range nodes { if node.Label == nil { valueBuilder.AppendNull() continue } valueBuilder.Append(*node.Label) } }) appendNullTopLevel(builder.Field(graphArrowEdgeSourceField)) appendNullTopLevel(builder.Field(graphArrowEdgeTargetField)) appendNullTopLevel(builder.Field(graphArrowRouteX1Field)) appendNullTopLevel(builder.Field(graphArrowRouteY1Field)) appendNullTopLevel(builder.Field(graphArrowRouteX2Field)) appendNullTopLevel(builder.Field(graphArrowRouteY2Field)) return writeGraphArrowRecord(writer, builder) } func writeGraphArrowEdgeBatch(writer *ipc.Writer, alloc memory.Allocator, edges []Edge) error { builder := array.NewRecordBuilder(alloc, graphArrowSchema) defer builder.Release() appendNullTopLevel(builder.Field(graphArrowMetaBackendField)) appendNullTopLevel(builder.Field(graphArrowMetaGraphQueryIDField)) appendNullTopLevel(builder.Field(graphArrowMetaNodeLimitField)) appendNullTopLevel(builder.Field(graphArrowMetaEdgeLimitField)) appendNullTopLevel(builder.Field(graphArrowMetaNodesField)) appendNullTopLevel(builder.Field(graphArrowMetaEdgesField)) appendNullTopLevel(builder.Field(graphArrowMetaRouteLineSegmentsField)) appendNullTopLevel(builder.Field(graphArrowNodeIDField)) appendNullTopLevel(builder.Field(graphArrowNodeXField)) appendNullTopLevel(builder.Field(graphArrowNodeYField)) appendNullTopLevel(builder.Field(graphArrowNodeIRIField)) appendNullTopLevel(builder.Field(graphArrowNodeLabelField)) appendUint32List(builder.Field(graphArrowEdgeSourceField), func(valueBuilder *array.Uint32Builder) { for _, edge := range edges { valueBuilder.Append(edge.Source) } }) appendUint32List(builder.Field(graphArrowEdgeTargetField), func(valueBuilder *array.Uint32Builder) { for _, edge := range edges { valueBuilder.Append(edge.Target) } }) appendNullTopLevel(builder.Field(graphArrowRouteX1Field)) appendNullTopLevel(builder.Field(graphArrowRouteY1Field)) appendNullTopLevel(builder.Field(graphArrowRouteX2Field)) appendNullTopLevel(builder.Field(graphArrowRouteY2Field)) return writeGraphArrowRecord(writer, builder) } func writeGraphArrowRouteBatch(writer *ipc.Writer, alloc memory.Allocator, routes graphArrowRouteLines) error { builder := array.NewRecordBuilder(alloc, graphArrowSchema) defer builder.Release() appendNullTopLevel(builder.Field(graphArrowMetaBackendField)) appendNullTopLevel(builder.Field(graphArrowMetaGraphQueryIDField)) appendNullTopLevel(builder.Field(graphArrowMetaNodeLimitField)) appendNullTopLevel(builder.Field(graphArrowMetaEdgeLimitField)) appendNullTopLevel(builder.Field(graphArrowMetaNodesField)) appendNullTopLevel(builder.Field(graphArrowMetaEdgesField)) appendNullTopLevel(builder.Field(graphArrowMetaRouteLineSegmentsField)) appendNullTopLevel(builder.Field(graphArrowNodeIDField)) appendNullTopLevel(builder.Field(graphArrowNodeXField)) appendNullTopLevel(builder.Field(graphArrowNodeYField)) appendNullTopLevel(builder.Field(graphArrowNodeIRIField)) appendNullTopLevel(builder.Field(graphArrowNodeLabelField)) appendNullTopLevel(builder.Field(graphArrowEdgeSourceField)) appendNullTopLevel(builder.Field(graphArrowEdgeTargetField)) appendFloat32List(builder.Field(graphArrowRouteX1Field), func(valueBuilder *array.Float32Builder) { for _, value := range routes.X1 { valueBuilder.Append(value) } }) appendFloat32List(builder.Field(graphArrowRouteY1Field), func(valueBuilder *array.Float32Builder) { for _, value := range routes.Y1 { valueBuilder.Append(value) } }) appendFloat32List(builder.Field(graphArrowRouteX2Field), func(valueBuilder *array.Float32Builder) { for _, value := range routes.X2 { valueBuilder.Append(value) } }) appendFloat32List(builder.Field(graphArrowRouteY2Field), func(valueBuilder *array.Float32Builder) { for _, value := range routes.Y2 { valueBuilder.Append(value) } }) return writeGraphArrowRecord(writer, builder) } func writeGraphArrowRecord(writer *ipc.Writer, builder *array.RecordBuilder) error { record := builder.NewRecord() defer record.Release() return writer.Write(record) } func appendStringBuilderValue(builder array.Builder, value string) { builder.(*array.StringBuilder).Append(value) } func appendInt32BuilderValue(builder array.Builder, value int32) { builder.(*array.Int32Builder).Append(value) } func appendUint32List(builder array.Builder, appendValues func(*array.Uint32Builder)) { listBuilder := builder.(*array.ListBuilder) listBuilder.Append(true) valueBuilder := listBuilder.ValueBuilder().(*array.Uint32Builder) appendValues(valueBuilder) } func appendFloat32List(builder array.Builder, appendValues func(*array.Float32Builder)) { listBuilder := builder.(*array.ListBuilder) listBuilder.Append(true) valueBuilder := listBuilder.ValueBuilder().(*array.Float32Builder) appendValues(valueBuilder) } func appendStringList(builder array.Builder, appendValues func(*array.StringBuilder)) { listBuilder := builder.(*array.ListBuilder) listBuilder.Append(true) valueBuilder := listBuilder.ValueBuilder().(*array.StringBuilder) appendValues(valueBuilder) } func appendNullableStringList(builder array.Builder, appendValues func(*array.StringBuilder)) { listBuilder := builder.(*array.ListBuilder) listBuilder.Append(true) valueBuilder := listBuilder.ValueBuilder().(*array.StringBuilder) appendValues(valueBuilder) } func appendNullTopLevel(builder array.Builder) { switch typed := builder.(type) { case *array.StringBuilder: typed.AppendNull() case *array.Int32Builder: typed.AppendNull() case *array.ListBuilder: typed.AppendNull() default: panic(fmt.Sprintf("unsupported top-level builder type %T", builder)) } } func flattenGraphRouteLines(routeSegments []RouteSegment) graphArrowRouteLines { lineCount := 0 for _, route := range routeSegments { if len(route.Points) > 1 { lineCount += len(route.Points) - 1 } } out := graphArrowRouteLines{ X1: make([]float32, 0, lineCount), Y1: make([]float32, 0, lineCount), X2: make([]float32, 0, lineCount), Y2: make([]float32, 0, lineCount), } for _, route := range routeSegments { for i := 1; i < len(route.Points); i++ { prev := route.Points[i-1] curr := route.Points[i] out.X1 = append(out.X1, float32(prev.X)) out.Y1 = append(out.Y1, float32(prev.Y)) out.X2 = append(out.X2, float32(curr.X)) out.Y2 = append(out.Y2, float32(curr.Y)) } } return out }