#!/usr/bin/env -S npx tsx
/**
 * Fetch RDF Data from AnzoGraph DB
 *
 * 0. Wait for the SPARQL endpoint to answer, then LOAD the TTL data file
 * 1. Query the first 1000 distinct subject URIs
 * 2. Fetch all triples where those URIs appear as subject or object
 * 3. Identify primary nodes (objects of rdf:type)
 * 4. Write primary_edges.csv, secondary_edges.csv, and uri_map.csv
 *
 * Usage: npx tsx scripts/fetch_from_db.ts [--host http://localhost:8080]
 *
 * Note: the shebang uses `env -S` so that "npx tsx" is split into two
 * arguments when the script is executed directly.
 */

import { mkdirSync, writeFileSync } from "fs";
import { join, dirname } from "path";
import { fileURLToPath } from "url";

const __dirname = dirname(fileURLToPath(import.meta.url));
const PUBLIC_DIR = join(__dirname, "..", "public");

// ══════════════════════════════════════════════════════════
// Configuration
// ══════════════════════════════════════════════════════════

const RDF_TYPE = "http://www.w3.org/1999/02/22-rdf-syntax-ns#type";
const SEED_LIMIT = 1000; // Number of distinct subject URIs used as seeds
const BATCH_SIZE = 100; // URIs per VALUES batch query
const MAX_RETRIES = 30; // Wait up to ~120s for AnzoGraph to start
const RETRY_DELAY_MS = 4000;
const QUERY_TIMEOUT_MS = 300_000; // 5 min timeout per query attempt
const QUERY_RETRY_DELAY_MS = 10_000; // Pause between transient query failures

// Path to TTL file inside the AnzoGraph container (mapped via docker-compose volume)
const DATA_FILE = process.env.SPARQL_DATA_FILE || "file:///opt/shared-files/vkg-materialized.ttl";

/**
 * Resolve the base URL of the SPARQL server.
 *
 * Precedence: `--host <url>` flag, then the SPARQL_HOST environment variable,
 * then http://localhost:8080. Trailing slashes are removed so that appending
 * "/sparql" never yields a double slash.
 */
function getEndpoint(): string {
  const hostIdx = process.argv.indexOf("--host");
  const flagValue = hostIdx !== -1 ? process.argv[hostIdx + 1] : undefined;
  // Inside Docker, use service name; otherwise localhost
  const host = flagValue || process.env.SPARQL_HOST || "http://localhost:8080";
  return host.replace(/\/+$/, "");
}

const SPARQL_ENDPOINT = `${getEndpoint()}/sparql`;

// Auth credentials (AnzoGraph defaults)
const SPARQL_USER = process.env.SPARQL_USER || "admin";
const SPARQL_PASS = process.env.SPARQL_PASS || "Passw0rd1";
const AUTH_HEADER = "Basic " + Buffer.from(`${SPARQL_USER}:${SPARQL_PASS}`).toString("base64");

// ══════════════════════════════════════════════════════════
// SPARQL helpers
// ══════════════════════════════════════════════════════════

/** One result row of a SPARQL JSON response: variable name -> bound term. */
interface SparqlBinding {
  [key: string]: { type: string; value: string };
}

/** Resolve after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => setTimeout(resolve, ms));
}

/** Turn any thrown value into a printable message. */
function errorMessage(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

/** True when the error message looks like a network failure or an aborted/timed-out request. */
function isTransientError(msg: string): boolean {
  return (
    msg.includes("fetch failed") ||
    msg.includes("Timeout") ||
    msg.includes("ABORT") ||
    msg.includes("abort")
  );
}

/**
 * Parse a SPARQL JSON results document and return its `results.bindings` array.
 *
 * @throws SyntaxError if `text` is not JSON; Error if the bindings array is missing.
 */
function parseBindings(text: string): SparqlBinding[] {
  const json: unknown = JSON.parse(text);
  const bindings = (json as { results?: { bindings?: unknown } } | null)?.results?.bindings;
  if (!Array.isArray(bindings)) {
    throw new Error(`SPARQL response has no results.bindings array: ${text.substring(0, 200)}`);
  }
  return bindings as SparqlBinding[];
}

/**
 * Run a SPARQL SELECT query and return the result bindings.
 *
 * Each attempt is aborted after QUERY_TIMEOUT_MS. Transient failures (network
 * errors, aborts/timeouts) are retried after a fixed pause; any other error,
 * including a non-2xx HTTP status, is thrown immediately.
 *
 * @param query   SPARQL query text
 * @param retries maximum number of attempts
 * @returns the `results.bindings` rows of the response
 * @throws Error on HTTP failure, malformed response, or when retries are exhausted
 */
async function sparqlQuery(query: string, retries = 5): Promise<SparqlBinding[]> {
  for (let attempt = 1; attempt <= retries; attempt++) {
    const controller = new AbortController();
    const timeout = setTimeout(() => controller.abort(), QUERY_TIMEOUT_MS);
    try {
      const t0 = performance.now();
      const response = await fetch(SPARQL_ENDPOINT, {
        method: "POST",
        headers: {
          "Content-Type": "application/x-www-form-urlencoded",
          "Accept": "application/sparql-results+json",
          "Authorization": AUTH_HEADER,
        },
        body: "query=" + encodeURIComponent(query),
        signal: controller.signal,
      });
      const t1 = performance.now();
      console.log(` [sparql] response status=${response.status} in ${((t1 - t0) / 1000).toFixed(1)}s`);

      const text = await response.text();
      if (!response.ok) {
        throw new Error(`SPARQL query failed (${response.status}): ${text}`);
      }
      const t2 = performance.now();
      console.log(` [sparql] body read (${(text.length / 1024).toFixed(0)} KB) in ${((t2 - t1) / 1000).toFixed(1)}s`);

      return parseBindings(text);
    } catch (err: unknown) {
      // Stop the abort timer now so it cannot fire during the retry pause.
      clearTimeout(timeout);
      const msg = errorMessage(err);
      if (isTransientError(msg) && attempt < retries) {
        console.log(` [sparql] transient error (attempt ${attempt}/${retries}): ${msg.substring(0, 100)}`);
        console.log(` [sparql] retrying in 10s (AnzoGraph may still be indexing after LOAD)...`);
        await sleep(QUERY_RETRY_DELAY_MS);
        continue;
      }
      throw err;
    } finally {
      clearTimeout(timeout);
    }
  }
  // Only reachable when `retries` is less than 1.
  throw new Error("sparqlQuery: should not reach here");
}

/**
 * Poll the endpoint with a trivial ASK query until it answers with a
 * successful HTTP status and a JSON body.
 *
 * @throws Error after MAX_RETRIES unsuccessful attempts
 */
async function waitForAnzoGraph(): Promise<void> {
  console.log(`Waiting for AnzoGraph at ${SPARQL_ENDPOINT}...`);
  for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) {
    try {
      const response = await fetch(SPARQL_ENDPOINT, {
        method: "POST",
        headers: {
          "Content-Type": "application/x-www-form-urlencoded",
          "Accept": "application/sparql-results+json",
          "Authorization": AUTH_HEADER,
        },
        body: "query=" + encodeURIComponent("ASK WHERE { ?s ?p ?o }"),
      });
      const text = await response.text();
      // An error status with a JSON body must not count as "ready".
      if (!response.ok) {
        throw new Error(`HTTP ${response.status}: ${text}`);
      }
      // Verify it's actual JSON (not a plain-text error from a half-started engine)
      JSON.parse(text);
      console.log(` AnzoGraph is ready (attempt ${attempt})`);
      return;
    } catch (err: unknown) {
      const msg = errorMessage(err);
      console.log(` Attempt ${attempt}/${MAX_RETRIES}: ${msg.substring(0, 100)}`);
      if (attempt < MAX_RETRIES) {
        await sleep(RETRY_DELAY_MS);
      }
    }
  }
  throw new Error(`AnzoGraph not available after ${MAX_RETRIES} attempts`);
}

/**
 * Send a SPARQL update request and return the raw response body.
 *
 * @throws Error when the server answers with a non-2xx status
 */
async function sparqlUpdate(update: string): Promise<string> {
  const response = await fetch(SPARQL_ENDPOINT, {
    method: "POST",
    headers: {
      "Content-Type": "application/sparql-update",
      "Accept": "application/json",
      "Authorization": AUTH_HEADER,
    },
    body: update,
  });
  const text = await response.text();
  if (!response.ok) {
    throw new Error(`SPARQL update failed (${response.status}): ${text}`);
  }
  return text;
}

/** Issue `LOAD <DATA_FILE>` against the endpoint and log how long it took. */
async function loadData(): Promise<void> {
  console.log(`Loading data from ${DATA_FILE}...`);
  const t0 = performance.now();
  const result = await sparqlUpdate(`LOAD <${DATA_FILE}>`);
  const elapsed = ((performance.now() - t0) / 1000).toFixed(1);
  console.log(` Load complete in ${elapsed}s: ${result.substring(0, 200)}`);
}

// ══════════════════════════════════════════════════════════
// Step 1: Fetch seed URIs
// ══════════════════════════════════════════════════════════

/**
 * Fetch up to SEED_LIMIT distinct subject IRIs.
 *
 * Blank-node subjects are excluded (in the query and again client-side)
 * because they cannot be written as `<...>` IRIs in the follow-up queries.
 */
async function fetchSeedURIs(): Promise<string[]> {
  console.log(`Querying first ${SEED_LIMIT} distinct subject URIs...`);
  const t0 = performance.now();
  const query = `
    SELECT DISTINCT ?s
    WHERE { ?s ?p ?o FILTER(isIRI(?s)) }
    LIMIT ${SEED_LIMIT}
  `;
  const bindings = await sparqlQuery(query);
  const elapsed = ((performance.now() - t0) / 1000).toFixed(1);

  const uris: string[] = [];
  for (const b of bindings) {
    const subject = b.s;
    if (subject?.type === "uri") {
      uris.push(subject.value);
    }
  }
  console.log(` Got ${uris.length} seed URIs in ${elapsed}s`);
  return uris;
}

// ══════════════════════════════════════════════════════════
// Step 2: Fetch all triples involving seed URIs
// ══════════════════════════════════════════════════════════

interface Triple {
  s: string;
  p: string;
  o: string;
  oType: string; // "uri" or "literal"
}

// Characters that may not appear inside a SPARQL IRIREF (`<...>`).
const ILLEGAL_IRI_CHARS = /[\u0000-\u0020<>"{}|^`\\]/;

/**
 * Fetch every triple in which one of the seed URIs is the subject or the
 * object, querying BATCH_SIZE URIs at a time via a VALUES clause.
 *
 * Seeds containing characters that are illegal inside `<...>` are skipped
 * (with a warning) instead of producing a malformed query.
 */
async function fetchTriples(seedURIs: string[]): Promise<Triple[]> {
  console.log(`Fetching triples for ${seedURIs.length} seed URIs (batch size: ${BATCH_SIZE})...`);

  const usableURIs = seedURIs.filter((u) => !ILLEGAL_IRI_CHARS.test(u));
  if (usableURIs.length !== seedURIs.length) {
    console.warn(` Skipping ${seedURIs.length - usableURIs.length} seed URIs with characters not allowed in an IRI`);
  }

  const allTriples: Triple[] = [];
  const totalBatches = Math.ceil(usableURIs.length / BATCH_SIZE);

  for (let i = 0; i < usableURIs.length; i += BATCH_SIZE) {
    const batch = usableURIs.slice(i, i + BATCH_SIZE);
    const valuesClause = batch.map((u) => `<${u}>`).join(" ");
    const query = `
      SELECT ?s ?p ?o WHERE {
        VALUES ?uri { ${valuesClause} }
        { ?uri ?p ?o . BIND(?uri AS ?s) }
        UNION
        { ?s ?p ?uri . BIND(?uri AS ?o) }
      }
    `;
    const bindings = await sparqlQuery(query);
    for (const b of bindings) {
      const { s, p, o } = b;
      // Ignore rows that do not bind all three variables.
      if (!s || !p || !o) continue;
      allTriples.push({ s: s.value, p: p.value, o: o.value, oType: o.type });
    }
    const progress = Math.min(i + BATCH_SIZE, usableURIs.length);
    process.stdout.write(`\r Fetched triples: batch ${Math.ceil(progress / BATCH_SIZE)}/${totalBatches} (${allTriples.length} triples so far)`);
  }

  console.log(`\n Total triples: ${allTriples.length}`);
  return allTriples;
}

// ══════════════════════════════════════════════════════════
// Step 3: Build graph data
// ══════════════════════════════════════════════════════════

interface GraphData {
  nodeURIs: string[]; // All unique URIs (subjects & objects that are URIs)
  uriToId: Map<string, number>;
  primaryNodeIds: Set<number>; // Nodes that are objects of rdf:type
  edges: Array<[number, number]>; // [source, target] as numeric IDs
  primaryEdges: Array<[number, number]>;
  secondaryEdges: Array<[number, number]>;
}

/**
 * Turn raw triples into an ID-based graph.
 *
 * Node IDs are the indices of the sorted unique node list. Edges are
 * deduplicated, exclude self-loops and literal objects, and are split into
 * "primary" (at least one endpoint is an rdf:type object) and "secondary".
 */
function buildGraphData(triples: Triple[]): GraphData {
  console.log("Building graph data...");

  // Collect all unique URI nodes (skip literal objects)
  const uriSet = new Set<string>();
  for (const t of triples) {
    uriSet.add(t.s);
    if (t.oType === "uri") {
      uriSet.add(t.o);
    }
  }

  // Assign numeric IDs
  const nodeURIs = Array.from(uriSet).sort();
  const uriToId = new Map<string, number>();
  nodeURIs.forEach((uri, idx) => uriToId.set(uri, idx));

  // Identify primary nodes: objects of rdf:type triples
  const primaryNodeIds = new Set<number>();
  for (const t of triples) {
    if (t.p !== RDF_TYPE || t.oType !== "uri") continue;
    const objId = uriToId.get(t.o);
    if (objId !== undefined) {
      primaryNodeIds.add(objId);
    }
  }

  // Build edges (only between URI nodes, skip literal objects)
  const edgeSet = new Set<string>();
  const edges: Array<[number, number]> = [];
  for (const t of triples) {
    if (t.oType !== "uri") continue;
    const srcId = uriToId.get(t.s);
    const dstId = uriToId.get(t.o);
    if (srcId === undefined || dstId === undefined) continue;
    if (srcId === dstId) continue; // Skip self-loops
    const key = `${srcId},${dstId}`;
    if (edgeSet.has(key)) continue; // Deduplicate
    edgeSet.add(key);
    edges.push([srcId, dstId]);
  }

  // Classify edges into primary (touches a primary node) and secondary
  const primaryEdges: Array<[number, number]> = [];
  const secondaryEdges: Array<[number, number]> = [];
  for (const [src, dst] of edges) {
    const bucket = primaryNodeIds.has(src) || primaryNodeIds.has(dst) ? primaryEdges : secondaryEdges;
    bucket.push([src, dst]);
  }

  console.log(` Nodes: ${nodeURIs.length}`);
  console.log(` Primary nodes (rdf:type objects): ${primaryNodeIds.size}`);
  console.log(` Edges: ${edges.length} (primary: ${primaryEdges.length}, secondary: ${secondaryEdges.length})`);

  return { nodeURIs, uriToId, primaryNodeIds, edges, primaryEdges, secondaryEdges };
}

// ══════════════════════════════════════════════════════════
// Step 4: Write CSV files
// ══════════════════════════════════════════════════════════

/**
 * Extract the local name of a URI: the part after the last "#", or else
 * after the last "/". Falls back to the full URI when there is no separator
 * or when the local name would be empty (URI ends in "#" or "/").
 */
function extractLabel(uri: string): string {
  const hashIdx = uri.lastIndexOf("#");
  const sepIdx = hashIdx !== -1 ? hashIdx : uri.lastIndexOf("/");
  if (sepIdx === -1) return uri;
  const local = uri.substring(sepIdx + 1);
  return local.length > 0 ? local : uri;
}

/**
 * Quote a CSV field when it contains a comma, double quote, or line break,
 * doubling any embedded double quotes.
 */
function csvField(value: string): string {
  return /[",\r\n]/.test(value) ? `"${value.replace(/"/g, '""')}"` : value;
}

/** Write a "source,target" edge list to `fileName` inside PUBLIC_DIR. */
function writeEdgeCSV(fileName: string, kind: string, edges: Array<[number, number]>): void {
  const lines = ["source,target", ...edges.map(([src, dst]) => `${src},${dst}`)];
  const outPath = join(PUBLIC_DIR, fileName);
  writeFileSync(outPath, lines.join("\n") + "\n");
  console.log(`Wrote ${edges.length} ${kind} edges to ${outPath}`);
}

/**
 * Write primary_edges.csv, secondary_edges.csv and uri_map.csv into
 * PUBLIC_DIR, creating the directory if it does not exist yet.
 */
function writeCSVs(data: GraphData): void {
  mkdirSync(PUBLIC_DIR, { recursive: true });

  writeEdgeCSV("primary_edges.csv", "primary", data.primaryEdges);
  writeEdgeCSV("secondary_edges.csv", "secondary", data.secondaryEdges);

  // Write uri_map.csv (id,uri,label,isPrimary)
  const uLines = ["id,uri,label,isPrimary"];
  data.nodeURIs.forEach((uri, id) => {
    const isPrimary = data.primaryNodeIds.has(id) ? "1" : "0";
    uLines.push(`${id},${csvField(uri)},${csvField(extractLabel(uri))},${isPrimary}`);
  });
  const uPath = join(PUBLIC_DIR, "uri_map.csv");
  writeFileSync(uPath, uLines.join("\n") + "\n");
  console.log(`Wrote ${data.nodeURIs.length} URI mappings to ${uPath}`);
}

// ══════════════════════════════════════════════════════════
// Main
// ══════════════════════════════════════════════════════════

/** Run the full pipeline: wait, load, smoke-test, fetch, build, write. */
async function main(): Promise<void> {
  console.log(`SPARQL endpoint: ${SPARQL_ENDPOINT}`);
  const t0 = performance.now();

  await waitForAnzoGraph();
  await loadData();

  // Smoke test: simplest possible query to verify connectivity
  console.log("Smoke test: SELECT ?s ?p ?o LIMIT 3...");
  const smokeT0 = performance.now();
  const smokeResult = await sparqlQuery("SELECT ?s ?p ?o WHERE { ?s ?p ?o } LIMIT 3");
  const smokeElapsed = ((performance.now() - smokeT0) / 1000).toFixed(1);
  console.log(` Smoke test OK: ${smokeResult.length} results in ${smokeElapsed}s`);
  const first = smokeResult[0];
  if (first) {
    console.log(` First triple: ${first.s?.value} ${first.p?.value} ${first.o?.value}`);
  }

  const seedURIs = await fetchSeedURIs();
  const triples = await fetchTriples(seedURIs);
  const graphData = buildGraphData(triples);
  writeCSVs(graphData);

  const elapsed = ((performance.now() - t0) / 1000).toFixed(1);
  console.log(`\nDone in ${elapsed}s`);
}

main().catch((err: unknown) => {
  console.error("Fatal error:", err);
  process.exit(1);
});