from __future__ import annotations

import math
from collections import deque
from typing import Iterable, Sequence


class CycleError(RuntimeError):
    """
    Raised when the requested layout requires a DAG, but a cycle is detected.

    Attributes:
        processed: Number of nodes Kahn's algorithm managed to emit.
        total: Total number of nodes in the graph.
        remaining_node_ids: Node ids that still had indegree > 0 after Kahn.
            The list is stored as passed in (not copied).
        remaining_iri_sample: Optional sample of labels for the remaining
            nodes; when non-empty it is appended to the error message.
    """

    def __init__(
        self,
        *,
        processed: int,
        total: int,
        remaining_node_ids: list[int],
        remaining_iri_sample: list[str] | None = None,
    ) -> None:
        self.processed = int(processed)
        self.total = int(total)
        self.remaining_node_ids = remaining_node_ids
        self.remaining_iri_sample = remaining_iri_sample

        msg = (
            "Cycle detected in subClassOf graph "
            f"(processed {self.processed}/{self.total} nodes)."
        )
        if remaining_iri_sample:
            msg += f" Example nodes: {', '.join(remaining_iri_sample)}"
        super().__init__(msg)


def level_synchronous_kahn_layers(
    *,
    node_count: int,
    edges: Iterable[tuple[int, int]],
) -> list[list[int]]:
    """
    Split a DAG into layers with a level-synchronous Kahn's algorithm.

    The whole current frontier is emitted as one layer; only afterwards are
    the nodes it unlocked (indegree dropped to 0) collected into the next
    frontier.

    Args:
        node_count: Number of nodes; valid ids are ``0 .. node_count - 1``.
        edges: Directed edges ``(u, v)`` meaning ``u -> v``. Self-loops and
            edges with an endpoint outside the valid id range are ignored.
            Duplicate edges are counted consistently and do not change the
            result.

    Returns:
        The layers in processing order. The first layer lists the
        zero-indegree nodes in ascending id order; later layers list nodes in
        the order they were unlocked. Empty when ``node_count <= 0``.

    Raises:
        CycleError: If some nodes never reach indegree 0, i.e. the retained
            edges contain a cycle.
    """
    n = int(node_count)
    if n <= 0:
        return []

    successors: list[list[int]] = [[] for _ in range(n)]
    indegree = [0] * n
    for u, v in edges:
        if u == v:
            # Self-loops don't help layout and would trivially violate DAG-ness.
            continue
        if not (0 <= u < n and 0 <= v < n):
            continue
        successors[u].append(v)
        indegree[v] += 1

    # A plain list per layer is enough: each frontier is consumed wholesale,
    # so no queue popping (and no per-layer copy) is needed.
    frontier = [node for node, deg in enumerate(indegree) if deg == 0]
    layers: list[list[int]] = []
    processed = 0

    while frontier:
        layers.append(frontier)
        processed += len(frontier)

        unlocked: list[int] = []
        for u in frontier:
            for v in successors[u]:
                indegree[v] -= 1
                if indegree[v] == 0:
                    unlocked.append(v)
        frontier = unlocked

    if processed != n:
        remaining = [node for node, deg in enumerate(indegree) if deg > 0]
        raise CycleError(processed=processed, total=n, remaining_node_ids=remaining)

    return layers


def radial_positions_from_layers(
    *,
    node_count: int,
    layers: Sequence[Sequence[int]],
    max_r: float = 5000.0,
) -> tuple[list[float], list[float]]:
    """
    Assign node positions in concentric rings (one ring per layer).

    - radius increases with layer index: ring ``li`` (0-based) has radius
      ``(li + 1) / (len(layers) + 1) * max_r``
    - nodes within a layer are placed evenly by angle
    - each ring gets a "golden-angle" rotation to reduce spoke artifacts

    Args:
        node_count: Number of nodes; sizes the returned coordinate lists.
        layers: Node ids grouped per ring. Ids outside
            ``0 .. node_count - 1`` are skipped but still occupy their
            angular slot in the ring.
        max_r: Radius scale; every ring radius is strictly below it.

    Returns:
        ``(xs, ys)``, each of length ``node_count``. Nodes that appear in no
        layer stay at ``(0.0, 0.0)``. Both lists are empty when
        ``node_count <= 0``.
    """
    n = int(node_count)
    if n <= 0:
        return ([], [])

    xs = [0.0] * n
    ys = [0.0] * n
    if not layers:
        return (xs, ys)

    two_pi = 2.0 * math.pi
    golden = math.pi * (3.0 - math.sqrt(5.0))
    denom = float(len(layers) + 1)

    for li, layer in enumerate(layers):
        m = len(layer)
        if m <= 0:
            continue

        # Keep everything within ~[-max_r, max_r] like the previous spiral layout.
        r = ((li + 1) / denom) * max_r

        # Rotate each layer deterministically to avoid radial spokes aligning.
        offset = (li * golden) % two_pi

        # A single-node ring needs no special case: its only slot is j == 0,
        # so the angle is exactly `offset`.
        step = two_pi / float(m)
        for j, raw_id in enumerate(layer):
            nid = int(raw_id)
            if not (0 <= nid < n):
                continue
            t = offset + step * float(j)
            xs[nid] = r * math.cos(t)
            ys[nid] = r * math.sin(t)

    return (xs, ys)