Source code for aperta.geo_processing

"""Geometric and raster-sampling utilities used across aperta.

Two broad concerns:

1. **Geometric helpers** — building hectare / H3 grids, generating point or
   polygon geometries from coordinate data, projecting between coordinate
   reference systems, computing per-cell spatial neighborhoods. Distinct
   from `geo_mapping`, which is specifically about mapping data between
   geo units.

2. **Raster sampling** — `sample_raster_at_points` reads values from a
   raster file at given point coordinates (used for elevation lookups in
   `topography.py`). Requires the optional `topo` extra (`rasterio`).
"""

import geopandas as gpd
import numpy as np
import pandas as pd


[docs] def remove_duplicate_indices(df: pd.DataFrame) -> pd.DataFrame: """Remove duplicate indices from DataFrame or Series. Often useful after spatial join.""" df = df.loc[~df.index.duplicated(keep="first"), :] return df
def _line_segments(line) -> np.ndarray | None: """Stack of `(dx, dy, length)` rows for every segment, across all parts. Handles both `LineString` and `MultiLineString` (multipart parts are concatenated in stored order). Empty / degenerate inputs return `None`. """ if line is None or line.is_empty: return None parts = list(line.geoms) if hasattr(line, "geoms") else [line] rows = [] for part in parts: if part is None or part.is_empty: continue coords = np.asarray(part.coords) if len(coords) < 2: continue deltas = np.diff(coords[:, :2], axis=0) seg_lens = np.hypot(deltas[:, 0], deltas[:, 1]) rows.append(np.column_stack([deltas, seg_lens])) if not rows: return None return np.vstack(rows) def _bearing_at(segs: np.ndarray, dist_along: float) -> float: """Compass bearing of the segment containing `dist_along` in a `_line_segments` array.""" seg_lens = segs[:, 2] total = seg_lens.sum() if total == 0: return float("nan") pos = max(0.0, min(float(dist_along), total)) i = int(np.searchsorted(np.cumsum(seg_lens), pos, side="right")) i = min(i, len(seg_lens) - 1) dx, dy = segs[i, 0], segs[i, 1] if dx == 0 and dy == 0: return float("nan") return float(np.degrees(np.arctan2(dx, dy)) % 360.0)
[docs] def line_bearings_deg(geoms: gpd.GeoSeries) -> pd.Series: """Compass bearing (degrees, [0, 360)) at each line's midpoint. For each (Multi)LineString, the polyline segment containing the half-arclength point is identified, and the bearing of *that segment* is returned — i.e. the local direction of travel at the midpoint, not the first→last chord. For straight single-segment lines the two are identical; for curved polylines they diverge. MultiLineString parts are treated as if concatenated end-to-end in their stored order. `0°` = +y (North), increasing clockwise (`90°` = East, `180°` = South, `270°` = West). The bearing reflects the line's stored vertex order — the line A→B has the opposite bearing of B→A. For undirected geometries take `bearings % 180`. Lines must be in a projected CRS with x = easting, y = northing (e.g. LV95 / EPSG:2056); a geographic CRS would introduce longitude-convergence error. Degenerate or empty geometries (no length, missing, or fewer than 2 vertices) return `NaN`. """ def _bearing(line) -> float: segs = _line_segments(line) if segs is None: return float("nan") return _bearing_at(segs, segs[:, 2].sum() / 2) return pd.Series([_bearing(g) for g in geoms], index=geoms.index, dtype="float64")
[docs] def line_segment_bearing_at(line, dist_along: float) -> float: """Compass bearing of the polyline segment containing an arbitrary `dist_along`. Same convention as `line_bearings_deg` but at any arclength position, not just the midpoint. Useful after snapping a point to a line, when you want the line's *local* direction at the snap point (e.g. to compare a traffic counter's bearing against the local edge bearing). `dist_along` is in the line's CRS units (meters in LV95). Clamped to `[0, line.length]`. MultiLineString parts are treated as concatenated end-to-end. Empty / degenerate inputs return `NaN`. """ segs = _line_segments(line) if segs is None: return float("nan") return _bearing_at(segs, dist_along)
[docs] def angular_diff_deg(a, b, undirected: bool = False): """Smallest unsigned difference (in degrees) between two compass bearings. Default (`undirected=False`): treats inputs as directions of travel — returns the circular distance in `[0, 180]`. E.g. `angular_diff_deg(45, 350)` is `55`, not `305`; opposing directions (90° vs 270°) give `180`. `undirected=True`: treats inputs as orientations of an undirected line — additionally folds across 180°, so A→B and B→A are equivalent. Returns `[0, 90]`. Use this when matching e.g. a directed traffic counter to an undirected road segment that carries traffic both ways: a counter bearing `90°` aligns with edges at *both* `90°` and `270°`. Accepts scalars or aligned numpy/pandas arrays; return type follows `np.minimum` (scalar in → scalar-like out). """ d = np.abs(np.asarray(a) - np.asarray(b)) % 360.0 diff = np.minimum(d, 360.0 - d) if undirected: diff = np.minimum(diff, 180.0 - diff) return diff
[docs] def sum_within_radius( gdf: gpd.GeoDataFrame, cols: list[str], radius: int | float, *, return_densities: bool = False, add_filled_densities: bool = False, ) -> pd.DataFrame: """Same-set neighbourhood sum: for each cell in `gdf`, sum the values of `cols` over all cells (including itself) within `radius`. For the cross-set case (`targets × sources` with two different GeoDataFrames) use [[cross_sum_within_radius]] instead. The regular sum, divided by the circular query area when `return_densities=True`, gives per-km² density. The "filled" variant additionally divides by the *occupied* area (cells where at least one value in `cols` is > 0) — useful where occupancy is sparse and the circular-area normaliser dilutes the signal (e.g. employment concentrated at an airport). The CRS of `gdf` must represent meters if densities are returned. Densities are returned per km². Implementation: builds a `scipy.spatial.KDTree` on cell centroids and constructs a sparse 0/1 adjacency matrix from `query_ball_point(radius)`; each cell's own row is implicitly included (since `dist(p, p) = 0 ≤ radius`). Per-cell sums are then `adj @ values` in one vectorised pass. """ if not return_densities and add_filled_densities: raise ValueError("`add_filled_densities` can only be True when `return_densities` is True.") from scipy.sparse import csr_matrix from scipy.spatial import KDTree centroids = gdf.geometry.centroid xy = np.column_stack([centroids.x.to_numpy(dtype=float), centroids.y.to_numpy(dtype=float)]) tree = KDTree(xy) idx_lists = tree.query_ball_point(xy, r=radius) # Sparse 0/1 adjacency matrix: row i has 1s at the neighbour positions # of cell i (within `radius`, inclusive of i itself). `adj @ values` # then computes per-cell spatial sums in one vectorised pass. n = len(gdf) row_lens = np.fromiter((len(nbrs) for nbrs in idx_lists), dtype=np.int64, count=n) cols_idx = np.concatenate([np.asarray(nbrs, dtype=np.int64) for nbrs in idx_lists]) rows = np.repeat(np.arange(n, dtype=np.int64), row_lens) data = np.ones(len(rows), dtype=float) adj = csr_matrix((data, (rows, cols_idx)), shape=(n, n), dtype=float) values = gdf[cols].to_numpy(dtype=float) spatial_sums = np.asarray(adj @ values) res = pd.DataFrame(index=gdf.index) circular_area_km2 = (np.pi * radius**2) / 1e6 out_values = spatial_sums / circular_area_km2 if return_densities else spatial_sums df = pd.DataFrame(out_values, index=gdf.index, columns=[f"{col}_r{radius}" for col in cols]) res = res.join(df) if add_filled_densities: # "Filled" densities: divide each cell's spatial sum by the total # area of *occupied* cells within its neighbourhood, instead of the # circular query area. Useful where occupancy is sparse — e.g. # employment concentrated at an airport with empty cells around. cell_areas_km2 = gdf.geometry.area.to_numpy(dtype=float) / 1e6 if len(np.unique(cell_areas_km2.round(decimals=4))) > 1: raise NotImplementedError( "Cells must currently be same size to calculate filled densities." ) is_filled = (gdf[cols].sum(axis=1) > 0).to_numpy() # `adj @ is_filled` = per-cell count of filled neighbours. n_filled_neighbours = np.asarray(adj @ is_filled.astype(float)) fill_cell_areas = cell_areas_km2 * n_filled_neighbours f = fill_cell_areas > 0 filled_values = spatial_sums.copy() filled_values[f, :] = filled_values[f, :] / fill_cell_areas[f, np.newaxis] filled_values[~f, :] = 0 df_filled = pd.DataFrame( filled_values, index=gdf.index, columns=[f"{col}_r{radius}_filled" for col in cols] ) res = res.join(df_filled) res = res.fillna(0) return res
[docs] def cross_sum_within_radius( targets: gpd.GeoDataFrame | gpd.GeoSeries, sources: gpd.GeoDataFrame | gpd.GeoSeries, radius: float, *, weight_column: str | None = None, return_density: bool = False, name: str = "aggregate", ) -> pd.Series: """Cross-set neighbourhood sum: for each target geometry, count source geometries (or sum a `weight_column` over them) within `radius` of the target's centroid. For the same-set case (one `gdf`, queried against itself with self-inclusion, multiple value columns at once, optional "filled-area" densities) use [[sum_within_radius]] instead. The two helpers are deliberately separate: same-set and cross-set queries differ in input shape (one gdf vs two) and in optimal backend (single sparse-matrix multiply vs per-target KDTree query). Backend: scipy `cKDTree` on source coordinates — O(N_src log N_src) one- time build, O(log N_src + k) per query. Both `targets` and `sources` must share a metric CRS for densities (and the radius) to be meaningful in real-world units. The function uses the centroid of each geometry, so non-point inputs (polygons, lines) work fine — their centroid is queried. Args: targets: GeoDataFrame or GeoSeries. Centroid of each geometry is the query point; `targets.index` becomes the output Series index. sources: GeoDataFrame or GeoSeries. Centroid of each source is the point that's counted / summed. If `weight_column` is given, `sources` must be a GeoDataFrame carrying that column. radius: query radius in CRS units (typically metres). weight_column: name of the column on `sources` to sum. `None` (default) counts source geometries within radius instead. return_density: if True, divide aggregates by `π · radius²` (the circular query area). Caller is responsible for ensuring CRS units are metric. name: name for the returned Series. Returns: `pd.Series` indexed by `targets.index`. Targets with no sources in range get `0` (or `0.0` density). """ from scipy.spatial import KDTree src_centroids = sources.geometry.centroid src_xy = np.column_stack( [src_centroids.x.to_numpy(dtype=float), src_centroids.y.to_numpy(dtype=float)] ) tgt_centroids = targets.geometry.centroid tgt_xy = np.column_stack( [tgt_centroids.x.to_numpy(dtype=float), tgt_centroids.y.to_numpy(dtype=float)] ) tree = KDTree(src_xy) if weight_column is None: # `return_length=True` (scipy ≥1.6) skips materialising the per-query # index lists — much faster for large source sets. agg = np.asarray( tree.query_ball_point(tgt_xy, r=radius, return_length=True), dtype=float, ) else: if not isinstance(sources, gpd.GeoDataFrame) or weight_column not in sources.columns: raise ValueError( f"`sources` must be a GeoDataFrame containing column " f"{weight_column!r} when `weight_column` is given." ) weights = sources[weight_column].to_numpy(dtype=float) idx_lists = tree.query_ball_point(tgt_xy, r=radius) agg = np.fromiter( (float(weights[idxs].sum()) if len(idxs) else 0.0 for idxs in idx_lists), dtype=float, count=len(idx_lists), ) if return_density: agg = agg / (np.pi * radius**2) return pd.Series(agg, index=targets.index, name=name)
[docs] def sample_raster_at_points( points: gpd.GeoDataFrame | gpd.GeoSeries, raster_path, *, band: int = 1, method: str = "bilinear", name: str = "raster_value", ) -> pd.Series: """Sample raster values at point centroids — vectorized, single-pass. Reads the raster once and indexes into the array with either bilinear interpolation (default — right for continuous fields like elevation; smooths pixel-boundary quantization noise) or nearest-neighbour (for categorical rasters like land cover or zoning, where blending values is meaningless). For ~100k+ points this is orders of magnitude faster (and far lighter on memory) than `rasterio.Dataset.sample`, which does a GDAL `RasterIO()` call per point and is the bottleneck inside `osmnx.elevation.add_node_elevations_raster`. Points whose `(row, col)` falls outside the raster — or whose pixel equals the raster's `nodata` value — get `NaN`. (For `'bilinear'`, any of the 4 surrounding pixels being nodata propagates to `NaN`.) The caller is responsible for ensuring `points.crs` matches the raster CRS; the function does not reproject. Polygon / line inputs are sampled at their centroid. Args: points: GeoDataFrame or GeoSeries; sampling location is each geometry's centroid; `points.index` becomes the output index. raster_path: path to a single-band raster (GeoTIFF, VRT, etc.). band: 1-based band index. method: `'bilinear'` (default) interpolates the 4 surrounding pixels weighted by fractional distance; `'nearest'` returns the value of the pixel containing the point. name: name for the returned Series. Returns: `pd.Series` of floats indexed by `points.index`. """ import rasterio with rasterio.open(raster_path) as src: arr = src.read(band).astype(float) inv_transform = ~src.transform nodata = src.nodata height, width = src.height, src.width # Mask nodata up-front so both branches see NaN — bilinear's weighted # sum will then naturally propagate NaN when any contributing pixel # is invalid. if nodata is not None: arr = np.where(arr == nodata, np.nan, arr) centroids = points.geometry.centroid xs = centroids.x.to_numpy(dtype=float) ys = centroids.y.to_numpy(dtype=float) # Affine `~transform` maps (x, y) -> (col_float, row_float). The +/-0.5 # shift converts from pixel-corner to pixel-centre coordinates, which # is what bilinear interpolation expects: a point at the pixel's # centre should return that pixel's value exactly, not a blend. cols_f, rows_f = inv_transform * (xs, ys) out = np.full(len(points), np.nan, dtype=float) if method == "nearest": cols = np.floor(cols_f).astype(int) rows = np.floor(rows_f).astype(int) valid = (cols >= 0) & (cols < width) & (rows >= 0) & (rows < height) if valid.any(): out[valid] = arr[rows[valid], cols[valid]] elif method == "bilinear": from scipy.ndimage import map_coordinates # `map_coordinates` expects (row, col) at pixel CENTRES (an integer # coord = that pixel's centre). Subtract 0.5 to convert from corner- # based pixel coords. mode='constant', cval=nan returns NaN out-of- # bounds; NaN inputs propagate to NaN outputs via the weighted sum. valid = (cols_f >= 0) & (cols_f <= width) & (rows_f >= 0) & (rows_f <= height) if valid.any(): coords = np.vstack([rows_f[valid] - 0.5, cols_f[valid] - 0.5]) out[valid] = map_coordinates(arr, coords, order=1, mode="constant", cval=np.nan) else: raise ValueError(f"method must be 'bilinear' or 'nearest', got {method!r}") return pd.Series(out, index=points.index, name=name)
[docs] def build_h3_grid( polygon, resolution: int, *, polygon_crs: str = "EPSG:4326", target_crs: str | None = None, id_column: str = "cell_id", ) -> gpd.GeoDataFrame: """Build a GeoDataFrame of H3 hex cells covering `polygon`. One row per H3 cell whose centre lies inside `polygon`. Geometry is the closed hex polygon. The H3 library operates in EPSG:4326 (WGS84); the input polygon is reprojected if `polygon_crs != 'EPSG:4326'`, and the output is optionally reprojected to `target_crs`. For nested tiers (e.g. cells at H3 res 10, zones at res 8), call once per resolution OR derive the coarser tier by `h3.cell_to_parent` on the finer one (cheaper, and the nesting is exact). Pure replacement of the private `_h3_cells_for_polygon` / `_h3_cell_to_polygon` helpers seen across aperta example notebooks. Args: polygon: input area as a shapely Polygon or MultiPolygon. resolution: H3 resolution (0–15). Lower = larger cells. polygon_crs: CRS of the input polygon (default `'EPSG:4326'`). target_crs: optional output CRS (cells are reprojected if given). id_column: name for the H3 cell-ID index (default `'cell_id'`). Returns: GeoDataFrame indexed by H3 cell ID (string), with a `geometry` column. CRS is `target_crs` if given, else `'EPSG:4326'`. """ import h3 from shapely.geometry import Polygon as ShPolygon if polygon_crs != "EPSG:4326": polygon = gpd.GeoSeries([polygon], crs=polygon_crs).to_crs("EPSG:4326").iloc[0] parts = list(polygon.geoms) if polygon.geom_type == "MultiPolygon" else [polygon] cell_ids: list[str] = [] for part in parts: # shapely uses (lng, lat); H3 wants (lat, lng). exterior = [(lat, lng) for lng, lat in part.exterior.coords] interiors = [[(lat, lng) for lng, lat in ring.coords] for ring in part.interiors] h3_poly = h3.LatLngPoly(exterior, *interiors) cell_ids.extend(h3.h3shape_to_cells(h3_poly, resolution)) cell_ids = sorted(set(cell_ids)) geoms = [ShPolygon([(lng, lat) for lat, lng in h3.cell_to_boundary(c)]) for c in cell_ids] gdf = gpd.GeoDataFrame( {"geometry": geoms}, index=pd.Index(cell_ids, name=id_column), crs="EPSG:4326", ) if target_crs is not None: gdf = gdf.to_crs(target_crs) return gdf