"""Graph-construction and -annotation helpers for transport networks (OSM-flavoured).
Aperta operates on `networkx.Graph` (and its multi/directed variants) as its
canonical graph type. This module supplies the operations that take a raw
(typically OSMnx) graph and produce a richly-annotated one ready for
downstream snap / route work:
- **Intersection consolidation**: `consolidate_intersections` wraps
`osmnx.consolidate_intersections` but preserves intersection-attribute
nodes (traffic signals, stop signs, roundabouts) that the OSMnx default
drops, which matters for any route-level analysis that counts those
features (Section §3.3 of the toolkit paper). Calls
`flag_node_intersection_topology` and `flag_node_osm_classification` as
post-processing.
- **Highway-class machinery**: `OSM_HIGHWAY_RANKS`,
`collapse_osm_highway_lists_by_rank`, `lanes_per_direction`, the per-node
topology / OSM-classification flag functions.
- **Edge / node attribute helpers**: aggregate node attributes onto edges
(`aggregate_nodes_to_edges`), aggregate edge attributes onto nodes
(`aggregate_edges_to_nodes`), and write attribute values through to a
graph in a tolerant way (`set_nx_edge_attributes_filled`).
- **Edge betweenness sampling**: `get_nested_edge_betweenness` runs the
per-origin Dijkstra + path-walking accumulator used by the traffic-flow
estimation pipeline in `traffic_flows.py`.
- **Graphml round-trip**: `load_consolidated_graphml` with the dtype maps
that preserve custom integer flags through `.graphml` serialization.
Sibling modules cover the workflow steps that consume this module's output:
- `aperta.network_snap` — snap-target resolution: `snap_to_network_nodes`,
`insert_projected_nodes`, `nodes_incident_to_edges`, `transport_centroid`,
`split_edge_at_point`, `split_two_way_edge_at_point`. Re-exported below
for back-compat.
- `aperta.routing_prep` — mode-aware preparation: `prepare_network`,
`compute_snap_eligibility`, `PreparedGraph`, `MODE_DEFAULTS`.
Re-exported below for back-compat.
New code should import directly from `aperta.network_snap` /
`aperta.routing_prep`; the re-exports here exist so existing callsites
(`from aperta.network_processing import prepare_network` etc.) keep working
without churn.
"""
from collections import defaultdict
from typing import Callable, Literal, cast
import networkx as nx
import numpy as np
import pandas as pd
from aperta.errors import DataError
# Back-compat re-exports — snap mechanics and routing prep moved to dedicated
# modules to keep this file under the size limit. Keep these here so existing
# `from aperta.network_processing import …` callsites and `import
# aperta.network_processing as np` namespace aliases continue to work
# unchanged. New code should import from `aperta.network_snap` and
# `aperta.routing_prep` directly.
from aperta.network_snap import ( # noqa: F401 (re-export)
SplitEdgeResult,
SplitTwoWayEdgeResult,
insert_projected_nodes,
nodes_incident_to_edges,
snap_to_network_nodes,
split_edge_at_point,
split_two_way_edge_at_point,
transport_centroid,
)
from aperta.routing_prep import ( # noqa: F401 (re-export)
MODE_DEFAULTS,
BaseMode,
Directedness,
PreparedGraph,
compute_snap_eligibility,
prepare_network,
)
# OSM highway-type ranking used by `collapse_osm_highway_lists_by_rank` and
# `flag_node_osm_classification` (for max/min per-node highway rank). Higher
# value = more major road. Anything not listed (or `None`) is treated
# as rank -1 ("not a real motor-vehicle road").
OSM_HIGHWAY_RANKS: dict[str, int] = {
"motorway": 7,
"motorway_link": 7,
"trunk": 6,
"trunk_link": 6,
"primary": 5,
"primary_link": 5,
"secondary": 4,
"secondary_link": 4,
"tertiary": 3,
"tertiary_link": 3,
"residential": 2,
"road": 2,
"living_street": 1,
"pedestrian": 1,
"unclassified": -1,
"service": -1,
"busway": -1,
"cycleway": -1,
"footway": -1,
"path": -1,
"track": -1,
"steps": -1,
"crossing": -1,
"disused": -1,
}
def _osm_highway_rank(value) -> int:
"""Rank lookup tolerant of strings, lists (OSMnx-merged), and None."""
if value is None:
return -1
if isinstance(value, list):
return max((OSM_HIGHWAY_RANKS.get(v, -1) for v in value), default=-1)
return OSM_HIGHWAY_RANKS.get(value, -1)
[docs]
def collapse_osm_highway_lists_by_rank(graph: nx.Graph) -> None:
"""Mutate `graph` in place: collapse list-valued edge `highway` to a single
string (the highest-rank value via `OSM_HIGHWAY_RANKS`).
After `osmnx.consolidate_intersections`, edges built from multiple source
edges have `highway` as a *list* of strings. Most downstream code expects
a single string and silently picks the first element (e.g.
`osmnx.add_edge_speeds` does this internally), which is not principled
when the merged edges differ in road class. This helper picks the most
*major* value instead (motorway > trunk > primary > … > unclassified).
Unknown highway names map to rank `-1`; when a list contains only unknowns
the resulting collapsed value is the unknown with the highest dict-order.
Edges without a `highway` attribute are left alone.
Auto-called from inside `consolidate_intersections`; callable standalone
for graphs consolidated by external tooling.
"""
if graph.is_multigraph():
edges_data = (d for _, _, _, d in graph.edges(keys=True, data=True))
else:
edges_data = (d for _, _, d in graph.edges(data=True))
for d in edges_data:
hw = d.get("highway")
if not isinstance(hw, list):
continue
ranks = [OSM_HIGHWAY_RANKS.get(v_, -1) for v_ in hw]
d["highway"] = hw[ranks.index(max(ranks))]
[docs]
def set_nx_edge_attributes_filled(
graph: nx.MultiGraph, attr: dict | pd.Series, attr_name: str, fill_value=0, strict: bool = False
):
"""Set per-edge attribute `attr_name` on `graph`, filling missing edges with `fill_value`.
`nx.set_edge_attributes` silently leaves edges absent from the input mapping
without the attribute, which is a footgun for downstream code that expects
the attribute to be present on every edge. This wrapper writes `fill_value`
instead.
Args:
graph: a MultiGraph (uses `(u, v, k)` edge keys).
attr: edge → value mapping, keyed by `(u, v, k)` tuples.
attr_name: edge attribute name to write.
fill_value: value to assign to edges missing from `attr`. Default 0.
strict: if True, raise `DataError` when `attr` is missing any of the
graph's edges. Default False (silently fill).
Returns:
`graph`, mutated in place.
"""
if strict:
_idx = pd.Series(index=list(graph.edges(keys=True)))
n = len(_idx.index.difference(pd.Series(attr).index))
if n > 0:
raise DataError("Incomplete data: {n:,} edges are missing in `attr'.")
_data = {k: attr.get(k, fill_value) for k in graph.edges(keys=True)}
nx.set_edge_attributes(graph, _data, attr_name)
return graph
[docs]
def get_nested_edge_betweenness(
graph: nx.Graph,
nested_node_sample: dict,
weight: str | None = None,
*,
cutoff: float | None = None,
) -> pd.Series:
"""Edge usage counts from a nested (origin → sampled-destinations) sample.
For each origin in `nested_node_sample`, runs a single-source Dijkstra
on `graph` (via `scipy.sparse.csgraph.dijkstra` with `return_predecessors`),
walks the predecessor chain from each sampled destination back to the
origin, and adds 1 to every edge on the path. The result is the
weighted sum over all sampled OD pairs — a "traffic-stress"-style edge
usage count, not classical Brandes' betweenness.
Repeated destinations in the per-origin sample naturally count multiple
times (each occurrence adds 1 to its path's edges), so weight comes
from the upstream sampling step's destination distribution.
Args:
graph: networkx graph (any variant). MultiGraph parallel edges with the
same `(u, v)` collapse to the min-`weight` edge for routing,
and the chosen key is the one credited in the output.
nested_node_sample: `{origin_node -> array_of_dest_nodes}`, typically
from `traffic_flows.nested_node_sample`. Origins are unique;
duplicate destinations within an origin's array are fine.
weight: edge attribute name to use as the per-edge cost (e.g.
`'duration_s'`). Required — there's no "all edges weight 1"
default since traffic-flow sampling always needs real costs.
cutoff: optional network-distance cutoff in weight units. Passed to
`csg.dijkstra(limit=cutoff)` to truncate each per-origin search
once destinations beyond the cutoff are unreachable anyway. Set
this to the upstream sampling radius (typically `r_zones` from
`od_pairs.get_pairs`) — destinations sampled within that radius
are guaranteed reachable within `cutoff`, and the truncation
gives a large speed-up on country-scale graphs. Default `None`
= no cutoff.
Returns:
`pd.Series` indexed by edge ID — `(u, v)` for plain graphs, `(u, v, k)`
for multigraphs — with the accumulated edge usage count.
"""
# Local import to keep scipy.sparse out of the module load path.
import scipy.sparse.csgraph as csg
from aperta.routing import _graph_to_csr
if weight is None:
raise ValueError("`weight` is required: traffic-flow sampling needs a real edge cost.")
is_multi = graph.is_multigraph()
csr, nx_to_seq, seq_to_nx, parallel_keys = _graph_to_csr(
graph, weight, return_parallel_keys=True
)
limit = cutoff if cutoff is not None else np.inf
out: dict = defaultdict(float)
for orig_nx, dest_nodes in nested_node_sample.items():
if orig_nx not in nx_to_seq:
continue
orig_seq = nx_to_seq[orig_nx]
_, pred = csg.dijkstra(csr, indices=[orig_seq], limit=limit, return_predecessors=True)
pred_row = pred[0]
for dest_nx in dest_nodes:
v_seq = nx_to_seq.get(dest_nx)
if v_seq is None or v_seq == orig_seq:
continue
# Walk predecessors back to the origin; accumulate 1 per edge.
while v_seq != orig_seq:
u_seq = pred_row[v_seq]
if u_seq < 0:
break # unreachable / beyond cutoff
edge_key: tuple
if is_multi:
k = parallel_keys.get((int(u_seq), int(v_seq)))
edge_key = (seq_to_nx[int(u_seq)], seq_to_nx[int(v_seq)], k)
else:
edge_key = (seq_to_nx[int(u_seq)], seq_to_nx[int(v_seq)])
out[edge_key] += 1
v_seq = u_seq
return pd.Series(out)
def _add_to_edge_info(node_row, collected_edge_information, cols, node_edge_relations):
"""Fan a node's feature values out onto each edge it touches."""
if isinstance(node_edge_relations, str):
edge_ids = node_row[node_edge_relations].split(",")
elif isinstance(node_edge_relations, nx.Graph):
edge_ids = list(node_edge_relations.edges(node_row.name, keys=True))
else:
raise TypeError("node_edge_relations must be a str or nx.Graph.")
for edge_id in edge_ids:
if edge_id not in collected_edge_information:
collected_edge_information[edge_id] = {col: [] for col in cols}
for col in cols:
collected_edge_information[edge_id][col].append(node_row[col])
return collected_edge_information
[docs]
def aggregate_nodes_to_edges(
df_nodes: pd.DataFrame,
cols: list[str],
node_edge_relations: str | nx.Graph,
*,
aggregator: str,
) -> pd.DataFrame:
"""Aggregate node-level features onto the edges they touch (sum or mean).
Args:
df_nodes: list of nodes, supplied as a DataFrame.
cols: list of columns in df_nodes to be mapped to edges.
node_edge_relations: if str, must list the edges belonging to each node in column
'node_edge_relations' in df_nodes, separated by a comma (,). Otherwise, supply an
nx.Graph where the ID of each node corresponds to the index in df_nodes.
aggregator: how to aggregate values from different nodes onto a single edge.
One of `'sum'`, `'mean'`, `'median'`.
"""
collected_edge_information: dict = {}
df_nodes.apply(
lambda row: _add_to_edge_info(row, collected_edge_information, cols, node_edge_relations),
axis=1,
)
for k, d in collected_edge_information.items():
for col, values in d.items():
if aggregator == "sum":
collected_edge_information[k][col] = sum(values)
elif aggregator == "mean":
collected_edge_information[k][col] = float(np.average(values))
elif aggregator == "median":
collected_edge_information[k][col] = float(np.median(values))
else:
raise NotImplementedError(f"aggregator `{aggregator}` is not implemented.")
return pd.DataFrame.from_dict(collected_edge_information, orient="index")
def _mean_numeric(values: list):
"""Mean over values coercible to float; first value as fallback if none coerce.
OSM `lanes` / `maxspeed` come through as strings (sometimes numeric like
`'50'`, sometimes with units / labels like `'50 mph'` or `'RU:urban'`).
Coercible values are averaged; non-coercible are skipped. If nothing
parses, returns the first raw value (preserves a sensible default rather
than producing `NaN`).
"""
nums: list[float] = []
for v in values:
try:
nums.append(float(v))
except (TypeError, ValueError):
continue
if nums:
return sum(nums) / len(nums)
return values[0] if values else None
# Default edge-attribute aggregators applied to LIST-VALUED edge attrs
# post-consolidation. `lanes` and `maxspeed` get numeric-mean so merged
# edges expose single values; non-merged edges keep whatever the source
# had (typically a single string from OSM).
#
# `length` deliberately not here: OSMnx 2.x sums it across merged edges,
# but the merged edge has a single geometry whose actual length is
# *smaller* than that sum (parallel paths collapse to one). We recompute
# `length` from `geometry.length` post-consolidation in metric units.
_DEFAULT_EDGE_ATTR_AGGS = {
"lanes": _mean_numeric,
"maxspeed": _mean_numeric,
}
def _parse_lanes(raw) -> float | None:
"""OSM `lanes` is messy — string, list, missing. Returns float or None."""
if isinstance(raw, list):
raw = raw[0] if raw else None
try:
return float(raw)
except (TypeError, ValueError):
return None
[docs]
def lanes_per_direction(edge_data: dict) -> float:
"""Per-direction lane count for a directed edge. OSM-specific: reads the
OSM `lanes` and `oneway` tag conventions.
OSM's `lanes` tag is the **total** lane count across both directions on
two-way roads, and OSMnx inherits the same value on both directional
edges. Any per-direction quantity (directional AADT, per-lane capacity)
is therefore off by ~2× on two-way segments without correction — and
biased *unequally* between mostly-one-way road classes (motorways) and
mostly-two-way ones (primary / secondary), which a single coefficient
can't absorb.
Rules:
- `oneway=True`: all lanes are in this direction → return lanes.
- `lanes` missing: OSM implicit default (1 per direction) → return 1.
- `lanes ≤ 1`: can't split a single lane → return 1.
- otherwise: return lanes / 2.
Pure function over `edge_data` — caller decides whether to write the
result back as an edge attribute. `consolidate_intersections` calls
this for every consolidated edge and stores the result as
`lanes_per_direction`.
"""
lanes = _parse_lanes(edge_data.get("lanes"))
oneway = bool(edge_data.get("oneway", False))
if lanes is None:
return 1.0
if oneway or lanes <= 1:
return max(1.0, lanes)
return lanes / 2.0
# Edge attributes dropped post-consolidation by `consolidate_intersections`
# (callers can override). `name` is the main offender: it lists across
# merged edges, costs disk space in `.graphml`, and isn't used anywhere
# in aperta.
_DEFAULT_DROP_EDGE_ATTRS = ["name"]
def _int_via_float(value) -> int:
"""`int(float(v))` — tolerates both `'0'`/`'1'` and `'0.0'`/`'1.0'` strings.
Plain `int()` raises on float-formatted strings (`int('0.0')` → ValueError).
Used as the cast for graphml-loaded `is_*` flags so older saves (where
these were written as floats) and newer saves (ints) both round-trip
cleanly.
"""
return int(float(value))
def _int_via_bool_or_float(value) -> int:
"""Tolerant graphml-cast for attrs that may have been written as Python
`bool` (round-trips as `'True'` / `'False'` strings) or as `int` (`'0'`,
`'1'`, `'0.0'`, `'1.0'`).
Used by the prefix-scan dtype map for the per-mode flags
(`is_snap_eligible_<mode>`, `cost_excluded_<mode>`) — `prepare_network`
writes them as Python `bool` so they stringify as `'True'`/`'False'`,
while `insert_projected_nodes` writes `is_virtual=1` (int) which
stringifies as `'1'`. Both must come back as `0` / `1`.
"""
if isinstance(value, (bool, int)):
return int(value)
if value in ("True", "true"):
return 1
if value in ("False", "false"):
return 0
return int(float(value))
# Per-node attribute dtypes that `consolidate_intersections` writes as
# ints. OSMnx's own `default_node_dtypes` only knows about its built-in
# attrs (elevation, x, y, osmid, street_count, lat, lon), so without this
# constant our custom `is_*` and `*_highway_rank` flags round-trip as
# strings — and `int('0.0')` would raise downstream. The values are
# integer indicators (0 / 1 for the `is_*` flags, -1…7 for highway rank),
# so int round-trips cleanly (`'0'` / `'1'`) and `is_roundabout == 1`
# works as written. Pass to `ox.load_graphml` via the `node_dtypes`
# kwarg, or use `load_consolidated_graphml` below.
_CONSOLIDATED_NODE_DTYPES: dict[str, Callable] = {
"n_streets": _int_via_float,
"is_t_junction": _int_via_float,
"is_4way": _int_via_float,
"is_t_junction_major": _int_via_float,
"is_4way_major": _int_via_float,
"is_t_junction_anchor": _int_via_float,
"is_4way_anchor": _int_via_float,
"max_highway_rank": _int_via_float,
"min_highway_rank": _int_via_float,
# Per-obstacle `is_<name>` flags are dynamic; the default set used by
# `consolidate_intersections` is included below. Callers that pass
# custom `obstacle_node_tags` should extend this dict accordingly.
"is_traffic_signal": _int_via_float,
"is_stop": _int_via_float,
"is_yield": _int_via_float,
"is_roundabout": _int_via_float,
}
# Per-edge attribute dtypes used downstream by aperta but not covered by
# OSMnx's `default_edge_dtypes`. Without these, the attrs round-trip as
# strings and arithmetic breaks downstream.
#
# Sources:
# - `lanes_per_direction`: written by `consolidate_intersections`.
# - `density_norm`, `is_t_junction`, `is_4way`, `is_traffic_signal`:
# endpoint-mean of the per-node values, written by per-project density
# prep steps (see `examples/extended/prepare/5_density.py`). Values
# live in {0, 0.5, 1} on edges (mean of {0, 1} node flags), so `float`
# casts cleanly. Listed here so per-edge-feature aggregation (e.g.
# `calibration.calibrate_edge_weights`) doesn't choke on string-typed
# numerics post-reload.
_CONSOLIDATED_EDGE_DTYPES: dict[str, Callable] = {
"lanes_per_direction": float,
"density_norm": float,
"is_t_junction": float,
"is_4way": float,
"is_traffic_signal": float,
}
# Prefix-scan attribute prefixes for dynamic per-mode bool flags. Graphml
# stringifies bool/int values; without a dtype cast on load, `'False'`
# arrives as the literal string and `bool('False') == True` silently
# inverts downstream tests. `prepare_network` / `compute_snap_eligibility`
# embed the mode in the flag name (`is_snap_eligible_walk`,
# `cost_excluded_car_night`, etc.), so we can't hardcode the keys — instead
# we scan the graphml file's `<key>` elements at load time and add an
# `_int_via_float` cast for every attribute whose name starts with one of
# these prefixes (or matches a fixed name like `is_virtual`).
_PREFIX_SCAN_NODE = ("is_snap_eligible_", "is_virtual")
_PREFIX_SCAN_EDGE = ("cost_excluded_",)
def _scan_graphml_keys(
filepath, prefixes: tuple[str, ...], scope: Literal["node", "edge"]
) -> list[str]:
"""Return the names of `<key for=scope>` graphml attributes that start
with any of `prefixes` (or equal one — handles the no-suffix `is_virtual`
case naturally via the prefix-match condition).
Cheap: parses the `<key>` schema header only, stops before iterating
nodes / edges (which is where the cost lives on large graphs).
"""
import xml.etree.ElementTree as ET
ns = "{http://graphml.graphdrawing.org/xmlns}"
out: list[str] = []
for _event, elem in ET.iterparse(filepath, events=("end",)):
if elem.tag == f"{ns}key" and elem.get("for") == scope:
name = elem.get("attr.name", "")
if any(name.startswith(p) or name == p for p in prefixes):
out.append(name)
elif elem.tag == f"{ns}graph":
break
elem.clear()
return out
[docs]
def load_consolidated_graphml(
filepath, *, node_dtypes: dict | None = None, edge_dtypes: dict | None = None, **kwargs
):
"""Load a graphml saved by `consolidate_intersections`, casting aperta's
custom `is_*` / `*_highway_rank` / `is_snap_eligible_<mode>` /
`cost_excluded_<mode>` / `is_virtual` attrs back to their proper types.
OSM-specific — expects graphs in the OSMnx node/edge attribute
convention; for non-OSM pickle/graphml saves, call `nx.read_graphml`
directly and apply your own project-specific dtype casts.
Thin wrapper around `osmnx.load_graphml`. Three sources of casts get
merged in (in order; later ones win):
1. `_CONSOLIDATED_NODE_DTYPES` / `_CONSOLIDATED_EDGE_DTYPES` — fixed
casts for attrs `consolidate_intersections` writes (e.g.
`n_streets`, `is_t_junction`, `lanes_per_direction`).
2. **Prefix-scan** of the graphml `<key>` schema for the dynamic
per-mode flags written by `prepare_network` /
`compute_snap_eligibility` / `insert_projected_nodes`:
`is_snap_eligible_<mode>` and `is_virtual` (per-node);
`cost_excluded_<mode>` (per-edge). Handles any `mode` label,
including subtypes like `car_night`, without hardcoding.
3. Caller-supplied `node_dtypes` / `edge_dtypes` overrides.
Without the prefix-scan, the per-mode bool flags round-trip as the
literal strings `'True'` / `'False'`, and `bool('False') == True` would
silently invert the `if not d.get(cost_excluded_flag, False)` test in
`_compute_snap_eligible_nodes`.
Args:
filepath: path to a `.graphml` produced by `consolidate_intersections`
or `prepare_network` / `compute_snap_eligibility`.
node_dtypes: optional override merged on top (caller's values win).
edge_dtypes: optional override merged on top (caller's values win).
**kwargs: forwarded to `osmnx.load_graphml`.
Returns:
`nx.MultiDiGraph` for graphs saved from a directed source, or
`nx.MultiGraph` for undirected ones. (OSMnx's docstring claims
`MultiDiGraph` unconditionally; that's wrong — `nx.read_graphml`
honors the file's `<graph edgedefault>` attribute.)
"""
import osmnx as ox
auto_node = {
name: _int_via_bool_or_float
for name in _scan_graphml_keys(filepath, _PREFIX_SCAN_NODE, "node")
}
auto_edge = {
name: _int_via_bool_or_float
for name in _scan_graphml_keys(filepath, _PREFIX_SCAN_EDGE, "edge")
}
merged_node = {**_CONSOLIDATED_NODE_DTYPES, **auto_node, **(node_dtypes or {})}
merged_edge = {**_CONSOLIDATED_EDGE_DTYPES, **auto_edge, **(edge_dtypes or {})}
return ox.load_graphml(filepath, node_dtypes=merged_node, edge_dtypes=merged_edge, **kwargs)
[docs]
def consolidate_intersections(
graph: nx.MultiDiGraph,
tolerance: float,
*,
obstacle_buffer: float = 30.0,
obstacle_node_tags: dict[str, tuple[str, str]] | None = None,
obstacle_locations: dict[str, list[tuple[float, float]]] | None = None,
detect_roundabouts: bool = True,
roundabout_locations: list[tuple[float, float]] | None = None,
node_attr_aggs: dict | None = None,
edge_attr_aggs: dict | None = None,
drop_edge_attrs: list[str] | None = None,
):
"""OSMnx intersection consolidation + obstacle-aware re-flagging.
Wraps `osmnx.consolidate_intersections(rebuild_graph=True)` with the
post-processing OSMnx alone misses: traffic-signal / stop / give-way
nodes typically sit a few metres off the geometric intersection
centre, so OSMnx's `tolerance`-based merge can throw those nodes away
rather than carrying the `highway=traffic_signals` tag onto the
surviving consolidated node. The result is a consolidated graph in
which most intersections are not flagged as signalised even when
they actually are — a distortion for any edge-weight model that
penalises signals.
This wrapper captures obstacle locations from the *original* graph
before consolidation, then spatially re-attaches them to the nearest
surviving consolidated node within `obstacle_buffer` metres. The
same trick handles roundabouts, whose `junction=roundabout` tag
lives on edges (not nodes) in OSM and is otherwise lost when the
roundabout collapses to a single consolidated node.
The returned graph has the per-node attributes set by
`flag_node_intersection_topology` (`n_streets`, `is_t_junction`,
`is_4way`) and `flag_node_osm_classification` (`max_highway_rank`,
`min_highway_rank`, `is_t_junction_major`, `is_4way_major`,
`is_t_junction_anchor`, `is_4way_anchor`), plus one `is_<name>` per
requested obstacle type, plus `is_roundabout` if
`detect_roundabouts=True`. Edge `highway` lists from the consolidation
are collapsed to the highest-rank single string via
`collapse_osm_highway_lists_by_rank`. Each edge also gets
`lanes_per_direction` (the OSM `lanes` tag corrected for two-way
roads — see `lanes_per_direction()`). **Node IDs are new integer IDs**
(per OSMnx behaviour) — caller must re-snap geo units to the
consolidated graph.
OSM-specific. The wrapper consumes OSMnx graphs, reads the OSM
`highway` / `junction` / `traffic_signals` tag conventions, and
writes `OSM_HIGHWAY_RANKS`-derived per-node attributes. Projects
working with non-OSM road networks (e.g. LUMOS's simplified 3-tier
network) should skip consolidation entirely and call
`flag_node_intersection_topology` directly for the network-agnostic
`is_t_junction` / `is_4way` flags.
**Geometry guarantee**: every consolidated edge carries a `geometry`
LineString (OSMnx attaches one during the rebuild). This isn't true
of raw OSMnx graphs — `simplify=True` omits `geometry` from pure
point-to-point edges (~10 % of edges typically), and downstream code
that needs per-edge geometry (e.g. dual-graph construction, plotting
with curvature) on a raw graph has to call
`osmnx.graph_to_gdfs(..., fill_edge_geometry=True)` and copy
`geometry` back. Consolidating first sidesteps that step.
Args:
graph: an OSMnx MultiDiGraph (projected; `tolerance` is in graph
CRS units, usually metres). `osmnx` is required (optional
extra `osm`).
tolerance: nodes within this distance are merged. Typical urban
values: 5–15 m; ~25 m for sparser networks.
obstacle_buffer: max distance to which an obstacle from the
original graph is re-attached to a consolidated node.
Should be at least as large as `tolerance`; default 30 m
comfortably covers signalised intersections.
obstacle_node_tags: `{flag_name -> (osm_key, osm_value)}` — OSM
node tags to extract as obstacles. Default:
`{'traffic_signal': ('highway', 'traffic_signals')}`. Add
`'stop': ('highway', 'stop')`, `'give_way': ('highway',
'give_way')`, etc., as needed.
obstacle_locations: pre-supplied `{flag_name -> [(x, y), ...]}` map.
When given, the obstacle extraction from `obstacle_node_tags` is
skipped — useful when obstacles come from a non-OSM source or
were captured upstream.
detect_roundabouts: if True (default), edges with
`junction=roundabout` are detected before consolidation and
their midpoints get re-attached as `is_roundabout`.
roundabout_locations: pre-supplied list of roundabout midpoints
`[(x, y), ...]`. When given, skips the edge-based roundabout
detection from `detect_roundabouts`.
node_attr_aggs: passed through to `ox.consolidate_intersections`.
Any per-node attribute not listed here that varies across the
nodes being merged will be carried through as a **list** of
values.
edge_attr_aggs: passed through to `ox.consolidate_intersections`
to control how per-edge attributes are aggregated when parallel
edges between the same `(u, v)` are collapsed.
drop_edge_attrs: edge attributes to drop after consolidation. Use
for attributes that osmnx's aggregation leaves in a confusing
list-of-values form. Defaults to `_DEFAULT_DROP_EDGE_ATTRS`.
Returns:
Consolidated `nx.MultiDiGraph` with new integer node IDs.
"""
import osmnx as ox
from scipy.spatial import KDTree
# 1. Obstacle + roundabout locations.
# Pre-extracted `obstacle_locations` / `roundabout_locations` win —
# pass these from a canonical source (typically the raw car graph)
# so all network types share the same obstacle awareness
# (signals on trunk roads, for example, are dropped from walk
# graphs by OSMnx's network_type filter and would otherwise be
# absent from walk-graph consolidation entirely). Otherwise we
# extract from `graph` itself via `extract_obstacle_locations`.
if obstacle_locations is None or (detect_roundabouts and roundabout_locations is None):
auto_obstacle_xy, auto_roundabout_xy = extract_obstacle_locations(
graph,
obstacle_node_tags=obstacle_node_tags,
detect_roundabouts=detect_roundabouts,
)
if obstacle_locations is None:
obstacle_locations = auto_obstacle_xy
if detect_roundabouts and roundabout_locations is None:
roundabout_locations = auto_roundabout_xy
# 2. Consolidate. OSMnx 2.x doesn't expose `edge_attr_aggs`, so the
# edge aggregation (numeric-mean for `lanes` / `maxspeed`, etc.)
# runs as a post-pass below on list-valued attrs only.
# `rebuild_graph=True` guarantees the return is a MultiDiGraph (the
# GeoSeries return is only when `rebuild_graph=False`), but OSMnx's
# signature is a union — cast for the type checker.
consolidated = cast(
nx.MultiDiGraph,
ox.consolidate_intersections(
graph,
tolerance=tolerance,
rebuild_graph=True,
reconnect_edges=True,
node_attr_aggs=node_attr_aggs,
),
)
# 3. Post-consolidation edge cleanup:
# - drop unwanted attrs (saves disk space + avoids round-trip
# ambiguity for non-aggregated lists);
# - collapse list-valued attrs in `edge_attr_aggs` to single
# values;
# - recompute `length` from the edge geometry. OSMnx sums
# `length` across merged source edges, which inflates it for
# parallel-path merges — the merged edge's actual geometry is
# shorter than that sum. `geometry.length` gives metres in our
# metric-CRS graphs.
drop_attrs = _DEFAULT_DROP_EDGE_ATTRS if drop_edge_attrs is None else drop_edge_attrs
eff_edge_aggs = _DEFAULT_EDGE_ATTR_AGGS if edge_attr_aggs is None else edge_attr_aggs
for _, _, _, d in consolidated.edges(keys=True, data=True):
for attr in drop_attrs:
d.pop(attr, None)
for attr, aggregator in eff_edge_aggs.items():
if isinstance(d.get(attr), list):
d[attr] = aggregator(d[attr])
geom = d.get("geometry")
if geom is not None:
d["length"] = float(geom.length)
# Derived per-direction lane count — see `lanes_per_direction()`
# for rationale. Runs after the lanes aggregator so list-valued
# OSM tags are already collapsed.
d["lanes_per_direction"] = lanes_per_direction(d)
# 4. Collapse list-valued highway to a single string, then per-node
# intersection topology + OSM highway-rank classification.
collapse_osm_highway_lists_by_rank(consolidated)
flag_node_intersection_topology(consolidated)
flag_node_osm_classification(consolidated)
# 5. Spatial re-attachment: nearest consolidated node within
# obstacle_buffer gets the obstacle / roundabout flag.
node_ids = list(consolidated.nodes)
if not node_ids:
return consolidated
node_xy = np.array([(consolidated.nodes[n]["x"], consolidated.nodes[n]["y"]) for n in node_ids])
tree = KDTree(node_xy)
def _allocate(locations: list[tuple[float, float]], flag_name: str) -> None:
for nid in consolidated.nodes():
consolidated.nodes[nid][f"is_{flag_name}"] = 0
if not locations:
return
dists, idxs = tree.query(np.asarray(locations), distance_upper_bound=obstacle_buffer)
# query returns idx == len(node_xy) for misses with distance_upper_bound.
valid = (idxs < len(node_ids)) & np.isfinite(dists)
for i in np.where(valid)[0]:
consolidated.nodes[node_ids[int(idxs[i])]][f"is_{flag_name}"] = 1
for name, locs in obstacle_locations.items():
_allocate(locs, name)
if detect_roundabouts and roundabout_locations is not None:
_allocate(roundabout_locations, "roundabout")
return consolidated
[docs]
def flag_node_intersection_topology(graph: nx.Graph) -> None:
"""Mutate `graph` in place to add per-node **topology-only** intersection
flags. Network-agnostic — works on any graph regardless of where it came
from (OSM, a custom road dataset, a synthetic graph) since it inspects
only neighbour count, not edge tags.
Per-node attributes written:
- `n_streets` — number of distinct neighbour nodes (degree in the
undirected sense, ignoring edge direction and parallel edges). The
"physical" intersection size: 1 = dead-end, 2 = passthrough,
3 = T-junction, 4+ = 4-way intersection or denser.
- `is_t_junction` — 1 if `n_streets == 3`, else 0.
- `is_4way` — 1 if `n_streets >= 4`, else 0.
`is_t_junction` and `is_4way` are **mutually exclusive** — a 4-way
node carries only `is_4way`. (Degree 1 / 2 nodes — leaves and
passthroughs — get neither.)
OSM-tag-based per-node classifications (highway rank, `_major` /
`_anchor` variants) live in the companion function
`flag_node_osm_classification`, which must be called AFTER this one
if you want the rank-conditional variants (since they're conditional
on `is_t_junction` / `is_4way`). A project working with a non-OSM
road network (e.g., LUMOS's simplified 3-tier networks) can call
this function alone and supply its own project-specific classifier
on top.
Per-node obstacle flags (`is_traffic_signal`, `is_stop`, etc.) live
in `consolidate_intersections`, which is also OSM-specific.
"""
is_directed = graph.is_directed()
for nid in graph.nodes():
if is_directed:
neighbours = set(graph.predecessors(nid)) | set(graph.successors(nid))
else:
neighbours = set(graph.neighbors(nid))
n_streets = len(neighbours)
graph.nodes[nid]["n_streets"] = n_streets
graph.nodes[nid]["is_t_junction"] = int(n_streets == 3)
graph.nodes[nid]["is_4way"] = int(n_streets >= 4)
[docs]
def flag_node_osm_classification(graph: nx.Graph) -> None:
"""Mutate `graph` in place to add **OSM-tag-based** per-node classification
attributes derived from the per-edge `highway` tag (OSM convention).
Reads the per-edge `highway` attribute via `OSM_HIGHWAY_RANKS` and the
per-node `is_t_junction` / `is_4way` flags. Call
`flag_node_intersection_topology` first so those flags are present.
Per-node attributes written:
- `max_highway_rank` — max `OSM_HIGHWAY_RANKS` value over edges
incident to this node (`-1` for unknown / not-a-real-road, e.g.
footways).
- `min_highway_rank` — same with min.
- `is_t_junction_major` — `is_t_junction` AND `min_highway_rank >= 3`
(every incident edge is tertiary or better — a "fully classified"
T-junction with no minor branches).
- `is_4way_major` — `is_4way` AND `min_highway_rank >= 3`.
- `is_t_junction_anchor` — `is_t_junction` AND `max_highway_rank >= 3`
AND `min_highway_rank <= 5` (at least one tertiary-or-better edge,
and not exclusively trunk / motorway — a trip-anchor T-junction
where car trips can naturally begin or end).
- `is_4way_anchor` — `is_4way` AND the same rank condition.
The two OSM-derived intersection tiers — `_major`, `_anchor` —
capture progressively different selection criteria for downstream
snap targets and edge-weight features:
- **`_major`**: intersections where every connecting street is at
least tertiary class. Used when only "real road" junctions matter
(e.g., generating a coarse zone-snap candidate set).
- **`_anchor`**: intersections that touch at least one main road
(tertiary or better) and aren't purely highway interchanges. Used
as priority snap targets for car routing — trips begin and end at
anchor nodes.
**Non-OSM networks**: this function only fires on graphs whose edges
carry the OSM `highway` attribute (or `OSM_HIGHWAY_RANKS`-compatible
string values for it). For networks with a different classification
scheme (e.g., LUMOS's simplified 3-tier network with `highway` /
`autostrasse` / `main_street` tiers), write a project-specific
classifier that follows the same per-node-attribute pattern.
"""
is_multi = graph.is_multigraph()
# Per-node max / min highway rank from incident edges.
node_max = {n: float("-inf") for n in graph.nodes}
node_min = {n: float("inf") for n in graph.nodes}
if is_multi:
for u, v, _, d in graph.edges(keys=True, data=True):
rank = _osm_highway_rank(d.get("highway"))
for endpoint in (u, v):
if rank > node_max[endpoint]:
node_max[endpoint] = rank
if rank < node_min[endpoint]:
node_min[endpoint] = rank
else:
for u, v, d in graph.edges(data=True):
rank = _osm_highway_rank(d.get("highway"))
for endpoint in (u, v):
if rank > node_max[endpoint]:
node_max[endpoint] = rank
if rank < node_min[endpoint]:
node_min[endpoint] = rank
for nid in graph.nodes():
is_t = bool(graph.nodes[nid].get("is_t_junction", 0))
is_4 = bool(graph.nodes[nid].get("is_4way", 0))
mx = node_max[nid]
mn = node_min[nid]
max_rank = int(mx) if mx != float("-inf") else -1
min_rank = int(mn) if mn != float("inf") else -1
is_major = min_rank >= 3
is_anchor = (max_rank >= 3) and (min_rank <= 5)
graph.nodes[nid]["max_highway_rank"] = max_rank
graph.nodes[nid]["min_highway_rank"] = min_rank
graph.nodes[nid]["is_t_junction_major"] = int(is_t and is_major)
graph.nodes[nid]["is_4way_major"] = int(is_4 and is_major)
graph.nodes[nid]["is_t_junction_anchor"] = int(is_t and is_anchor)
graph.nodes[nid]["is_4way_anchor"] = int(is_4 and is_anchor)
[docs]
def aggregate_edges_to_nodes(
graph: nx.Graph,
edge_attribute: str | Callable,
*,
aggregator: str | Callable = "max",
) -> pd.Series:
"""For each node in `graph`, aggregate `edge_attribute` across its connected edges.
The inverse of `aggregate_nodes_to_edges` (which propagates per-node
features onto edges). Common use: classify each node by the highest-class
road that touches it (`aggregator='max'`) — useful for filtering snap
targets in `snap_to_network_nodes` (skip motorway-only nodes, etc.).
For MultiGraphs / MultiDiGraphs, parallel edges between the same `(u, v)`
each contribute their own value — for `'max'` this is harmless, for
`'mean'` it slightly weights duplicated edges. For OSMnx graphs (where
parallel edges typically carry identical attributes), this is fine.
Args:
graph: NetworkX graph.
edge_attribute: name of an edge attribute (`str`) or a callable
`(u, v, data) -> value`. Edges where the attribute is missing
and the string form is used contribute `NaN`.
aggregator: `'max'` (default), `'min'`, `'mean'`, `'sum'`, or a
callable that takes a 1-D numpy array of per-edge values and
returns a scalar. NaN handling is left to the aggregator
(`'max'`/`'min'`/`'mean'`/`'sum'` use the nan-safe numpy
variants and silently skip NaN edge values).
Returns:
`pd.Series` indexed by node ID with the per-node aggregated value.
Isolated nodes (no edges) are absent from the result.
"""
if isinstance(edge_attribute, str):
attr_name = edge_attribute
def _attr(u, v, data):
return data.get(attr_name, np.nan)
elif callable(edge_attribute):
_attr = edge_attribute # signature (u, v, data) -> value
else:
raise ValueError(
f"`edge_attribute` must be a string or callable, got {type(edge_attribute).__name__}."
)
_agg: Callable
if aggregator == "max":
_agg = np.nanmax
elif aggregator == "min":
_agg = np.nanmin
elif aggregator == "mean":
_agg = np.nanmean
elif aggregator == "sum":
_agg = np.nansum
elif callable(aggregator):
_agg = aggregator
else:
raise ValueError(
f"Unknown aggregator {aggregator!r}; expected "
f"'max', 'min', 'mean', 'sum', or a callable."
)
per_node: defaultdict = defaultdict(list)
is_multi = isinstance(graph, (nx.MultiGraph, nx.MultiDiGraph))
if is_multi:
for u, v, _k, data in graph.edges(keys=True, data=True):
val = float(_attr(u, v, data))
per_node[u].append(val)
per_node[v].append(val)
else:
for u, v, data in graph.edges(data=True):
val = float(_attr(u, v, data))
per_node[u].append(val)
per_node[v].append(val)
# Aggregate with nan-safe semantics; suppress the "all-NaN slice"
# warning since we return NaN in that case (and the user can filter).
with np.errstate(all="ignore"):
out = {}
for n, vals in per_node.items():
arr = np.asarray(vals, dtype=float)
finite = (
arr[np.isfinite(arr)]
if _agg in (np.nanmax, np.nanmin, np.nanmean, np.nansum)
else arr
)
if _agg in (np.nanmax, np.nanmin, np.nanmean) and finite.size == 0:
out[n] = np.nan
else:
out[n] = float(_agg(arr))
return pd.Series(out, name="node_value")