"""
Accessibility metrics computed against tiered OD tables.
Inputs are always:
1. A cost `TieredODPairs` (subclass) — see "Key space" below.
2. One or more pre-aggregated property weights, position-aligned with the
cost ODM at each tier — a `dict[name -> TieredODPairs]`.
3. A `cell_to_zone` mapping that gives each cell-tier origin its parent
zone-tier key (for stitching the `zones_to_zones` far tier — the
`cells_to_cells` and `cells_to_zones` tiers are already cell-keyed
and don't need the indirection).
The per-origin stitching of `cells_to_cells / cells_to_zones / zones_to_zones`
tiers is done once, then reused across every (parameter × property) combination
— so adding more bins, decays, or k values is essentially free relative to a
single-parameter call.
**Key space — node-keyed vs geo-keyed.**
Two valid input shapes, distinguished by the costs/weights subclass:
- **`TieredODNodePairs`** (node-keyed): origins and dests are network node IDs.
Each origin row in the output is a network node. Per-cell origin overhead
cannot be applied here — the function returns per-NODE accessibilities. Build
the `cell_to_zone` map from `od_pairs.build_cell_to_zone_node_map(cells,
zones, node_column)` (cell-tier node → zone-tier node). Weights from
`od_pairs.dest_values` (per-node sums).
- **`TieredODGeoPairs`** (geo-keyed): origins and dests are geo-unit IDs
(cells_to_cells → cell_id; zones_to_zones → zone_id; etc.). Each origin row
in the output is a cell. Per-cell origin overhead should be baked into the
ODM *before* calling this function via `overhead.add_origin_cell_overhead`.
Build the `cell_to_zone` map directly: `cells['zone_id'].to_dict()`. Weights
from `od_pairs.dest_values_geo` (per-cell direct lookup, no implicit summing).
The same three functions (`cumulative_opportunities`, `gravity`, `nearest_k`) accept
either shape; output index name (`'node'` vs `'cell'`) reflects the input.
For cross-modal accessibility ("destinations within X min by ANY mode",
cross-modal logsum), combine per-mode `TieredODGeoPairs` cost ODMs with
`od_pairs.aggregate_across_modes` before passing here. Node-keyed cross-modal
is not supported — different modes live on different graphs.
For gravity in particular, the intrazonal-cost issue (cell-tier self-pairs
route at cost 0, which sends exp(0)=1 to maximum weight) is addressed by
calling `routing.floor_intrazonal_costs` on the cost ODM before passing here.
Provides:
- `Bin` namedtuple — half-open `[lo, hi)` cost bin with a name.
- `Decay` namedtuple — named callable for gravity-style cost decay.
- `exp_decay`, `power_decay` — convenience constructors for common families.
- `cumulative_opportunities` — sum each property's weights over destinations within each
cost bin (cumulative-opportunity accessibility).
- `gravity` — sum each property's weights weighted by f(cost), over all
destinations, for one or more decay specs.
- `nearest_k` — mean cost (or cost-at-k) to the k nearest weight-units,
for one or more k values. Lower is better; canonical "mean travel time
to the nearest k opportunities" formulation.
"""
from typing import Callable, NamedTuple
import numpy as np
import pandas as pd
from aperta.od_pairs import TieredODGeoPairs, TieredODPairs
def _require_cell_tier(costs: TieredODPairs) -> dict:
"""Narrow `costs.cells_to_cells` to a concrete `dict`, raising if None.
Accessibility metrics require the cell-tier to be populated (origins are
the cell-tier dict keys); raise a clear error if it isn't.
"""
if costs.cells_to_cells is None:
raise ValueError("`costs.cells_to_cells` is None; cell-tier is required.")
return costs.cells_to_cells
[docs]
class Bin(NamedTuple):
"""Half-open cost bin: `lo <= cost < hi`. `name` labels the output column.
Bins should be mutually exclusive (the function does *not* check); a
destination falling in multiple bins would be counted multiple times.
"""
name: str
lo: float
hi: float
[docs]
class Decay(NamedTuple):
"""Named cost-decay specification for `gravity`.
`fn` is a vectorised callable mapping a cost array to a weight array; `name`
labels the corresponding output column. Use `exp_decay` / `power_decay` for
the common families, or construct directly with any user-defined callable.
Multiple `Decay` specs can be passed to a single `gravity` call; the per-OD
stitching is then amortised across all of them.
"""
name: str
fn: Callable[[np.ndarray], np.ndarray]
[docs]
def exp_decay(name: str, beta: float) -> Decay:
"""Exponential decay: `f(c) = exp(-beta * c)`. `beta` > 0 for sensible decay."""
return Decay(name, lambda c: np.exp(-beta * c))
[docs]
def power_decay(name: str, beta: float) -> Decay:
"""Power-law decay: `f(c) = c ** (-beta)`. `beta` > 0; c = 0 yields `inf`,
so callers should apply `routing.add_intrazonal_cost` first to replace
self-pair cost-0 entries with a finite intrazonal cost.
"""
return Decay(name, lambda c: np.power(c, -beta))
def _stitched_for(
origin: int | str, costs: TieredODPairs, cell_to_zone_node: dict
) -> tuple[np.ndarray, slice, slice, slice]:
"""Per-origin stitched cost array + per-tier slices into it.
The slices let callers stitch *other* per-tier arrays (e.g. each property's
weights) into the same position-aligned 1-D layout without re-deriving the
boundaries.
Three tiers in order: cells_to_cells (cell-keyed, direct), cells_to_zones
(cell-keyed, direct), zones_to_zones (zone-keyed, needs origin's parent
zone). The returned slices have the same order.
"""
cell_arr = _require_cell_tier(costs)[origin]
c2z_arr = costs.cells_to_zones.get(origin) if costs.cells_to_zones is not None else None
zone_node = cell_to_zone_node.get(origin)
zone_arr = costs.zones_to_zones.get(zone_node) if costs.zones_to_zones is not None else None
parts = [cell_arr]
if c2z_arr is not None:
parts.append(c2z_arr)
if zone_arr is not None:
parts.append(zone_arr)
stitched = np.concatenate(parts) if len(parts) > 1 else cell_arr
n_cell = len(cell_arr)
n_c2z = len(c2z_arr) if c2z_arr is not None else 0
n_zone = len(zone_arr) if zone_arr is not None else 0
return (
stitched,
slice(0, n_cell),
slice(n_cell, n_cell + n_c2z),
slice(n_cell + n_c2z, n_cell + n_c2z + n_zone),
)
def _stitched_weights(
origin: int | str,
zone_node: int | str | None,
weights: TieredODPairs,
n_cell: int,
n_c2z: int,
n_zone: int,
dtype: np.dtype | type = np.float32,
) -> np.ndarray:
"""Stitch a single property's three-tier value arrays for one origin.
Tiers that are `None` (or where the origin / zone has no entry) contribute
zeros so the result is positionally aligned with the cost stitching from
`_stitched_for`.
"""
total = n_cell + n_c2z + n_zone
out = np.zeros(total, dtype=dtype)
cell_w = weights.cells_to_cells.get(origin) if weights.cells_to_cells is not None else None
if cell_w is not None and n_cell:
out[:n_cell] = cell_w
if n_c2z and weights.cells_to_zones is not None:
c2z_w = weights.cells_to_zones.get(origin)
if c2z_w is not None:
out[n_cell : n_cell + n_c2z] = c2z_w
if n_zone and weights.zones_to_zones is not None:
zw = weights.zones_to_zones.get(zone_node)
if zw is not None:
out[n_cell + n_c2z :] = zw
return out
def _origin_index_name(costs: TieredODPairs) -> str:
"""Output-DataFrame index name based on the input ODM's key space."""
return "cell" if isinstance(costs, TieredODGeoPairs) else "node"
def _sniff_dtype(costs: TieredODPairs) -> np.dtype:
"""Return the dtype of the first non-empty per-tier cost array.
Used to make accessibility output dtype follow the input costs dtype
(FP32 by default, FP64 if the caller opted in upstream) without an
explicit kwarg on every accessibility function.
"""
for tier in (costs.cells_to_cells, costs.cells_to_zones, costs.zones_to_zones):
if tier is None:
continue
for arr in tier.values():
return np.asarray(arr).dtype
return np.dtype(np.float32)
[docs]
def cumulative_opportunities(
costs: TieredODPairs,
weights: dict[str, TieredODPairs],
cell_to_zone: dict,
bins: list[Bin],
) -> pd.DataFrame:
"""Sum each property's weights over destinations whose cost falls in each bin.
Args:
costs: tiered travel costs. Subclass determines output indexing:
`TieredODNodePairs` → per-node output; `TieredODGeoPairs` → per-cell
output. Non-finite entries (`np.inf`, `np.nan`) won't match any
finite bin and are silently dropped.
weights: `{property_name -> TieredODPairs}`, position-aligned with
`costs` per tier. Must share the costs' key space (node-keyed
weights for node-keyed costs; geo-keyed for geo-keyed). Build via
`od_pairs.dest_values` (node-keyed) or `od_pairs.dest_values_geo`
(geo-keyed). Missing origins / tiers contribute zeros, not errors.
cell_to_zone: `{cell_tier_key -> zone_tier_key}` map for tier
stitching. Build from `od_pairs.build_cell_to_zone_node_map`
(node-keyed: cell_node → zone_node) or directly from
`cells['zone_id'].to_dict()` (geo-keyed: cell_id → zone_id).
bins: half-open `[lo, hi)` cost bins. Should be mutually exclusive
(not checked).
Returns:
DataFrame indexed by origin key with `(bin_name, property_name)`
MultiIndex on columns. Order: bins outer, properties inner. Dtype
follows the input `costs` ODM (FP32 by default, FP64 if the caller
opted in upstream).
Per-cell overhead: for `TieredODGeoPairs` inputs, bake per-cell origin
overhead into the cost ODM upfront via `overhead.add_origin_cell_overhead`.
"""
prop_names = list(weights.keys())
origins = list(_require_cell_tier(costs).keys())
columns = pd.MultiIndex.from_product(
[[b.name for b in bins], prop_names], names=["bin", "property"]
)
dtype = _sniff_dtype(costs)
out = np.zeros((len(origins), len(bins) * len(prop_names)), dtype=dtype)
for i, origin in enumerate(origins):
stitched_cost, cell_sl, c2z_sl, zone_sl = _stitched_for(origin, costs, cell_to_zone)
n_cell = cell_sl.stop - cell_sl.start
n_c2z = c2z_sl.stop - c2z_sl.start
n_zone = zone_sl.stop - zone_sl.start
zone_key = cell_to_zone.get(origin)
prop_weights = np.empty((len(prop_names), len(stitched_cost)), dtype=dtype)
for p, name in enumerate(prop_names):
prop_weights[p] = _stitched_weights(
origin, zone_key, weights[name], n_cell, n_c2z, n_zone, dtype=dtype
)
for b, bin_ in enumerate(bins):
mask = (stitched_cost >= bin_.lo) & (stitched_cost < bin_.hi)
if not mask.any():
continue
out[i, b * len(prop_names) : (b + 1) * len(prop_names)] = prop_weights[:, mask].sum(
axis=1
)
return pd.DataFrame(
out,
index=pd.Index(origins, name=_origin_index_name(costs)),
columns=columns,
)
[docs]
def gravity(
costs: TieredODPairs,
weights: dict[str, TieredODPairs],
cell_to_zone: dict,
decays: Decay | list[Decay],
) -> pd.DataFrame:
"""Gravity-based accessibility: sum each property's weights, weighted by
`f(cost)`, over all destinations — for one or more decay specs in a single
call.
For each origin `i`, each property `w`, and each decay spec `f`::
A_i^{f,w} = Σ_j w_j · f(cost_ij)
Multiple decay specs share the per-OD stitching, so calling with a list of
`Decay` specs is much cheaper than calling once per spec — useful for sensitivity
analyses across decay-coefficient ranges.
For utility-based accessibility, pass the per-OD utility values as the cost
ODM and an exponential decay with the desired scale (β=1 gives the
standard `Σ_j w_j · exp(-U_ij)` form, on which logsum accessibility is
`ln(...)` of the same sum).
Args:
costs, weights, cell_to_zone: see `cumulative_opportunities`.
decays: a single `Decay` or list of `Decay` specs. Output columns are
MultiIndex `(decay_name, property_name)` with decay names outer.
Returns:
DataFrame indexed by origin key with MultiIndex columns
`(decay, property)`. Dtype follows the input `costs` ODM
(FP32 by default, FP64 if the caller opted in upstream).
"""
if isinstance(decays, Decay):
decays = [decays]
if not decays:
raise ValueError("`decays` must be a non-empty list of `Decay` specs.")
prop_names = list(weights.keys())
decay_names = [d.name for d in decays]
origins = list(_require_cell_tier(costs).keys())
columns = pd.MultiIndex.from_product([decay_names, prop_names], names=["decay", "property"])
n_props = len(prop_names)
dtype = _sniff_dtype(costs)
out = np.zeros((len(origins), len(decays) * n_props), dtype=dtype)
for i, origin in enumerate(origins):
stitched_cost, cell_sl, c2z_sl, zone_sl = _stitched_for(origin, costs, cell_to_zone)
n_cell = cell_sl.stop - cell_sl.start
n_c2z = c2z_sl.stop - c2z_sl.start
n_zone = zone_sl.stop - zone_sl.start
zone_key = cell_to_zone.get(origin)
prop_weights = np.empty((n_props, len(stitched_cost)), dtype=dtype)
for p, name in enumerate(prop_names):
prop_weights[p] = _stitched_weights(
origin, zone_key, weights[name], n_cell, n_c2z, n_zone, dtype=dtype
)
finite_mask = np.isfinite(stitched_cost)
if not finite_mask.any():
continue
cost_finite = stitched_cost[finite_mask]
w_finite = prop_weights[:, finite_mask]
for d, decay in enumerate(decays):
decayed = decay.fn(cost_finite)
# Defensive: a decay that produces non-finite at finite cost
# (e.g. power with c=0 if intrazonal cost wasn't applied) should
# not silently corrupt the sum. Drop those entries.
if not np.all(np.isfinite(decayed)):
decay_finite = np.isfinite(decayed)
decayed = decayed[decay_finite]
w_used = w_finite[:, decay_finite]
else:
w_used = w_finite
out[i, d * n_props : (d + 1) * n_props] = (w_used * decayed).sum(axis=1)
return pd.DataFrame(
out,
index=pd.Index(origins, name=_origin_index_name(costs)),
columns=columns,
)
[docs]
def nearest_k(
costs: TieredODPairs,
weights: dict[str, TieredODPairs],
cell_to_zone: dict,
ks: int | float | list[int | float],
*,
aggregator: str = "cost_mean",
) -> pd.DataFrame:
"""Nearest-`k` accessibility: cost (mean, or at-`k`) over the `k` nearest
weight-units.
Each destination is treated as carrying `weight_j` opportunities at cost
`cost_ij`. Destinations are sorted ascending by cost; the first `k`
weight-units (with fractional contribution at the boundary) define the
"nearest `k` opportunities". The aggregator decides what to return:
- **`'cost_mean'`** (default): the mean cost over the first `k` weight-units,
``A_i^{k,w} = (Σ cost_j · weight_j contributed, fractional at the boundary) / k``.
The canonical "mean travel cost to the nearest `k` opportunities"
formulation — directly comparable across `k` values (k=3 and k=5 are on
the same scale, in cost units).
- **`'cost_at_k'`**: the cost of the `k`-th weight-unit — i.e., the cost
at which the cumulative weight first reaches `k`. Answers
"how far is the `k`-th nearest opportunity?".
Both aggregators return a value in the same units as `costs`, with **lower
values = better accessibility**. NaN is returned where the total available
(finite-cost, positive-weight) opportunities at an origin is less than `k`
— i.e., the `k`-th opportunity is unreachable in finite cost.
Multiple `k` values share the per-OD sort, so a multi-`k` call is much
cheaper than `k` individual calls.
Args:
costs, weights, cell_to_zone: see `cumulative_opportunities`.
ks: a single `k` or list of `k`s; positive values, integer or float.
Output columns are MultiIndex `(k, property_name)` with `k` outer.
aggregator: `'cost_mean'` (default) or `'cost_at_k'`.
Returns:
DataFrame indexed by origin key with MultiIndex columns `(k, property)`.
Dtype follows the input `costs` ODM (FP32 by default, FP64 if the
caller opted in upstream). NaN where the `k`-th opportunity is
unreachable.
"""
if aggregator not in ("cost_mean", "cost_at_k"):
raise ValueError(f"Unknown aggregator {aggregator!r}; expected 'cost_mean' or 'cost_at_k'.")
if isinstance(ks, (int, float)):
ks = [ks]
if not ks:
raise ValueError("`ks` must be a non-empty list of positive values.")
if any(k <= 0 for k in ks):
raise ValueError(f"All `k` values must be > 0; got {ks!r}.")
prop_names = list(weights.keys())
origins = list(_require_cell_tier(costs).keys())
columns = pd.MultiIndex.from_product([ks, prop_names], names=["k", "property"])
n_props = len(prop_names)
dtype = _sniff_dtype(costs)
out = np.full((len(origins), len(ks) * n_props), np.nan, dtype=dtype)
ks_arr = np.asarray(ks, dtype=dtype)
for i, origin in enumerate(origins):
stitched_cost, cell_sl, c2z_sl, zone_sl = _stitched_for(origin, costs, cell_to_zone)
n_cell = cell_sl.stop - cell_sl.start
n_c2z = c2z_sl.stop - c2z_sl.start
n_zone = zone_sl.stop - zone_sl.start
zone_key = cell_to_zone.get(origin)
prop_weights = np.empty((n_props, len(stitched_cost)), dtype=dtype)
for p, name in enumerate(prop_names):
prop_weights[p] = _stitched_weights(
origin, zone_key, weights[name], n_cell, n_c2z, n_zone, dtype=dtype
)
finite_mask = np.isfinite(stitched_cost)
if not finite_mask.any():
continue
costs_f = stitched_cost[finite_mask]
sort_idx = np.argsort(costs_f)
sorted_costs = costs_f[sort_idx]
for p in range(n_props):
w = prop_weights[p][finite_mask][sort_idx]
# Non-finite or non-positive weights contribute nothing.
w = np.where(np.isfinite(w) & (w > 0), w, 0.0)
cum_w = np.cumsum(w)
if cum_w.size == 0 or cum_w[-1] == 0.0:
continue
cum_cw = np.cumsum(sorted_costs * w) if aggregator == "cost_mean" else None
total = cum_w[-1]
for ki, k in enumerate(ks_arr):
if total < k:
continue
idx = int(np.searchsorted(cum_w, k, side="left"))
if aggregator == "cost_at_k":
out[i, ki * n_props + p] = sorted_costs[idx]
else: # cost_mean
assert cum_cw is not None # guaranteed by aggregator branch above
if idx == 0:
out[i, ki * n_props + p] = sorted_costs[0]
else:
full_cw = cum_cw[idx - 1]
partial = sorted_costs[idx] * (k - cum_w[idx - 1])
out[i, ki * n_props + p] = (full_cw + partial) / k
return pd.DataFrame(
out,
index=pd.Index(origins, name=_origin_index_name(costs)),
columns=columns,
)
[docs]
def flatten_index(df: pd.DataFrame) -> pd.DataFrame:
"""Collapse a 2-level column MultiIndex into single strings joined by `__`.
Convenience for the accessibility outputs (which carry `(bin, property)`
or `(decay, property)` MultiIndex columns) when downstream code prefers
flat single-string column names (e.g. for CSV export). Mutates in place
and also returns `df`.
"""
df.columns = ["__".join(col).strip() for col in df.columns.values]
return df