"""
Routing primitives backed by `scipy.sparse.csgraph.dijkstra`. Input graphs
are `networkx.Graph` (or its multi/directed variants); aperta converts to a
scipy CSR matrix internally per call. No backend choice — scipy throughout.
The library exposes two concerns separately:
1. **Edge weighting** — `apply_edge_weights` runs a user-supplied callable on each
edge of a graph and writes the result to a named edge attribute. Mode-specific
behavior (car vs. bike vs. walking, peak vs. off-peak, density-adjusted speeds,
intersection penalties, etc.) lives in these callables, not in this module.
`combine_edge_weights` sums multiple per-edge components into a single routing
weight (e.g. edge travel time + intersection penalty -> total cost).
`mask_excluded_edges` wraps a weight callable to return cost = ∞ for edges
flagged by `prepare_network` as non-traversable for a given mode (e.g.
motorways for walking).
2. **Routing primitives** — covering the common query shapes:
- `shortest_distances_from`: single source → all reachable nodes (with optional
cutoff). Used for accessibility / isochrone calculations.
- `shortest_distances_pairwise`: full distance matrix between two node lists.
- `shortest_path_metrics_one_to_one`: paired (origin, destination) routing
that also aggregates edge features along each path. Used for travel-time
model calibration and similar trip-by-trip work.
- `tiered_path_costs` / `tiered_path_aggregate`: bulk many-to-many routing
across the three-tier OD structure.
Why scipy-only: aperta's routing workflow is one-shot Dijkstra-from-many-origins
on a *live* graph whose edge weights are routinely mutated (calibration loop,
scenario comparison, time-of-day variants). Contraction hierarchies (Pandana,
OSRM) preprocess once and reuse, which fights this workflow. The historical
networkx / igraph backends were dropped in favour of scipy CSR + scipy
`csgraph.dijkstra`, which is competitive in raw speed, zero-preprocessing,
and has a single code path.
A future `RoutingProfile` class will bundle the duration callable + parameters +
graph into one object; for now, callers compose the pieces themselves.
"""
import logging
from collections.abc import Sequence
from typing import Callable, NamedTuple
import networkx as nx
import numpy as np
import pandas as pd
from aperta.errors import DataError
from aperta.od_pairs import TieredODNodePairs, TieredODPairs
# ---------------------------------------------------------------------------
# Edge weighting
# ---------------------------------------------------------------------------
[docs]
def apply_edge_weights(graph: nx.Graph, weight_fn: Callable, weight_name: str, **fn_kwargs) -> None:
"""Apply `weight_fn` to each edge of `graph` (mutates `graph` in place).
`weight_fn` receives the edge data dict plus any extra `fn_kwargs`. The
dict supports `row['key']` access just like a pandas Series, so callables
written against a GeoDataFrame edge-row pattern work without modification.
"""
if isinstance(graph, (nx.MultiGraph, nx.MultiDiGraph)):
for _u, _v, _k, data in graph.edges(keys=True, data=True):
data[weight_name] = weight_fn(data, **fn_kwargs)
else:
for _u, _v, data in graph.edges(data=True):
data[weight_name] = weight_fn(data, **fn_kwargs)
[docs]
def mask_excluded_edges(weight_fn: Callable, cost_excluded_flag: str) -> Callable:
"""Wrap `weight_fn` so edges flagged `cost_excluded_flag=True` get cost = ∞.
Companion to `routing_prep.prepare_network`, which writes the per-edge
boolean flag based on a mode's `cost_excluded_tags`. The flag name is
recorded on `PreparedGraph.cost_excluded_flag`; pass it here to bake
the mode-specific edge exclusion into a routing-weight callable.
Typical use:
prepared = prepare_network(graph, "walk")
weight_fn = mask_excluded_edges(duration_walk_fn, prepared.cost_excluded_flag)
apply_edge_weights(prepared.graph, weight_fn, "duration_walk_s")
Edges with the flag missing or `False` flow through to `weight_fn`
unchanged.
"""
def masked(edge_data, **kwargs):
if edge_data.get(cost_excluded_flag, False):
return float("inf")
return weight_fn(edge_data, **kwargs)
return masked
[docs]
def combine_edge_weights(graph: nx.Graph, source_names: list[str], target_name: str) -> None:
"""Sum multiple per-edge attributes into one combined weight (in place).
Use case (lumos pattern): travel time and intersection penalty are computed
separately, stored as `duration_edge_t3` and `duration_node_t3`, then summed
into the routing weight `duration_t3`.
"""
if isinstance(graph, (nx.MultiGraph, nx.MultiDiGraph)):
for _u, _v, _k, data in graph.edges(keys=True, data=True):
data[target_name] = sum(float(data[name]) for name in source_names)
else:
for _u, _v, data in graph.edges(data=True):
data[target_name] = sum(float(data[name]) for name in source_names)
# ---------------------------------------------------------------------------
# Cutoff semantics — opt-in via `cutoff=` on tiered routing.
#
# When the caller passes `cutoff=T` to `tiered_path_costs` /
# `tiered_path_aggregate`, the per-origin Dijkstra is run via
# scipy.sparse.csgraph.dijkstra with `limit=T`. This truncates the
# Dijkstra at network distance T (in weight units): destinations beyond
# T return `inf` cost / `nan` aggregations, and the inner loop exits
# early instead of exploring the full settled-set. The speed-up grows
# as T shrinks relative to graph diameter (most pronounced for walk on
# country-scale networks with short cumulative-opportunity bins, where
# T can be 1-2 orders of magnitude below diameter).
# ---------------------------------------------------------------------------
def _graph_to_csr(graph: nx.Graph, weight: str, return_parallel_keys: bool = False):
"""Build a scipy CSR matrix from `graph` using `weight` as edge cost.
For MultiGraph / MultiDiGraph parallels, keeps the minimum-weight edge
per (u, v) — matches igraph's `distances()` choice.
Undirected graphs (`nx.Graph` / `nx.MultiGraph`) are emitted as symmetric
CSR so callers can use scipy's default `directed=True` dijkstra. For
parallel-edge multigraphs the per-direction min is taken independently,
so an undirected MultiGraph with one parallel weighted 3 in (u, v) and
another weighted 5 still gets `csr[u, v] = csr[v, u] = 3` (the user's
intent for "undirected" is symmetric).
Args:
graph: networkx graph (any variant).
weight: edge attribute name used as the per-edge routing cost.
return_parallel_keys: when `True`, also return a `parallel_keys`
dict mapping `(u_seq, v_seq) -> nx_edge_key` for MultiGraph
inputs (the key of the parallel edge whose weight was kept).
For non-Multi graphs the dict is empty. Use this when the
caller needs to attribute path edges back to specific
MultiGraph edge IDs (e.g. edge betweenness on a MultiDiGraph
where output keys are `(u, v, k)` triples).
Returns:
`(csr, nx_to_seq, seq_to_nx)` by default, or
`(csr, nx_to_seq, seq_to_nx, parallel_keys)` when
`return_parallel_keys=True`.
"""
import scipy.sparse
node_ids = list(graph.nodes())
nx_to_seq = {n: i for i, n in enumerate(node_ids)}
seq_to_nx = np.array(node_ids, dtype=object)
is_multi = isinstance(graph, (nx.MultiGraph, nx.MultiDiGraph))
is_directed = graph.is_directed()
track_keys = return_parallel_keys and is_multi
min_weight: dict[tuple[int, int], float] = {}
parallel_keys: dict[tuple[int, int], object] = {} if track_keys else {}
def _update(key_uv: tuple[int, int], w: float, k=None) -> None:
if key_uv not in min_weight or min_weight[key_uv] > w:
min_weight[key_uv] = w
if track_keys:
parallel_keys[key_uv] = k
if is_multi:
for u, v, k, data in graph.edges(keys=True, data=True):
w = float(data[weight])
ui, vi = nx_to_seq[u], nx_to_seq[v]
_update((ui, vi), w, k)
if not is_directed:
_update((vi, ui), w, k)
else:
for u, v, data in graph.edges(data=True):
w = float(data[weight])
ui, vi = nx_to_seq[u], nx_to_seq[v]
_update((ui, vi), w)
if not is_directed:
_update((vi, ui), w)
n = len(node_ids)
if min_weight:
rows = np.fromiter(
(u for u, _v in min_weight.keys()), dtype=np.int64, count=len(min_weight)
)
cols = np.fromiter(
(v for _u, v in min_weight.keys()), dtype=np.int64, count=len(min_weight)
)
data = np.fromiter(min_weight.values(), dtype=float, count=len(min_weight))
else:
rows = cols = np.empty(0, dtype=np.int64)
data = np.empty(0, dtype=float)
csr = scipy.sparse.csr_matrix((data, (rows, cols)), shape=(n, n), dtype=float)
if return_parallel_keys:
return csr, nx_to_seq, seq_to_nx, parallel_keys
return csr, nx_to_seq, seq_to_nx
def _walk_predecessors_to_path(
predecessors_row: np.ndarray, origin_seq: int, target_seq: int, seq_to_nx: np.ndarray
) -> list:
"""Reconstruct path (as list of nx node IDs) from scipy's predecessor row.
Returns `[]` if `target_seq` is unreachable (predecessor chain hits -9999
before reaching origin). Returns `[origin_id]` if target == origin.
"""
if target_seq == origin_seq:
return [seq_to_nx[origin_seq]]
path_seq = [target_seq]
while path_seq[-1] != origin_seq:
p = predecessors_row[path_seq[-1]]
if p < 0:
return [] # unreachable
path_seq.append(int(p))
path_seq.reverse()
return [seq_to_nx[s] for s in path_seq]
# ---------------------------------------------------------------------------
# Routing primitives — all backed by `scipy.sparse.csgraph.dijkstra`. The
# input is always an `nx.Graph` (or a multi/directed variant); aperta
# converts to a scipy CSR matrix internally via `_graph_to_csr`.
# ---------------------------------------------------------------------------
[docs]
def shortest_distances_from(
graph: nx.Graph,
origin,
weight: str,
cutoff: float | None = None,
) -> dict:
"""Single-source shortest distances from `origin` to all reachable nodes.
Returns a dict mapping node -> total weight. With `cutoff`, only nodes
within that weight threshold are returned. Unreachable nodes are omitted.
"""
import scipy.sparse.csgraph as csg
csr, nx_to_seq, seq_to_nx = _graph_to_csr(graph, weight)
limit = cutoff if cutoff is not None else np.inf
dist_row = csg.dijkstra(
csr, indices=[nx_to_seq[origin]], limit=limit, return_predecessors=False
)[0]
return {seq_to_nx[i]: float(d) for i, d in enumerate(dist_row) if np.isfinite(d)}
[docs]
def shortest_distances_pairwise(
graph: nx.Graph,
origins: list,
destinations: list,
weight: str,
cutoff: float | None = None,
) -> np.ndarray:
"""Distance matrix for `origins` x `destinations`.
Returns ndarray of shape (len(origins), len(destinations)). Unreachable
destinations are `np.inf`.
"""
import scipy.sparse.csgraph as csg
csr, nx_to_seq, _ = _graph_to_csr(graph, weight)
limit = cutoff if cutoff is not None else np.inf
origin_seqs = [nx_to_seq[o] for o in origins]
dest_seqs = np.fromiter(
(nx_to_seq[d] for d in destinations), dtype=np.int64, count=len(destinations)
)
dist = csg.dijkstra(csr, indices=origin_seqs, limit=limit, return_predecessors=False)
return dist[:, dest_seqs]
[docs]
def shortest_path_metrics_one_to_one(
graph: nx.Graph,
trip_ids: list | pd.Series | np.ndarray,
origins: list | pd.Series | np.ndarray,
destinations: list | pd.Series | np.ndarray,
weight: str,
length_attr: str = "length",
edge_features: dict[str, str] | None = None,
*,
cutoff: float | None = None,
) -> pd.DataFrame:
"""Paired (origin, destination) shortest-path routing with edge-feature aggregation.
`edge_features` maps an edge attribute name to an aggregation:
- 'sum' : element-wise sum along the path (e.g. count of intersections)
- 'length_weighted' : average weighted by edge length (e.g. average gradient)
Returns a DataFrame indexed by trip_id with columns:
- `distance` (sum of `length_attr` along the path)
- `cost` (sum of `weight` along the path)
- one column per requested edge feature
Trips with no path are omitted from the output (so output length <= input length).
`cutoff` (optional): per-origin Dijkstra `limit=` in `weight` units. Set
this to a value comfortably above the longest expected trip cost — trips
that would route beyond it are silently dropped (treated as unreachable),
same as actually-unreachable trips. Big speed-up on large graphs when
trip costs are small relative to graph diameter (e.g. urban trips on a
country-scale road graph). Default `None` = no cutoff.
Raises DataError if 'distance' or 'cost' appear in `edge_features` (would
collide with the built-in path-total columns).
"""
if not (len(trip_ids) == len(origins) == len(destinations)):
raise DataError("trip_ids, origins, and destinations must have equal lengths.")
edge_features = edge_features or {}
reserved = {"distance", "cost"} & set(edge_features)
if reserved:
raise DataError(f"edge_features may not include reserved column names: {sorted(reserved)}")
import scipy.sparse.csgraph as csg
csr, nx_to_seq, seq_to_nx = _graph_to_csr(graph, weight)
is_multi = isinstance(graph, (nx.MultiGraph, nx.MultiDiGraph))
limit = cutoff if cutoff is not None else np.inf
# Group trips by origin so we only call dijkstra once per unique origin
# — far cheaper than per-trip when many trips share an origin (typical
# of calibration data: per-person trip diaries).
by_origin: dict = {}
for trip_id, o, d in zip(trip_ids, origins, destinations):
by_origin.setdefault(o, []).append((trip_id, d))
rows = {}
for origin, trips in by_origin.items():
origin_seq = nx_to_seq[origin]
dist, pred = csg.dijkstra(csr, indices=[origin_seq], limit=limit, return_predecessors=True)
for trip_id, dest in trips:
target_seq = nx_to_seq[dest]
if not np.isfinite(dist[0, target_seq]):
continue
npath = _walk_predecessors_to_path(pred[0], origin_seq, target_seq, seq_to_nx)
if not npath:
continue
edge_data = [
_pick_min_weight_edge(graph, u, v, weight, is_multi)
for u, v in zip(npath[:-1], npath[1:])
]
lengths = np.array([ed.get(length_attr, 0.0) for ed in edge_data])
costs = np.array([ed[weight] for ed in edge_data])
row = {"distance": float(lengths.sum()), "cost": float(costs.sum())}
for feature, agg in edge_features.items():
values = np.array([ed.get(feature, 0.0) for ed in edge_data])
row[feature] = _aggregate(values, lengths, agg, feature)
rows[trip_id] = row
return pd.DataFrame.from_dict(rows, orient="index")
def _pick_min_weight_edge(graph: nx.Graph, u, v, weight: str, is_multi: bool) -> dict:
"""For a (multi)graph, return the edge data dict for the cheapest parallel edge."""
data = graph.get_edge_data(u, v)
if data is None:
raise DataError(f"No edge between {u} and {v}.")
if is_multi:
return min(data.values(), key=lambda d: d[weight])
return data
def _aggregate(values: np.ndarray, weights: np.ndarray, agg: str, feature: str) -> float:
"""Aggregate per-edge `values` along one realised path.
Supported aggregations: `'sum'` (plain sum) and `'length_weighted'`
(weighted average using `weights`, typically per-edge length). Raises
`DataError` for unknown `agg`, naming `feature` so the error points at
the offending route-feature specification.
"""
if agg == "sum":
return float(values.sum())
if agg == "length_weighted":
if weights.sum() == 0:
return float("nan")
return float(np.average(values, weights=weights))
raise DataError(f"Unknown aggregation `{agg}` for feature `{feature}`.")
# ---------------------------------------------------------------------------
# Tiered OD routing
# ---------------------------------------------------------------------------
[docs]
def tiered_path_costs(
pairs: TieredODPairs,
graph: nx.Graph,
weight: str,
*,
mask: TieredODPairs | None = None,
cutoff: float | None = None,
dtype: np.dtype | type = np.float32,
) -> TieredODPairs:
"""Shortest-path cost (sum of edge `weight` along the path) for every OD pair
in `pairs`, across all tiers.
Single-process. For the experimental multi-process variant see
`tiered_path_costs_mp`. This is the hot path for almost every aperta
application — the closure-based inner loop is on purpose, not a refactor
candidate (a module-level worker pattern adds per-origin dict lookups
that measurably slow down single-process routing).
Every tier is routed across the same `graph`. All node IDs referenced
anywhere in `pairs` — cell nodes (cells_to_cells keys + values), zone
nodes (cells_to_zones values, zones_to_zones keys + values) — must
therefore be present in `graph`.
Args:
pairs: TieredODPairs of destination IDs (typically from `od_pairs.get_pairs`).
graph: networkx routable graph containing every node referenced in
`pairs`. Converted internally to a scipy CSR matrix.
weight: edge attribute name used as the per-edge routing cost (e.g.
`'duration_naive'`, `'duration_traffic_iterative'`).
mask: optional boolean `TieredODPairs` (build via `od_pairs.make_mask`).
Destinations where the mask is `False` are skipped and stored as
`np.inf` in the output (same convention as unreachable). Output
arrays keep the same length as the input pairs (position-wise
alignment is preserved); use the mask itself to distinguish
"masked-out" from "unreachable" if you care. Missing origins or
missing tiers in the mask are treated as "no filter".
cutoff: optional network-distance cutoff in weight units (e.g. seconds
for time-weighted edges, metres for length-weighted). Passed through
to `scipy.sparse.csgraph.dijkstra` as `limit=cutoff`, truncating
each per-origin Dijkstra at the cutoff. Big speed-up when the
cutoff is small relative to graph diameter (e.g. walk accessibility
on a country-scale graph). Destinations beyond cutoff are stored
as `np.inf` — same convention as unreachable, so downstream metrics
(`cumulative_opportunities` etc.) handle them naturally. Default `None` = no
cutoff (`limit=np.inf`).
dtype: dtype of returned cost arrays (default `np.float32` — halves
memory + on-disk size vs `float64`, with seconds-resolution
precision more than sufficient for travel costs). Pass
`np.float64` if downstream arithmetic needs the extra range
(e.g. logsum with very small scale parameter).
Returns:
`TieredODPairs` of cost arrays paired position-wise with `pairs`. Each
unreachable, masked-out, or beyond-cutoff destination is stored as
`np.inf`.
"""
import scipy.sparse.csgraph as csg
csr, nx_to_seq, _seq_to_nx = _graph_to_csr(graph, weight)
zero_edge = csr.nnz == 0
limit = cutoff if cutoff is not None else np.inf
def _route_subset(orig, sub_dests):
origin_seq = nx_to_seq[orig]
dist_row = csg.dijkstra(csr, indices=[origin_seq], limit=limit, return_predecessors=False)[
0
]
seq_dests = np.fromiter(
(nx_to_seq[d] for d in sub_dests), dtype=np.int64, count=len(sub_dests)
)
return dist_row[seq_dests]
def _per_origin(orig, dests, dest_mask):
n = len(dests)
if n == 0:
return np.empty(0, dtype=dtype)
if zero_edge:
return np.array([0.0 if d == orig else np.inf for d in dests], dtype=dtype)
if dest_mask is None:
return _route_subset(orig, dests).astype(dtype, copy=False)
true_idx = np.where(dest_mask)[0]
out = np.full(n, np.inf, dtype=dtype)
if len(true_idx) > 0:
out[true_idx] = _route_subset(orig, dests[true_idx])
return out
logging.info(
f"tiered_path_costs: routing single-process "
f"(scipy, cutoff={'none' if cutoff is None else cutoff})..."
)
def _process(tier_name: str, tier: dict | None, mask_tier: dict | None) -> dict | None:
if tier is None:
return None
n = len(tier)
# Per-tier counter and progress step: long-distance tiers (zone-to-zone)
# typically take much longer per origin than cell-tier ones, so
# tracking a single global counter would compress the early-tier
# progress into one big jump and stretch the later tiers' updates.
log_every = max(1, n // 10)
out: dict = {}
for i, (orig, dests) in enumerate(tier.items(), start=1):
dest_mask = mask_tier.get(orig) if mask_tier is not None else None
out[orig] = _per_origin(orig, dests, dest_mask)
if i % log_every == 0 or i == n:
logging.info(f" {tier_name}: {i:,} of {n:,} origins routed")
return out
cells_mask = mask.cells_to_cells if mask is not None else None
c2z_mask = mask.cells_to_zones if mask is not None else None
zones_mask = mask.zones_to_zones if mask is not None else None
return TieredODNodePairs(
cells_to_cells=_process("cells_to_cells", pairs.cells_to_cells, cells_mask),
cells_to_zones=_process("cells_to_zones", pairs.cells_to_zones, c2z_mask),
zones_to_zones=_process("zones_to_zones", pairs.zones_to_zones, zones_mask),
)
[docs]
class PathAggregation(NamedTuple):
"""Named per-edge feature aggregation along realised shortest paths.
`name` labels the corresponding output column in `tiered_path_aggregate`'s
return dict. `attribute` extracts a per-edge value; `aggregator` combines
those values into one scalar per OD pair.
`attribute`:
- `str`: name of an edge attribute on the graph; the per-edge value is
`edge_data[attribute]`.
- `Callable[(u, v, data) -> float]`: arbitrary per-edge function.
`aggregator`:
- `'sum'`: sum across path edges (returns 0 for an empty path).
- `'mean'`: arithmetic mean (returns NaN for an empty path).
- `'min'`, `'max'`: respective extremes (NaN for an empty path).
- `Callable[(np.ndarray) -> float]`: arbitrary callable on the
per-edge value array.
"""
name: str
attribute: str | Callable
aggregator: str | Callable = "sum"
[docs]
class NodeAggregation(NamedTuple):
"""Named per-node feature aggregation along realised shortest paths.
Parallel to `PathAggregation` but for node attributes (e.g. counting
traffic signals encountered, or finding the highest-elevation node
along a route). The node sequence of a path is `[u₀, u₁, ..., uₙ]`;
`include_endpoints` controls whether the route's origin (u₀) and
destination (uₙ) nodes contribute.
`name`, `aggregator`: as in `PathAggregation`.
`attribute`:
- `str`: name of a node attribute on the graph; the per-node value
is `node_data[attribute]`.
- `Callable[(node, data) -> float]`: arbitrary per-node function.
`include_endpoints`:
- `True` (default): all `n+1` nodes contribute, including origin
and destination. Risk: endpoints shared across many routes get
amplified weight in cross-route counts.
- `False`: interior nodes only (`u₁ .. uₙ₋₁`). Self-pair `[u]` and
single-edge path `[u, v]` both yield an empty array → aggregator
empty-path semantics apply (`'sum'` → 0; `'mean'/'min'/'max'`
→ NaN).
"""
name: str
attribute: str | Callable
aggregator: str | Callable = "sum"
include_endpoints: bool = True
def _resolve_attribute(attr: str | Callable) -> Callable:
"""Normalise an edge `attribute` spec into `(u, v, data) -> value`."""
if isinstance(attr, str):
return lambda u, v, data: data[attr]
if callable(attr):
return attr
raise ValueError(f"`attribute` must be a string or callable, got {type(attr).__name__}.")
def _resolve_node_attribute(attr: str | Callable) -> Callable:
"""Normalise a node `attribute` spec into `(node, data) -> value`."""
if isinstance(attr, str):
return lambda node, data: data[attr]
if callable(attr):
return attr
raise ValueError(f"`attribute` must be a string or callable, got {type(attr).__name__}.")
def _resolve_aggregator(agg: str | Callable) -> Callable:
"""Normalise an `aggregator` spec into `(np.ndarray) -> float`.
Empty-path semantics: `'sum'` returns 0.0 (the additive identity);
`'mean'` / `'min'` / `'max'` return NaN.
"""
if agg == "sum":
return lambda arr: float(arr.sum()) if arr.size else 0.0
if agg == "mean":
return lambda arr: float(arr.mean()) if arr.size else np.nan
if agg == "min":
return lambda arr: float(arr.min()) if arr.size else np.nan
if agg == "max":
return lambda arr: float(arr.max()) if arr.size else np.nan
if callable(agg):
return agg
raise ValueError(
f"Unknown aggregator {agg!r}; expected 'sum', 'mean', 'min', 'max', or a callable."
)
[docs]
def aggregate_along_paths(
paths: list[list],
graph: nx.Graph,
weight: str,
*,
edge_aggregations: Sequence[PathAggregation] = (),
node_aggregations: Sequence[NodeAggregation] = (),
dtype: np.dtype | type = np.float32,
) -> tuple[np.ndarray, dict[str, np.ndarray]]:
"""Walk realised paths and aggregate per-edge / per-node features along each.
Pure path walker — no routing involved. Use this directly
when you already have a list of paths (Strava traces, prebuilt routes,
calibration targets, etc.). `tiered_path_aggregate` is the wrapper that
routes shortest paths on a `TieredODPairs` and scatters results back
into per-tier `TieredODPairs` outputs.
For each path:
- `cost` = sum of `weight` along the path's edges
- each `PathAggregation` reduces per-edge attribute values
- each `NodeAggregation` reduces per-node attribute values
`paths` semantics:
- `[]` → unreachable: cost=`inf`, all aggs=`NaN`
- `[u]` → self-pair: cost=0, edge aggs follow empty-array
semantics, node aggs follow each spec's
`include_endpoints` setting
- `[u, v, ...]` → multi-node path; cost + aggs walked normally
Args:
paths: list of node-id sequences (lists). Node IDs must match
`graph` keys.
graph: networkx graph used for edge / node attribute lookup. For
MultiGraph / MultiDiGraph the min-`weight` parallel edge is
used (matches the router's choice).
weight: edge attribute name used as the per-edge cost.
edge_aggregations: list of `PathAggregation` specs (per-edge).
node_aggregations: list of `NodeAggregation` specs (per-node).
At least one of `edge_aggregations` / `node_aggregations` must
be non-empty. Names must be unique across both lists.
dtype: dtype of returned arrays (default `np.float32`).
Returns:
`(costs, aggregations_by_name)`:
- `costs`: ndarray of shape `(len(paths),)`. `inf` for
unreachable; `0.0` for self-pairs.
- `aggregations_by_name`: dict `{name -> ndarray}` with one
entry per spec across both lists. Unreachable destinations
are `NaN`.
"""
if not edge_aggregations and not node_aggregations:
raise ValueError(
"At least one of `edge_aggregations` / `node_aggregations` must be "
"non-empty. For cost-only routing, use `tiered_path_costs` instead."
)
edge_aggregations = list(edge_aggregations)
node_aggregations = list(node_aggregations)
names = [a.name for a in edge_aggregations] + [a.name for a in node_aggregations]
if len(set(names)) != len(names):
raise ValueError(f"Aggregation names must be unique across edge + node specs; got {names}.")
# Precompute per-edge / per-node arrays once, then walk paths using
# numpy fancy-indexing. Big win when there are many paths and/or
# per-edge attribute extraction is a Python callable (e.g. derived
# quietness scores) — the callable runs once per graph edge instead
# of once per (path, edge) traversal.
edge_cache = _precompute_edge_arrays(graph, weight, edge_aggregations, dtype)
node_cache = (
_precompute_node_arrays(graph, node_aggregations, dtype) if node_aggregations else None
)
return _walk_paths_with_arrays(
paths,
edge_cache,
node_cache,
edge_aggregations,
node_aggregations,
dtype,
)
# ---------------------------------------------------------------------------
# Path-walking with precomputed per-edge / per-node arrays
# ---------------------------------------------------------------------------
def _precompute_edge_arrays(
graph: nx.Graph,
weight: str,
edge_aggregations: Sequence["PathAggregation"],
dtype: np.dtype | type,
) -> tuple[dict, np.ndarray, list[np.ndarray]]:
"""Build `(u, v) -> edge_index` map + per-edge weight array + per-spec
feature arrays. For MultiGraph / MultiDiGraph parallels, collapses to
the min-`weight` edge per `(u, v)` — matches the router's choice in
`_graph_to_csr`.
For undirected graphs, `edge_index` carries BOTH orientations of every
edge pointing at the same array slot, mirroring `_graph_to_csr`'s
symmetric emission. Without this, a reconstructed path that traverses
an edge v→u (originally iterated as u→v) would miss the index lookup
in `_walk_paths_with_arrays` and the whole path would be marked invalid
(cost=inf, aggregations=NaN) even though it routes fine.
Each per-edge attribute extractor runs *once* here (per graph edge),
not once per (path, edge) traversal. That's the optimisation that lets
callable attributes (e.g. derived per-edge quietness scores) stop
dominating the inner loop.
"""
is_multi = isinstance(graph, (nx.MultiGraph, nx.MultiDiGraph))
is_directed = graph.is_directed()
edge_attr_fns = [_resolve_attribute(a.attribute) for a in edge_aggregations]
chosen: dict = {} # (u, v) -> (weight, data)
if is_multi:
for u, v, _k, data in graph.edges(keys=True, data=True):
w = float(data.get(weight, np.inf))
prev = chosen.get((u, v))
if prev is None or prev[0] > w:
chosen[(u, v)] = (w, data)
else:
for u, v, data in graph.edges(data=True):
chosen[(u, v)] = (float(data.get(weight, np.inf)), data)
n_edges = len(chosen)
edge_index: dict = {}
weight_arr = np.empty(n_edges, dtype=dtype)
feat_arrs = [np.empty(n_edges, dtype=dtype) for _ in edge_aggregations]
for i, ((u, v), (w, data)) in enumerate(chosen.items()):
edge_index[(u, v)] = i
if not is_directed:
edge_index[(v, u)] = i # both orientations → same edge slot
weight_arr[i] = w
for j, attr_fn in enumerate(edge_attr_fns):
feat_arrs[j][i] = float(attr_fn(u, v, data))
return edge_index, weight_arr, feat_arrs
def _precompute_node_arrays(
graph: nx.Graph,
node_aggregations: Sequence["NodeAggregation"],
dtype: np.dtype | type,
) -> tuple[dict, list[np.ndarray]]:
"""Build `node_id -> node_index` map + per-spec node-feature arrays.
Each per-node attribute extractor runs once here, not once per
(path, node) traversal."""
node_attr_fns = [_resolve_node_attribute(a.attribute) for a in node_aggregations]
node_ids = list(graph.nodes())
node_index = {n: i for i, n in enumerate(node_ids)}
n_nodes = len(node_ids)
node_attr_arrs = [np.empty(n_nodes, dtype=dtype) for _ in node_aggregations]
for i, n in enumerate(node_ids):
data = graph.nodes[n]
for j, attr_fn in enumerate(node_attr_fns):
node_attr_arrs[j][i] = float(attr_fn(n, data))
return node_index, node_attr_arrs
def _walk_paths_with_arrays(
paths: list[list],
edge_cache: tuple[dict, np.ndarray, list[np.ndarray]],
node_cache: tuple[dict, list[np.ndarray]] | None,
edge_aggregations: Sequence["PathAggregation"],
node_aggregations: Sequence["NodeAggregation"],
dtype: np.dtype | type,
) -> tuple[np.ndarray, dict[str, np.ndarray]]:
"""Walk paths using precomputed arrays. The inner loop is a single
numpy fancy-index per path per spec, not a Python iteration over edges.
See `_precompute_edge_arrays` for cache shape."""
edge_index, weight_arr, feat_arrs = edge_cache
edge_agg_fns = [_resolve_aggregator(a.aggregator) for a in edge_aggregations]
if node_cache is not None:
node_index, node_attr_arrs = node_cache
node_agg_fns = [_resolve_aggregator(a.aggregator) for a in node_aggregations]
node_include_endpoints = [a.include_endpoints for a in node_aggregations]
else:
node_index = {}
node_attr_arrs = []
node_agg_fns = []
node_include_endpoints = []
n = len(paths)
n_edge = len(edge_aggregations)
n_node = len(node_aggregations)
costs = np.full(n, np.inf, dtype=dtype)
edge_out = [np.full(n, np.nan, dtype=dtype) for _ in range(n_edge)]
node_out = [np.full(n, np.nan, dtype=dtype) for _ in range(n_node)]
empty = np.empty(0, dtype=dtype)
for i, path in enumerate(paths):
if not path:
continue # unreachable: cost=inf, aggs=NaN (both preallocated)
n_edges = len(path) - 1
if n_edges == 0:
# Self-pair: cost=0, edge aggs follow empty-array semantics.
costs[i] = 0.0
for j, agg_fn in enumerate(edge_agg_fns):
edge_out[j][i] = float(agg_fn(empty))
else:
# Resolve every (u, v) pair to its edge index; bail to
# "unreachable" if any edge is missing from the graph
# (defensive — paths from the router should always be valid).
edge_idx = np.empty(n_edges, dtype=np.int64)
valid = True
for k in range(n_edges):
e = edge_index.get((path[k], path[k + 1]))
if e is None:
valid = False
break
edge_idx[k] = e
if not valid:
continue
sub_weights = weight_arr[edge_idx]
costs[i] = float(sub_weights.sum())
for j, agg_fn in enumerate(edge_agg_fns):
edge_out[j][i] = float(agg_fn(feat_arrs[j][edge_idx]))
if n_node:
for j in range(n_node):
nodes = path if node_include_endpoints[j] else path[1:-1]
if nodes:
node_idx = np.fromiter(
(node_index[n_] for n_ in nodes),
dtype=np.int64,
count=len(nodes),
)
node_vals = node_attr_arrs[j][node_idx]
else:
node_vals = empty
node_out[j][i] = float(node_agg_fns[j](node_vals))
aggs: dict[str, np.ndarray] = {}
for edge_spec, arr in zip(edge_aggregations, edge_out):
aggs[edge_spec.name] = arr
for node_spec, arr in zip(node_aggregations, node_out):
aggs[node_spec.name] = arr
return costs, aggs
[docs]
def tiered_path_aggregate(
pairs: TieredODPairs,
graph: nx.Graph,
weight: str,
*,
edge_aggregations: Sequence[PathAggregation] = (),
node_aggregations: Sequence[NodeAggregation] = (),
mask: TieredODPairs | None = None,
cutoff: float | None = None,
dtype: np.dtype | type = np.float32,
) -> tuple[TieredODNodePairs, dict[str, TieredODNodePairs]]:
"""Route shortest paths and aggregate per-edge / per-node features along each.
Wraps `aggregate_along_paths` with routing on every tier of `pairs`.
Memory cost matches `tiered_path_costs` for the cost component —
paths are processed per-origin and discarded.
For the cost-only case (no aggregations needed), use `tiered_path_costs`
directly: it can skip path retrieval (more expensive than distance
retrieval) and is faster.
Args:
pairs, graph, weight, mask, cutoff, dtype: as in `tiered_path_costs`.
Paths are retrieved via `scipy.sparse.csgraph.dijkstra(
return_predecessors=True)` and reconstructed by walking the
predecessor chain back from each target to the origin.
edge_aggregations: list of `PathAggregation` specs (per-edge).
node_aggregations: list of `NodeAggregation` specs (per-node).
At least one of the two must be non-empty. Names must be unique
across both lists.
Returns:
`(costs, aggregations_by_name)`:
- `costs`: `TieredODPairs` of routing costs (sum of `weight`
along the realised path). Same shape and conventions as
`tiered_path_costs`. Unreachable / masked-out / beyond-cutoff
destinations are `np.inf`.
- `aggregations_by_name`: `dict[name -> TieredODPairs]`. One
entry per spec (edge + node), keyed by spec name. Unreachable
/ masked-out / beyond-cutoff destinations are `np.nan` (not
`inf`, since aggregations may be signed or already use `inf`
semantics).
For OSMnx-style MultiDiGraphs with multiple parallel edges between the
same `(u, v)` pair, the edge with the lowest `weight` is used for both
cost computation and attribute extraction (matching the router's choice).
For self-pairs (origin == destination, path length 0): cost is 0.0,
edge aggregations follow each aggregator's empty-array semantics
(`'sum'` → 0.0; `'mean'`/`'min'`/`'max'` → NaN), node aggregations
depend on each spec's `include_endpoints` setting.
"""
if not edge_aggregations and not node_aggregations:
raise ValueError(
"At least one of `edge_aggregations` / `node_aggregations` must be "
"non-empty. For cost-only routing, use `tiered_path_costs` instead."
)
edge_aggregations = list(edge_aggregations)
node_aggregations = list(node_aggregations)
names = [a.name for a in edge_aggregations] + [a.name for a in node_aggregations]
if len(set(names)) != len(names):
raise ValueError(f"Aggregation names must be unique across edge + node specs; got {names}.")
# Per-origin path retrieval via scipy dijkstra. Path reconstruction
# walks the predecessor chain from each target back to the origin.
import scipy.sparse.csgraph as csg
csr, nx_to_seq, seq_to_nx = _graph_to_csr(graph, weight)
zero_edge = csr.nnz == 0
limit = cutoff if cutoff is not None else np.inf
# Precompute per-edge / per-node feature arrays ONCE — they're invariant
# across origins. Per-origin path walking then becomes numpy
# fancy-indexing instead of a Python loop over edges. For path-first
# workloads (utility, logsum, road stress) this is the dominant
# speed-up vs the pre-vectorisation per-origin path walker.
edge_cache = _precompute_edge_arrays(graph, weight, edge_aggregations, dtype)
node_cache = (
_precompute_node_arrays(graph, node_aggregations, dtype) if node_aggregations else None
)
def _paths(orig, sub_dests):
if zero_edge:
return [[orig] if d == orig else [] for d in sub_dests]
origin_seq = nx_to_seq[orig]
dist, pred = csg.dijkstra(csr, indices=[origin_seq], limit=limit, return_predecessors=True)
paths = []
for d in sub_dests:
target_seq = nx_to_seq[d]
if not np.isfinite(dist[0, target_seq]):
paths.append([]) # unreachable or beyond cutoff
else:
paths.append(_walk_predecessors_to_path(pred[0], origin_seq, target_seq, seq_to_nx))
return paths
def _per_origin(orig, dests, dest_mask):
n = len(dests)
cost_arr = np.full(n, np.inf, dtype=dtype)
agg_arrs = {name: np.full(n, np.nan, dtype=dtype) for name in names}
if n == 0:
return cost_arr, agg_arrs
if dest_mask is None:
active_idx = np.arange(n)
active_dests = dests
else:
active_idx = np.where(dest_mask)[0]
if len(active_idx) == 0:
return cost_arr, agg_arrs
active_dests = dests[active_idx]
paths = _paths(orig, active_dests)
sub_costs, sub_aggs = _walk_paths_with_arrays(
paths,
edge_cache,
node_cache,
edge_aggregations,
node_aggregations,
dtype,
)
cost_arr[active_idx] = sub_costs
for name in names:
agg_arrs[name][active_idx] = sub_aggs[name]
return cost_arr, agg_arrs
logging.info(
f"tiered_path_aggregate: routing (scipy, cutoff={'none' if cutoff is None else cutoff})..."
)
def _process(
tier_name: str, tier: dict | None, mask_tier: dict | None
) -> tuple[dict, dict[str, dict]] | None:
if tier is None:
return None
n = len(tier)
log_every = max(1, n // 10)
cost_out: dict = {}
agg_outs: dict[str, dict] = {name: {} for name in names}
for i, (orig, dests) in enumerate(tier.items(), start=1):
dest_mask = mask_tier.get(orig) if mask_tier is not None else None
cost_arr, agg_arrs = _per_origin(orig, dests, dest_mask)
cost_out[orig] = cost_arr
for name in names:
agg_outs[name][orig] = agg_arrs[name]
if i % log_every == 0 or i == n:
logging.info(f" {tier_name}: {i:,} of {n:,} origins routed")
return cost_out, agg_outs
cells_mask = mask.cells_to_cells if mask is not None else None
c2z_mask = mask.cells_to_zones if mask is not None else None
zones_mask = mask.zones_to_zones if mask is not None else None
cells_res = _process("cells_to_cells", pairs.cells_to_cells, cells_mask)
c2z_res = _process("cells_to_zones", pairs.cells_to_zones, c2z_mask)
zones_res = _process("zones_to_zones", pairs.zones_to_zones, zones_mask)
costs = TieredODNodePairs(
cells_to_cells=cells_res[0] if cells_res is not None else {},
cells_to_zones=c2z_res[0] if c2z_res is not None else None,
zones_to_zones=zones_res[0] if zones_res is not None else None,
)
aggregations_by_name = {
name: TieredODNodePairs(
cells_to_cells=cells_res[1][name] if cells_res is not None else {},
cells_to_zones=c2z_res[1][name] if c2z_res is not None else None,
zones_to_zones=zones_res[1][name] if zones_res is not None else None,
)
for name in names
}
return costs, aggregations_by_name
[docs]
def add_trip_overhead(
costs: TieredODPairs,
pairs: TieredODPairs,
cell_info: pd.DataFrame,
*,
zone_info: pd.DataFrame | None = None,
origin_overhead: Callable | None = None,
dest_overhead: Callable | None = None,
verify_finite: bool = True,
) -> TieredODPairs:
"""Add per-trip origin and/or destination overhead to each OD pair's cost.
For each (origin_node, dest_node) pair in `costs` (paired position-wise with
`pairs`), the returned cost is::
new_cost = old_cost + origin_overhead(info_o.loc[orig])
+ dest_overhead (info_d.loc[dest])
where the info dataframe depends on the tier and the endpoint side::
cells_to_cells: info_o = cell_info, info_d = cell_info
cells_to_zones: info_o = cell_info, info_d = zone_info
zones_to_zones: info_o = zone_info, info_d = zone_info
Each info DataFrame is one row per network node at that tier, indexed by
node ID. It can mix native node-level attributes (e.g. local density,
intersection count) with aggregated unit-level attributes (e.g. distance
from cell centroid to nearest network node) — the function doesn't care
which is which, just looks up by node ID and hands the row(s) to the
callback.
When multiple units share a node (typical for cells), aggregate upstream::
cell_info = (cells.groupby('node_id_nw').agg({
'dist_to_node': 'mean',
'population': 'sum',
}).join(nodes)) # node-level attrs joined in
Each callback receives a single `info` argument:
- For the **origin** side: a 1-D `pd.Series` (the row for that single
origin). The callback returns a scalar.
- For the **destination** side: a `pd.DataFrame` (one row per dest,
ordered the same as the dest array). The callback returns a 1-D
Series / ndarray of the same length. Pandas column access
(`info['col']`) yields a scalar in the Series case and a Series in the
DataFrame case, so the same callable typically works for both modes
without branching.
`origin_overhead` and `dest_overhead` are independently optional — pass
`None` to skip that side.
Args:
pairs: TieredODPairs of destination IDs (typically from `od_pairs.get_pairs`).
costs: TieredODPairs of cost arrays to augment; same shape as `pairs`.
cell_info: per-cell-node info DataFrame, indexed by the cell-tier node ID.
zone_info: per-zone-node info DataFrame, indexed by the zone-tier node ID.
Required iff `cells_to_zones` or `zones_to_zones` is present in `costs`.
origin_overhead: callable, see above. None to skip the origin contribution.
dest_overhead: callable, see above. None to skip the dest contribution.
verify_finite: if True, a ValueError is raised when output is not finite (NaN or Inf).
Returns:
New `TieredODPairs` of cost arrays (same shape as `costs`) with the
overhead added. `costs` is not mutated. Unreachable / masked-out entries
(np.inf in the input) stay infinite (inf + anything = inf).
Raises:
ValueError if overhead result is not finite and verify_finite is True.
"""
if origin_overhead is None and dest_overhead is None:
return costs # nothing to do
# (tier_attr) -> (origin_info_df, dest_info_df) lookup.
tier_infos: dict[str, tuple[pd.DataFrame | None, pd.DataFrame | None]] = {
"cells_to_cells": (cell_info, cell_info),
"cells_to_zones": (cell_info, zone_info),
"zones_to_zones": (zone_info, zone_info),
}
def _process(tier_attr: str) -> dict | None:
cost_tier = getattr(costs, tier_attr)
if cost_tier is None:
return None
pair_tier = getattr(pairs, tier_attr)
if pair_tier is None:
raise DataError(
f"`pairs.{tier_attr}` is None but `costs.{tier_attr}` is set — "
f"can't look up destination IDs to apply overhead."
)
info_o, info_d = tier_infos[tier_attr]
if origin_overhead is not None and info_o is None:
raise ValueError(
f"tier `{tier_attr}` has origin overhead requested but the "
f"matching info DataFrame is None."
)
if dest_overhead is not None and info_d is None:
raise ValueError(
f"tier `{tier_attr}` has dest overhead requested but the "
f"matching info DataFrame is None."
)
out: dict = {}
for orig, cost_arr in cost_tier.items():
new_c = np.asarray(cost_arr).copy()
if origin_overhead is not None:
# Validated non-None above when origin_overhead is set.
assert info_o is not None
# Single origin -> Series. Callback returns scalar.
oh_o = float(origin_overhead(info_o.loc[orig]))
if verify_finite and not np.isfinite(oh_o):
raise ValueError(f"Origin overhead {orig} is not finite.")
new_c = new_c + oh_o
if dest_overhead is not None:
assert info_d is not None
dests = pair_tier[orig]
if len(dests) > 0:
# Many dests -> DataFrame. Callback returns Series/array.
oh_d = np.asarray(dest_overhead(info_d.loc[dests]), dtype=np.float64)
if verify_finite:
not_finite = (~np.isfinite(oh_d)).sum()
if not_finite > 0:
raise ValueError(
f"Destination overhead for origin {orig} contains "
f"{not_finite:,} non-finite numbers."
)
new_c = new_c + oh_d
out[orig] = new_c
return out
return type(costs)(
cells_to_cells=_process("cells_to_cells"),
cells_to_zones=_process("cells_to_zones"),
zones_to_zones=_process("zones_to_zones"),
)
[docs]
def floor_intrazonal_costs(
costs: TieredODPairs,
min_cost: float | dict | pd.Series,
) -> TieredODPairs:
"""Floor cell-tier costs at `min_cost` — applied uniformly to every entry.
Routing on a graph returns 0 for the trivial origin-to-origin path. That's
fine for cumulative-opportunity output (cost 0 falls in the smallest bin),
but degenerate for decay-based metrics like gravity: `exp(-β·0) = 1` puts
the maximum possible decay weight on the cell itself, and `c^(-β)` at c = 0
diverges outright.
The floor is applied uniformly to every cell-tier entry, not just self-
pairs. Setting only the self-pair to a non-zero floor would create an
inconsistency: a cell would route to itself at, say, 120 s while a
different (very close) cell could route at 60 s, implying you can travel
further faster than you can travel zero distance. The min-cost
interpretation is the physical floor on per-trip cost — no trip can take
less than `min_cost`, regardless of distance — and it handles the
intrazonal-cost-0 case as a side effect.
Non-finite entries (`np.inf` for unreachable destinations, `np.nan` for
missing observations) are passed through unchanged — the floor is applied
only to finite costs. Flooring `inf` would erase reachability information
(an unreachable destination would become reachable in `min_cost` seconds),
and flooring `nan` would silently invent data; both behaviours would be
incorrect.
Only `cells_to_cells` is modified — `cells_to_zones` and `zones_to_zones`
are routed between distinct cell-zone / zone-zone pairs and don't have the
same zero-self-cost degeneracy. Tiers that are `None` pass through.
Args:
costs: TieredODPairs of cost arrays.
min_cost: floor value. Either a scalar `float` (same floor for every
origin), a `dict[origin_node -> float]` (per-origin floor;
origins absent from the dict get no floor and their costs pass
through unchanged), or a `pd.Series` indexed by origin_node
(same semantics as the dict form).
Returns:
New `TieredODPairs` with `cell_tier_cost = max(cell_tier_cost,
min_cost)` applied per origin to finite entries; non-finite entries
(`inf`, `nan`) pass through unchanged.
"""
if isinstance(min_cost, pd.Series):
cost_lookup: dict = min_cost.to_dict()
scalar_floor: float | None = None
elif isinstance(min_cost, dict):
cost_lookup = dict(min_cost)
scalar_floor = None
else:
cost_lookup = {}
scalar_floor = float(min_cost)
new_cells_to_cells: dict = {}
if costs.cells_to_cells is None:
raise ValueError(
"`costs.cells_to_cells` is None; cell-tier is required for this transform."
)
for origin, cost_arr in costs.cells_to_cells.items():
# Preserve input dtype (typically FP32).
new_arr = np.asarray(cost_arr).copy()
floor = scalar_floor if scalar_floor is not None else cost_lookup.get(origin)
if floor is None:
new_cells_to_cells[origin] = new_arr
continue
# Apply max() only to finite entries; inf/nan are left as-is.
finite_mask = np.isfinite(new_arr)
new_arr[finite_mask] = np.maximum(new_arr[finite_mask], float(floor))
new_cells_to_cells[origin] = new_arr
return type(costs)(
cells_to_cells=new_cells_to_cells,
cells_to_zones=costs.cells_to_zones,
zones_to_zones=costs.zones_to_zones,
)