"""Iterative calibration of per-edge weights against observed trip-time data.
`calibrate_edge_weights` fits a linear model relating observed point-to-point
trip times to features collected along the routed shortest path plus features
at trip endpoints. The same feature set defines both the per-edge weight
formula used for routing AND the regression — keeping the two consistent
(a subtle pitfall in earlier ad-hoc calibration code).
Model:
time_trip = α · baseline_time
+ Σ_m coef_m · (baseline_time · length-weighted-avg of m along path)
+ Σ_a coef_a · (sum of a along path)
+ Σ_e coef_e · (endpoint value of e)
+ constant
where features come in three classes (matching how they enter the per-edge
duration formula in `examples/swiss/prepare/4_edge_weights.ipynb`):
- **multiplier**: scales baseline speed (so it multiplies baseline time per
edge — appears in the regression as `baseline_time · feature_avg`).
Examples: local density, traffic flow.
- **additive_route**: adds seconds per unit summed along the path. Examples:
intersection counts (sec per intersection), elevation gain (sec per metre).
- **additive_endpoint**: adds seconds based on the value of a node attribute
at the origin and at the destination. Examples: snap distance, local
density.
Iteration (option A from the design discussion): re-route after each OLS fit,
since updated coefficients change edge weights and therefore the chosen path
+ feature aggregates. Cheap to repeat — usually converges in 2-3 passes.
This module does NOT compute betweenness / traffic flows itself. Treat the
traffic estimate as just another per-edge attribute the caller supplies (e.g.
via `network_processing.get_nested_edge_betweenness`). Then include it in
`multiplier_features` (if it scales duration like density) or
`additive_route_features` (if seconds-per-unit).
"""
import logging
from dataclasses import dataclass
from typing import Callable
import geopandas as gpd
import networkx as nx
import numpy as np
import pandas as pd
from aperta import geo_mapping, geo_processing, network_snap, routing
# Used to convert km/h → m/s.
_KMH_TO_MS = 1.0 / 3.6
[docs]
@dataclass
class CalibrationResult:
"""Outcome of `calibrate_edge_weights`.
Attributes:
coefficients: DataFrame indexed by feature name with columns
`kind` (multiplier / additive_route / additive_endpoint / const /
baseline), `coef` (fitted value), `p` (p-value),
`mean_effect` (coef × mean of column, in seconds).
r_squared: OLS R² on the held-in trips.
n_used: number of ground-truth rows that survived snap + filter +
successful routing.
predicted_times: per-trip predicted time (Series indexed by trip_id);
comparable to `ground_truth.loc[predicted_times.index, 'time_measured']`.
observed_times: per-trip observed time (same index as predicted).
routed_distances: per-trip routed distance (m); useful for
distance-band breakdowns.
rmse: overall RMSE on the held-in trips, in seconds.
rmse_by_distance: Series of RMSE per distance band, indexed by
band label (`'< 10 km'`, `'10-25 km'`, `'>= 25 km'`).
edge_duration_attr: name of the per-edge attribute written to
`graph` by the final iteration (default `'duration_calibrated'`).
Downstream routing can use this as the cost.
iter_log: DataFrame, one row per iteration, columns r_squared, rmse,
n_used — useful to inspect convergence.
"""
coefficients: pd.DataFrame
r_squared: float
n_used: int
predicted_times: pd.Series
observed_times: pd.Series
routed_distances: pd.Series
rmse: float
rmse_by_distance: pd.Series
edge_duration_attr: str
iter_log: pd.DataFrame
def _baseline_edge_duration(graph: nx.MultiDiGraph, baseline_speed_attr: str) -> dict:
"""Per-edge baseline duration in seconds: length / (speed / 3.6)."""
out: dict = {}
for u, v, k, data in graph.edges(keys=True, data=True):
speed = float(data[baseline_speed_attr])
length = float(data["length"])
if speed <= 0:
raise ValueError(
f"Edge ({u},{v},{k}) has non-positive "
f"{baseline_speed_attr}={speed!r}; calibration needs "
"a positive baseline speed."
)
out[(u, v, k)] = length / (speed * _KMH_TO_MS)
return out
def _apply_edge_durations(
graph: nx.MultiDiGraph,
baseline_duration: dict,
alpha: float,
multiplier_coefs: dict[str, float],
additive_route_coefs: dict[str, float],
out_attr: str,
min_edge_duration: float = 0.01,
) -> None:
"""Write per-edge predicted duration to `out_attr`.
edge_duration = α · baseline + baseline · Σ_m coef_m · m_value
+ Σ_a coef_a · a_value
`α` calibrates the overall baseline scale; multiplier coefs scale the
baseline by per-feature amounts (added — not multiplied — to the α
baseline term, since they enter as `baseline · feature` in the linear
OLS model). Mirrors the per-edge formula in
`examples/swiss/prepare/4_edge_weights.ipynb`, minus the floor / cap.
Per-edge duration is clipped to `min_edge_duration` seconds. Without
this, a single transient negative coefficient (mid-iteration OLS noise,
common when starting from zero on a noisy feature) can produce a
negative edge weight, which scipy's Dijkstra refuses to handle. The
clip is wide of any physically meaningful duration so well-conditioned
fits are unaffected.
"""
for u, v, k, data in graph.edges(keys=True, data=True):
base = baseline_duration[(u, v, k)]
mult_term = base * sum(c * float(data.get(f, 0.0)) for f, c in multiplier_coefs.items())
add_term = sum(c * float(data.get(f, 0.0)) for f, c in additive_route_coefs.items())
data[out_attr] = max(alpha * base + mult_term + add_term, min_edge_duration)
def _filter_and_snap_legs(
legs: pd.DataFrame,
graph: nx.MultiDiGraph,
*,
min_trip_distance: float,
max_trip_distance: float,
max_dist_to_line_ratio: float,
snap_max_distance: float,
eligible_node_ids=None,
eligible_node_flag: str | None = None,
) -> pd.DataFrame:
"""Apply trip filters + snap origin / dest to nearest network nodes.
Adds columns: `nx_node_orig`, `nx_node_dest`, `snap_dist_orig`,
`snap_dist_dest`. Drops rows where either snap fails or distance
is beyond `snap_max_distance`. `eligible_node_ids` / `eligible_node_flag`
are forwarded to `snap_to_network_nodes` to restrict snap targets to
trap-free nodes (typically `prepared.snap_eligible_nodes` from
`prepare_network`).
"""
if "dist_line" not in legs.columns:
legs = legs.copy()
dx = legs["dest_x"] - legs["orig_x"]
dy = legs["dest_y"] - legs["orig_y"]
legs["dist_line"] = np.hypot(dx, dy)
n_in = len(legs)
legs = legs[(legs["dist_line"] >= min_trip_distance) & (legs["dist_line"] < max_trip_distance)]
if "dist_measured" in legs.columns:
ratio = legs["dist_measured"] / legs["dist_line"]
legs = legs[ratio < max_dist_to_line_ratio]
logging.info(
f" Trip filter: {n_in:,} → {len(legs):,} legs after dist_line "
f"[{min_trip_distance:.0f}, {max_trip_distance:.0f}] m + "
f"dist_measured/dist_line < {max_dist_to_line_ratio}."
)
legs = legs.copy()
for side in ("orig", "dest"):
points = gpd.GeoDataFrame(
geometry=gpd.points_from_xy(legs[f"{side}_x"], legs[f"{side}_y"]),
index=legs.index,
)
node_ids, dists = network_snap.snap_to_network_nodes(
points,
graph,
max_radius=snap_max_distance,
eligible_node_ids=eligible_node_ids,
eligible_node_flag=eligible_node_flag,
)
legs[f"nx_node_{side}"] = node_ids
legs[f"snap_dist_{side}"] = dists
n_before = len(legs)
legs = legs.dropna(subset=["nx_node_orig", "nx_node_dest"])
logging.info(
f" Snap filter: {n_before:,} → {len(legs):,} legs within "
f"{snap_max_distance:.0f} m of a network node."
)
return legs
def _join_endpoint_features(
routed: pd.DataFrame, legs: pd.DataFrame, graph: nx.MultiDiGraph, endpoint_features: list[str]
) -> pd.DataFrame:
"""Add per-trip columns `<feature>_orig` and `<feature>_dest` from node attrs."""
if not endpoint_features:
return routed
nodes_iter = graph.nodes(data=True)
node_attrs = pd.DataFrame.from_dict(
{n: {f: d.get(f, np.nan) for f in endpoint_features} for n, d in nodes_iter},
orient="index",
)
aligned_legs = legs.loc[legs.index.intersection(routed.index)]
for side in ("orig", "dest"):
node_ids = aligned_legs[f"nx_node_{side}"]
for f in endpoint_features:
routed.loc[node_ids.index, f"{f}_{side}"] = node_ids.map(node_attrs[f]).values
# Snap distance is on legs, not on graph nodes — propagate it through too.
for side in ("orig", "dest"):
col = f"snap_dist_{side}"
if col in legs.columns:
routed.loc[aligned_legs.index, col] = aligned_legs[col].reindex(routed.index).values
return routed
def _build_design_matrix(
routed: pd.DataFrame,
multiplier_features: list[str],
additive_route_features: list[str],
additive_endpoint_features: list[str],
constant: bool,
) -> tuple[pd.DataFrame, list[str], list[str]]:
"""Build the OLS design matrix `X` and the list of feature column names.
Multiplier features enter as `cost · feature_avg` (a velocity-like
interaction term). Additive route features enter as raw sums. Endpoint
features become two columns each: `<f>_orig` and `<f>_dest`.
Returns `(X, feature_columns, kinds_per_column)`. The first column is
always `cost` (baseline duration along the routed path) — its OLS
coefficient is the calibrated multiplier on the per-edge baseline (the
`α` term in the model docstring).
"""
rows = {"baseline_time": routed["cost"].astype(float)}
kinds = ["baseline"]
feat_cols = ["baseline_time"]
for f in multiplier_features:
col = f"{f}__mult"
rows[col] = (routed["cost"] * routed[f]).astype(float)
kinds.append("multiplier")
feat_cols.append(col)
for f in additive_route_features:
rows[f] = routed[f].astype(float)
kinds.append("additive_route")
feat_cols.append(f)
for f in additive_endpoint_features:
for side in ("orig", "dest"):
col = f"{f}_{side}"
rows[col] = routed[col].astype(float)
kinds.append("additive_endpoint")
feat_cols.append(col)
X = pd.DataFrame(rows)
if constant:
X.insert(0, "const", 1.0)
kinds.insert(0, "const")
feat_cols.insert(0, "const")
return X, feat_cols, kinds
def _rmse_by_distance(observed: pd.Series, predicted: pd.Series, dist_line: pd.Series) -> pd.Series:
"""RMSE per distance band — `< 10 km` / `10-25 km` / `>= 25 km`."""
bands = [
("< 10 km", dist_line < 10_000),
("10-25 km", (dist_line >= 10_000) & (dist_line < 25_000)),
(">= 25 km", dist_line >= 25_000),
]
rows = {}
for label, mask in bands:
idx = observed.index[mask.reindex(observed.index, fill_value=False)]
if len(idx) == 0:
rows[label] = np.nan
else:
err = (predicted.loc[idx] - observed.loc[idx]).to_numpy()
rows[label] = float(np.sqrt((err**2).mean()))
return pd.Series(rows)
[docs]
def calibrate_edge_weights(
graph: nx.MultiDiGraph,
ground_truth: pd.DataFrame,
*,
baseline_speed_attr: str = "speed_kph",
multiplier_features: dict[str, float] | None = None,
additive_route_features: dict[str, float] | None = None,
additive_endpoint_features: dict[str, float] | None = None,
constant: bool = True,
n_iterations: int = 3,
snap_max_distance: float = 300.0,
min_trip_distance: float = 3_000.0,
max_trip_distance: float = 100_000.0,
max_dist_to_line_ratio: float = 4.0,
edge_duration_attr: str = "duration_calibrated",
eligible_node_ids=None,
eligible_node_flag: str | None = None,
) -> CalibrationResult:
"""Iteratively calibrate per-edge durations against observed trip times.
See module docstring for the model. Each iteration:
1. Writes per-edge duration to `edge_duration_attr` from the current
coefficients (or initial guesses on iteration 1).
2. Routes each ground-truth trip on those weights, aggregating features
along the path.
3. Fits an OLS model of `time_measured` ~ baseline_time + features +
endpoint terms.
4. Updates coefficients to the OLS fit.
Args:
graph: routable networkx graph. Must carry `length` and
`baseline_speed_attr` on every edge, plus every attribute named
in the feature dicts.
ground_truth: DataFrame with columns `orig_x`, `orig_y`, `dest_x`,
`dest_y`, `time_measured` (seconds). Optional `dist_measured`
enables the dist-ratio filter. Optional `dist_line` is computed
from coords if not provided.
baseline_speed_attr: per-edge speed in km/h (e.g. from
`osmnx.add_edge_speeds`). Not modified by this function.
multiplier_features: `{edge_attr: initial_coef}`. Each scales the
baseline duration (`new_dur = old_dur · (1 + Σ coef · feat)`).
Use for density-like features.
additive_route_features: `{edge_attr: initial_coef}`. Each contributes
`coef · feat_value` seconds per edge (summed along path). Use for
intersection counts, elevation gain, etc.
additive_endpoint_features: `{node_attr: initial_coef}`. Each adds
`coef · value_at_origin + coef · value_at_destination` to total
trip duration. Use for snap distance, local density at endpoints.
constant: include an intercept in the OLS fit.
n_iterations: number of route-fit cycles. 2-3 usually converges.
snap_max_distance: drop trips where origin or destination is farther
than this from any network node (metres).
min_trip_distance: drop trips with `dist_line` below this (metres).
max_trip_distance: drop trips with `dist_line` above this (metres).
max_dist_to_line_ratio: if `dist_measured` is present, drop trips
where `dist_measured / dist_line` exceeds this (long detours are
usually data noise).
edge_duration_attr: name of the per-edge duration attribute written
on `graph` (overwritten each iteration).
eligible_node_ids: optional set / list / Index of node IDs to restrict
trip-endpoint snap targets to. Forwarded to
`snap_to_network_nodes`. Typically `prepared.snap_eligible_nodes`
from `routing_prep.prepare_network` — prevents trips from
snapping to trapped nodes and contaminating the calibration fit.
eligible_node_flag: alternative to `eligible_node_ids` — name of a
per-node bool attribute on `graph` marking eligible snap targets
(e.g., `prepared.snap_eligible_flag`). Ignored if
`eligible_node_ids` is also given.
Returns:
`CalibrationResult` — see its docstring.
Raises:
ValueError: if any required column is missing or every trip filters
out before fitting.
"""
try:
import statsmodels.api as sm
except ImportError as e:
raise ImportError(
"calibrate_edge_weights needs `statsmodels` (install with "
"`pip install 'aperta[examples]'` or `pip install statsmodels`)."
) from e
multiplier_features = dict(multiplier_features or {})
additive_route_features = dict(additive_route_features or {})
additive_endpoint_features = dict(additive_endpoint_features or {})
required = {"orig_x", "orig_y", "dest_x", "dest_y", "time_measured"}
missing = required - set(ground_truth.columns)
if missing:
raise ValueError(f"`ground_truth` is missing required columns: {sorted(missing)}")
logging.info(
f"Calibration: {len(ground_truth):,} input trips, "
f"{len(multiplier_features)} multiplier + "
f"{len(additive_route_features)} additive-route + "
f"{len(additive_endpoint_features)} additive-endpoint features."
)
# 1. Pre-snap + filter once — depends only on the graph topology, not on
# the (iteratively changing) edge weights.
legs = _filter_and_snap_legs(
ground_truth,
graph,
min_trip_distance=min_trip_distance,
max_trip_distance=max_trip_distance,
max_dist_to_line_ratio=max_dist_to_line_ratio,
snap_max_distance=snap_max_distance,
eligible_node_ids=eligible_node_ids,
eligible_node_flag=eligible_node_flag,
)
if len(legs) == 0:
raise ValueError("No trips remain after snap + filter.")
# 2. Baseline per-edge duration (length / speed) computed once — feeds
# into every iteration's edge-duration formula.
baseline_duration = _baseline_edge_duration(graph, baseline_speed_attr)
# 4. Iterate: apply current coefs → route → fit → update coefs.
# α (baseline scale) starts at 1.0; multiplier/additive/endpoint
# coefs start at user-supplied initial values. After each OLS fit,
# α + coefs are read directly from the fit (NO cumulative rescaling
# of baseline_duration — that was a confusing earlier design that
# made α drift across iterations even when the model was stable).
alpha = 1.0
cur_mult = dict(multiplier_features)
cur_add = dict(additive_route_features)
cur_end = dict(additive_endpoint_features)
# Aggregation per feature: multiplier features get length-weighted-avg
# (so they enter as a speed-like correction); additive route features
# get summed along the path.
edge_feature_aggs: dict[str, str] = {
**{f: "length_weighted" for f in cur_mult},
**{f: "sum" for f in cur_add},
}
iter_log_rows = []
fit_result = None
routed = None
feat_cols: list[str] = []
kinds: list[str] = []
for iteration in range(1, n_iterations + 1):
# 4a. Write per-edge duration into the graph (scipy CSR is rebuilt
# per call inside shortest_path_metrics_one_to_one — cheap relative
# to the actual Dijkstras).
_apply_edge_durations(
graph, baseline_duration, alpha, cur_mult, cur_add, edge_duration_attr
)
# 4b. Route every trip + aggregate features along the path.
routed = routing.shortest_path_metrics_one_to_one(
graph,
list(legs.index),
legs["nx_node_orig"],
legs["nx_node_dest"],
weight=edge_duration_attr,
length_attr="length",
edge_features=edge_feature_aggs,
)
routed = _join_endpoint_features(routed, legs, graph, list(cur_end))
# 4c. OLS fit.
X, feat_cols, kinds = _build_design_matrix(
routed, list(cur_mult), list(cur_add), list(cur_end), constant
)
y = legs.loc[routed.index, "time_measured"]
valid = X.notna().all(axis=1) & y.notna()
X_f, y_f = X[valid], y[valid]
if len(y_f) < len(feat_cols) + 1:
raise ValueError(
f"Iteration {iteration}: only {len(y_f)} valid rows for "
f"{len(feat_cols)} feature columns — calibration ill-posed."
)
fit_result = sm.OLS(y_f, X_f).fit()
# 4d. Update coefficient state for next iteration directly from the
# OLS fit (no rescaling — coefs are in the units of the model
# equation; the per-edge formula in _apply_edge_durations uses
# them as-is).
coefs = fit_result.params
alpha = float(coefs["baseline_time"])
cur_mult = {f: float(coefs[f"{f}__mult"]) for f in cur_mult}
cur_add = {f: float(coefs[f]) for f in cur_add}
# Endpoint coefs are stored per (feature, side); the per-edge
# formula doesn't use them. Average the two sides so cur_end stays
# a single-value dict (only relevant for the next iteration's
# design matrix, which rebuilds the per-side columns anyway).
cur_end = {f: (float(coefs[f"{f}_orig"]) + float(coefs[f"{f}_dest"])) / 2 for f in cur_end}
edge_feature_aggs = {
**{f: "length_weighted" for f in cur_mult},
**{f: "sum" for f in cur_add},
}
# 4e. Iteration log.
pred = fit_result.fittedvalues
rmse_iter = float(np.sqrt(((pred - y_f) ** 2).mean()))
iter_log_rows.append(
{
"iteration": iteration,
"r_squared": float(fit_result.rsquared),
"rmse": rmse_iter,
"n_used": int(len(y_f)),
"alpha": alpha,
}
)
logging.info(
f" Iter {iteration}/{n_iterations}: R²={fit_result.rsquared:.4f}, "
f"RMSE={rmse_iter:.1f} s, n={len(y_f):,}, α={alpha:.3f}"
)
assert fit_result is not None and routed is not None
# 5. Write final per-edge duration (with the LAST iteration's coefs).
_apply_edge_durations(graph, baseline_duration, alpha, cur_mult, cur_add, edge_duration_attr)
# 6. Assemble outputs.
coef_df = pd.DataFrame(
{
"kind": kinds,
"coef": fit_result.params.values,
"p": fit_result.pvalues.values,
},
index=feat_cols,
)
# Mean effect (coef × mean of column) — same convention as lumos.
X, _, _ = _build_design_matrix(
routed,
list(multiplier_features),
list(additive_route_features),
list(additive_endpoint_features),
constant,
)
y = legs.loc[routed.index, "time_measured"]
valid = X.notna().all(axis=1) & y.notna()
coef_df["mean_effect"] = [
float((X.loc[valid, c] * coef_df.loc[c, "coef"]).mean()) for c in coef_df.index
]
coef_df = coef_df.round(4)
predicted = pd.Series(fit_result.fittedvalues, index=y[valid].index, name="predicted_time")
observed = y[valid].rename("observed_time")
dist_routed = routed.loc[valid.index[valid], "distance"].rename("routed_distance")
rmse_overall = float(np.sqrt(((predicted - observed) ** 2).mean()))
rmse_band = _rmse_by_distance(observed, predicted, legs["dist_line"])
return CalibrationResult(
coefficients=coef_df,
r_squared=float(fit_result.rsquared),
n_used=int(valid.sum()),
predicted_times=predicted,
observed_times=observed,
routed_distances=dist_routed,
rmse=rmse_overall,
rmse_by_distance=rmse_band,
edge_duration_attr=edge_duration_attr,
iter_log=pd.DataFrame(iter_log_rows).set_index("iteration"),
)
# --- Traffic-counter calibration -----------------------------------------
#
# Calibration of a modeled traffic-flow estimate (e.g. the flows
# output from `traffic_flows.nested_node_sample` + betweenness) against
# observed point counters. Two primitives:
#
# * `snap_counters_to_edges` — assign each counter to the right network
# edge using a bearing-aware nearest-line match. The "right edge"
# part is critical: a counter sits next to two or more parallel
# edges (opposite directions, service roads, frontage roads) and
# naïve nearest-line picks the wrong one most of the time.
#
# * `evaluate_against_counters` — compute correlation R², regression
# slope, and RMSE between modeled and observed AADT on the snapped
# edges. R² is scale-invariant (use it to pick distribution-shape
# params); slope tells the caller how to rescale absolute volumes
# (e.g. derive `trips_per_person_per_day`).
#
# Together these let a notebook do simple coordinate-descent calibration:
# vary one parameter at a time, re-simulate flows, evaluate, plot the
# error curve, user picks the minimum. The library doesn't ship a
# coordinate-descent driver — too project-specific (simulation cost,
# parameter set, stopping criterion vary too much).
[docs]
def snap_counters_to_edges(
counters: gpd.GeoDataFrame,
graph: nx.MultiDiGraph,
*,
search_radius: float | pd.Series = 50.0,
bearing_tol_deg: float = 20.0,
bearing_column: str = "bearing_deg",
eligible_edges: Callable[[pd.Series, gpd.GeoDataFrame], gpd.GeoDataFrame] | None = None,
bidirectional: bool | None = None,
) -> pd.DataFrame:
"""Snap directional traffic counters to the correct network edges.
Counters typically sit next to several parallel candidate edges (opposite
directions on the same road; service roads; frontage roads), so naïve
nearest-line matching picks the wrong edge most of the time. This
function adds a **bearing tolerance** filter — only edges whose local
bearing matches the counter's `bearing_deg` (within `bearing_tol_deg`)
are eligible. For directed graphs the bearing comparison is directional
(a counter at bearing 90° won't snap to an edge pointing at 270°),
which correctly assigns the two counters of a two-way road to the two
directional edges.
Uses `d['geometry']` from every edge — guaranteed by
`consolidate_intersections`. Edges without a `geometry` attribute
(e.g. raw OSMnx graphs with `simplify=True`) are silently skipped;
consolidate first or call `osmnx.graph_to_gdfs(..., fill_edge_geometry=True)`.
Args:
counters: GeoDataFrame of point geometries with a `bearing_column`
(degrees, OSM/north-clockwise convention). Same CRS as the
graph node coordinates.
graph: routable nx graph. Edge attributes must include `geometry`
(LineString) and whatever `eligible_edges` reads.
search_radius: max cartesian distance for candidate edges (CRS
units). Pass a scalar for one global radius or a `pd.Series`
aligned to `counters.index` for per-counter radii (e.g. wider
for highway counters which sit further from the carriageway).
bearing_tol_deg: max angular difference between counter bearing
and local edge bearing at the snap point.
bearing_column: counter column holding the directional bearing
(default `'bearing_deg'`).
eligible_edges: optional `(counter_row, candidate_edges_gdf) -> subset`
callback. Use to restrict matches by class — typically a
highway counter only matches highway edges, a local counter
only matches local edges. Forwarded to
[[geo_mapping.map_points_to_filtered_lines]].
bidirectional: how to compare bearings. `True` collapses opposite
bearings (counter at 90° matches edges at 90° AND 270°) —
correct for undirected graphs where one edge represents both
directions of a road. `False` is directional — correct for
directed graphs (the default `nx.MultiDiGraph`). `None`
auto-detects from `graph.is_directed()`.
Returns:
DataFrame indexed like `counters` with columns:
- `u`, `v`, `k`: matched edge ID (or `pd.NA` if no acceptable
match within radius);
- `snap_dist`: cartesian distance counter → edge (or `NaN`);
- `dist_along`: along-edge distance from edge start to nearest
point on edge (or `NaN`).
Unmatched counters get all-NA rows — drop with `result.dropna(subset=['u'])`.
"""
if bidirectional is None:
bidirectional = not graph.is_directed()
if bearing_column not in counters.columns:
raise ValueError(
f"`counters` is missing required column `{bearing_column!r}` "
f"(have: {list(counters.columns)})"
)
# Build edges GDF from graph. Drop edges without geometry — they can't
# be snapped to anyway, and the caller is responsible for consolidating
# / filling geometry beforehand.
edge_records = []
for u, v, k, d in graph.edges(keys=True, data=True):
geom = d.get("geometry")
if geom is None:
continue
rec = dict(d)
rec["u"], rec["v"], rec["k"] = u, v, k
rec["geometry"] = geom
edge_records.append(rec)
if not edge_records:
raise ValueError(
"No edges have a `geometry` attribute. Consolidate the graph "
"via `network_processing.consolidate_intersections` first, or "
"use `osmnx.graph_to_gdfs(..., fill_edge_geometry=True)`."
)
edges_gdf = gpd.GeoDataFrame(edge_records, geometry="geometry", crs=counters.crs)
# Linear integer index so `map_points_to_filtered_lines`'s `line_id`
# outputs lift back to (u, v, k) via a single .iloc lookup.
edges_gdf = edges_gdf.reset_index(drop=True)
def _accept(counter_row, edge_row, ctx) -> bool:
edge_bearing = geo_processing.line_segment_bearing_at(edge_row.geometry, ctx["dist_along"])
if np.isnan(edge_bearing):
return False
diff = geo_processing.angular_diff_deg(
counter_row[bearing_column], edge_bearing, undirected=bidirectional
)
return float(diff) <= bearing_tol_deg
matches = geo_mapping.map_points_to_filtered_lines(
counters,
edges_gdf,
search_radius=search_radius,
eligible_lines=eligible_edges,
accept=_accept,
)
# Lift `line_id` (positional row in edges_gdf) back to (u, v, k).
out = pd.DataFrame(index=counters.index)
matched = matches["line_id"].notna()
out["u"] = pd.NA
out["v"] = pd.NA
out["k"] = pd.NA
if matched.any():
idxs = matches.loc[matched, "line_id"].astype(int).to_numpy()
out.loc[matched, "u"] = edges_gdf.iloc[idxs]["u"].to_numpy()
out.loc[matched, "v"] = edges_gdf.iloc[idxs]["v"].to_numpy()
out.loc[matched, "k"] = edges_gdf.iloc[idxs]["k"].to_numpy()
out["snap_dist"] = matches["distance"]
out["dist_along"] = matches["dist_along"]
n_match = int(matched.sum())
logging.info(
f"snap_counters_to_edges: {n_match:,} of {len(counters):,} counters "
f"matched ({n_match / max(len(counters), 1) * 100:.1f}%); "
f"bidirectional={bidirectional}, tol={bearing_tol_deg}°."
)
return out
[docs]
def evaluate_against_counters(
modeled: pd.Series,
counters: pd.DataFrame,
*,
observed_column: str = "traffic_cars",
) -> dict:
"""Compare modeled per-edge AADT against snapped counter observations.
Args:
modeled: per-edge modeled AADT, indexed by `(u, v, k)` tuples (the
output of `traffic_flows.nested_node_sample` + betweenness +
AADT scaling).
counters: DataFrame with `u`, `v`, `k` columns (from
`snap_counters_to_edges`) and an observed-AADT column. Rows
with NA in `u`/`v`/`k` are dropped (unmatched counters).
observed_column: name of the observed-AADT column (default
`'traffic_cars'`, matching the Swiss counter schema).
Returns:
Dict with:
- `r2`: Pearson correlation² between modeled and observed —
**scale-invariant**, so use this to pick distribution-shape
params (lognormal σ, μ).
- `slope`: slope from a no-intercept regression
`modeled = slope · observed`. Tells you how to rescale
absolute volumes — e.g. multiply `trips_per_person_per_day`
by `1 / slope` to bring the modeled total in line with
counters.
- `rmse`: root-mean-square error on the matched set, in
counter-units (veh/day).
- `n_matched`: number of counters used in the comparison.
- `merged`: DataFrame with `observed`, `modeled`, `(u, v, k)`
for every matched counter — convenient for scatter plots.
"""
matched = counters.dropna(subset=["u", "v", "k"]).copy()
if observed_column not in matched.columns:
raise ValueError(
f"`counters` is missing observed column `{observed_column!r}` "
f"(have: {list(matched.columns)})"
)
# Build the index lookup. `modeled` may be a Series with a MultiIndex
# or a tuple-keyed flat index — handle both via .reindex with tuples.
keys = list(zip(matched["u"].astype(int), matched["v"].astype(int), matched["k"].astype(int)))
if isinstance(modeled.index, pd.MultiIndex):
modeled_values = modeled.reindex(keys).to_numpy()
else:
modeled_values = np.array([modeled.get(k, np.nan) for k in keys])
matched["modeled"] = modeled_values
matched["observed"] = matched[observed_column].astype(float)
matched = matched.dropna(subset=["modeled", "observed"])
if len(matched) == 0:
return {"r2": np.nan, "slope": np.nan, "rmse": np.nan, "n_matched": 0, "merged": matched}
obs = matched["observed"].to_numpy()
mod = matched["modeled"].to_numpy()
r = float(np.corrcoef(obs, mod)[0, 1]) if np.std(obs) > 0 and np.std(mod) > 0 else np.nan
r2 = r**2 if not np.isnan(r) else np.nan
# No-intercept regression: slope = Σ(x·y) / Σ(x²) with x=observed.
denom = float((obs**2).sum())
slope = float((obs * mod).sum() / denom) if denom > 0 else np.nan
rmse = float(np.sqrt(((mod - obs) ** 2).mean()))
return {
"r2": r2,
"slope": slope,
"rmse": rmse,
"n_matched": int(len(matched)),
"merged": matched[["u", "v", "k", "observed", "modeled"]],
}