39 changes: 39 additions & 0 deletions functions-python/pmtiles_builder/src/agencies_processor.py
@@ -0,0 +1,39 @@
import csv

from base_processor import BaseProcessor
from csv_cache import AGENCY_FILE


class AgenciesProcessor(BaseProcessor):
    def __init__(
        self,
        csv_cache,
        logger=None,
    ):
        super().__init__(AGENCY_FILE, csv_cache, logger)
        self.agencies = {}

    def process_file(self):
        with open(self.filepath, "r", encoding=self.encoding, newline="") as f:
            header = f.readline()
            if not header:
                return
            columns = next(csv.reader([header]))

            agency_id_index = self.csv_cache.get_index(columns, "agency_id")
            agency_name_index = self.csv_cache.get_index(columns, "agency_name")

            for line in f:
                if not line.strip():
                    continue

                row = self.csv_parser.parse(line)
                agency_id = self.csv_cache.get_safe_value_from_index(
                    row, agency_id_index
                )
                agency_name = self.csv_cache.get_safe_value_from_index(
                    row, agency_name_index
                )

                if agency_id:
                    self.agencies[agency_id] = agency_name
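For orientation, here is a minimal, hypothetical usage sketch of this new processor. The `CsvCache` construction shown is an assumption based on this diff and may not match how the builder actually wires these together:

# Hypothetical sketch, not part of the PR. Assumes a CsvCache pointed at a
# working directory that contains an agency.txt file.
from csv_cache import CsvCache
from agencies_processor import AgenciesProcessor

cache = CsvCache(workdir="/tmp/gtfs_feed")  # `workdir` argument assumed from csv_cache.py below
processor = AgenciesProcessor(cache)
processor.process()  # resolves the path, detects encoding, then calls process_file()
print(processor.agencies)  # e.g. {"1": "Example Transit Agency"}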
69 changes: 69 additions & 0 deletions functions-python/pmtiles_builder/src/base_processor.py
@@ -0,0 +1,69 @@
import os

from fast_csv_parser import FastCsvParser
from shared.helpers.utils import detect_encoding
from shared.helpers.runtime_metrics import track_metrics


class BaseProcessor:
    """
    Minimal base class for processors that read a GTFS CSV and write derived outputs.

    Responsibilities
    - Resolve the absolute path of the input file via the shared CsvCache.
    - Detect the file encoding and initialize a fast CSV parser.
    - Return early and safely when the input file is absent (some GTFS files are optional).
    - Delegate the actual line-by-line work to subclasses via `process_file()`.

    Lifecycle contract
    - Subclasses should override `process_file(self)`; callers should invoke `process()`,
      never `process_file()` directly.

    Notes on flags
    - `no_download` and `no_delete` are orchestration hints used by the caller (e.g., the builder) to
      decide whether to download the source file and whether to delete it afterward. The base class
      does not use these flags directly; they are honored by the orchestrator.
    """

    def __init__(
        self, filename, csv_cache, logger=None, no_download=False, no_delete=False
    ):
        self.filename = filename
        self.csv_cache = csv_cache
        self.logger = logger or csv_cache.logger
        self.no_download = no_download
        self.no_delete = no_delete

        # Will be populated during `process()`
        self.filepath = None
        self.csv_parser = None
        self.encoding = None

    @track_metrics(metrics=("time", "memory", "cpu"))
    def process(self):
        """
        Entry point called by orchestrators to run a processor in a safe, uniform way.
        """
        self.filepath = self.csv_cache.get_path(self.filename)
        # If the target file does not exist in the workdir, skip processing.
        # This avoids exceptions for optional files and keeps pipelines resilient.
        if not os.path.exists(self.filepath):
            # No exception is raised here: the presence of mandatory files is verified elsewhere.
            self.logger.debug(
                "File not present, skipping processing: %s", self.filepath
            )
            return
        self.csv_parser = FastCsvParser()
        self.encoding = detect_encoding(filename=self.filepath, logger=self.logger)
        self.logger.debug("Begin processing file %s", self.filename)
        self.process_file()

    def process_file(self):
        """
        Hook for subclasses to implement the actual processing logic.

        Contract
        - May assume `self.filepath`, `self.csv_parser`, and `self.encoding` are initialized.
        - Should handle empty files gracefully (e.g., by returning early).
        - Should not raise on benign format issues where possible; prefer logging and continuing.
        """
        pass
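To make the lifecycle contract above concrete, here is an illustrative subclass sketch (not from the PR); it assumes only what the base class guarantees, namely that `filepath`, `csv_parser`, and `encoding` are set before `process_file()` runs:

# Illustrative sketch: a trivial processor that counts non-blank data rows.
from base_processor import BaseProcessor


class RowCounterProcessor(BaseProcessor):
    def __init__(self, csv_cache, logger=None):
        # "routes.txt" is just an example input file name.
        super().__init__("routes.txt", csv_cache, logger)
        self.row_count = 0

    def process_file(self):
        with open(self.filepath, "r", encoding=self.encoding, newline="") as f:
            if not f.readline():  # empty file: no header, nothing to do
                return
            self.row_count = sum(1 for line in f if line.strip())

Callers invoke `process()`, which no-ops when the file is absent and otherwise prepares the parser and encoding before delegating to `process_file()`.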
208 changes: 51 additions & 157 deletions functions-python/pmtiles_builder/src/csv_cache.py
@@ -13,20 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
import csv
import logging
import os
import subprocess
from pathlib import Path
-from typing import TypedDict, List, Dict
+from typing import TypedDict, List

from gtfs import stop_txt_is_lat_log_required
from pympler import asizeof

from shared.helpers.logger import get_logger
-from shared.helpers.transform import get_safe_value_from_csv, get_safe_float_from_csv

-from shared.helpers.utils import detect_encoding
+from shared.helpers.transform import get_safe_value, get_safe_float, get_safe_int

STOP_TIMES_FILE = "stop_times.txt"
SHAPES_FILE = "shapes.txt"
@@ -43,20 +39,13 @@ class ShapeTrips(TypedDict):

def get_volume_size(mountpoint: str):
"""
Returns the total size of the specified filesystem mount point in a human-readable format.

This function uses the `df` command-line utility to determine the size of the filesystem
mounted at the path specified by `mountpoint`. If the mount point does not exist, the function
prints an error message to the standard error and returns "N/A".
Return the total size of the filesystem at `mountpoint` in a human-readable string (e.g., "10G").

Parameters:
mountpoint: str
The filesystem mount point path to check.
Implementation notes
- Uses the system `df -h` command piped through awk to extract the size column.
- Requires a valid path; when the mountpoint doesn't exist, a warning is logged and "N/A" is returned.
- Intended primarily for diagnostics in logs (does not affect processing logic).

Returns:
str
The total size of the specified filesystem mount point in human-readable format. If the
mount point is not found, returns "N/A".
"""
mp = Path(mountpoint)
if not mp.exists():
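The rest of the function body is not shown in this hunk. As a rough sketch of the `df -h` approach the revised docstring describes (column extraction done in Python here rather than awk, and error handling elided; this is an assumption, not the PR's code):

import subprocess

def get_volume_size_sketch(mountpoint: str) -> str:
    # `df -h <mountpoint>` prints a header row plus one data row;
    # the second column of the data row is the total size, e.g. "10G".
    result = subprocess.run(
        ["df", "-h", mountpoint], capture_output=True, text=True, check=True
    )
    lines = result.stdout.splitlines()
    return lines[1].split()[1] if len(lines) > 1 else "N/A"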
@@ -77,10 +66,18 @@

class CsvCache:
"""
CsvCache provides cached access to GTFS CSV files in a specified working directory.
It lazily loads and caches file contents as lists of dictionaries, and offers
helper methods to retrieve relationships between routes, trips, stops, and shapes.
It lazily loads because not all files are necessarily needed.
Lightweight utility for working-directory file paths and safe CSV extraction helpers.

What it provides
- Path resolution: `get_path(filename)` -> absolute path under the configured workdir.
- Safe CSV accessors: helpers to get values/ints/floats from parsed rows by index without raising.
- Column index lookup: helper to map header names to indices safely.
- Diagnostics: optional deep-size logging of objects in DEBUG, and workdir size at initialization.

Notes
- This class does not currently implement an in-memory cache of full CSVs; processors stream files
directly and use these helpers for robust parsing.
- Constants for common GTFS filenames are exposed at module level (e.g., TRIPS_FILE, STOPS_FILE).
"""

def __init__(
Expand All @@ -95,19 +92,11 @@ def __init__(

        self.workdir = workdir

-        self.file_data = {}
-        self.trip_to_stops: Dict[str, List[str]] = None
-        self.route_to_trip = None
-        self.route_to_shape: Dict[str, Dict[str, ShapeTrips]] = None
-        self.stop_to_route = None
-        self.stop_to_coordinates = None
-        self.trips_no_shapes_per_route: Dict[str, List[str]] = {}
-
        self.logger.info("Using work directory: %s", self.workdir)
        self.logger.info("Size of workdir: %s", get_volume_size(self.workdir))

    def debug_log_size(self, label: str, obj: object) -> None:
-        """Log the deep size of an object in bytes when DEBUG is enabled."""
+        """Log the deep size of `obj` in bytes when DEBUG is enabled (best-effort; logs a debug message on failure)."""
        if self.logger.isEnabledFor(logging.DEBUG):
            try:
                size_bytes = asizeof.asizeof(obj)
@@ -116,133 +105,38 @@ def debug_log_size(self, label: str, obj: object) -> None:
                self.logger.debug("asizeof Failed to compute size for %s: %s", label, e)

    def get_path(self, filename: str) -> str:
+        """Return the absolute path for `filename` under the current workdir."""
        return os.path.join(self.workdir, filename)

-    def get_file(self, filename) -> list[dict]:
-        if self.file_data.get(filename) is None:
-            self.file_data[filename] = self._read_csv(self.get_path(filename))
-            self.debug_log_size(f"file data for {filename}", self.file_data[filename])
-        return self.file_data[filename]
-
-    def add_data(self, filename: str, data: list[dict]):
-        self.file_data[filename] = data
-
-    def _read_csv(self, filename) -> list[dict]:
-        """
-        Reads the content of a CSV file and returns it as a list of dictionaries
-        where each dictionary represents a row.
-
-        Parameters:
-            filename (str): The file path of the CSV file to be read.
-
-        Raises:
-            Exception: If there is an error during file opening or reading. The raised
-            exception will include the original error message along with the file name.
-
-        Returns:
-            list[dict]: A list of dictionaries, each representing a row in the CSV file.
-        """
-        try:
-            self.logger.debug("Loading %s", filename)
-            encoding = detect_encoding(filename, logger=self.logger)
-            with open(filename, newline="", encoding=encoding) as f:
-                return list(csv.DictReader(f))
-        except Exception as e:
-            raise Exception(f"Failed to read CSV file {filename}: {e}") from e
-
-    def clear_trip_from_route(self):
-        self.route_to_trip = None
-
-    def get_shape_from_route(self, route_id) -> Dict[str, ShapeTrips]:
-        """
-        Returns a list of shape_ids with associated trip_ids information with a given route_id from the trips file.
-        The relationship from the route to the shape is via the trips file.
-        Parameters:
-            route_id(str): The route identifier to look up.
-
-        Returns:
-            The corresponding shape id.
-            Example return value: [{'shape_id1': { 'shape_id': 'shape_id1', 'trip_ids': ['trip1', 'trip2']}},
-            {'shape_id': 'shape_id2', 'trip_ids': ['trip3']}}]
-        """
-        if self.route_to_shape is None:
-            self._build_route_to_shape()
-        return self.route_to_shape.get(route_id, {})
-
-    def _build_route_to_shape(self):
-        self.route_to_shape = {}
-        for row in self.get_file(TRIPS_FILE):
-            route_id = get_safe_value_from_csv(row, "route_id")
-            shape_id = get_safe_value_from_csv(row, "shape_id")
-            trip_id = get_safe_value_from_csv(row, "trip_id")
-            if route_id and trip_id:
-                if shape_id:
-                    route_shapes = self.route_to_shape.setdefault(route_id, {})
-                    shape_trips = route_shapes.setdefault(
-                        shape_id, {"shape_id": shape_id, "trip_ids": []}
-                    )
-                    shape_trips["trip_ids"].append(trip_id)
-                else:
-                    # Registering the trip without a shape for this route for later retrieval.
-                    trip_no_shapes = (
-                        self.trips_no_shapes_per_route.get(route_id)
-                        if route_id in self.trips_no_shapes_per_route
-                        else None
-                    )
-                    if trip_no_shapes is None:
-                        trip_no_shapes = []
-                        self.trips_no_shapes_per_route[route_id] = trip_no_shapes
-                    trip_no_shapes.append(trip_id)
-
-    def clear_shape_from_route(self):
-        self.route_to_shape = None
-
-    def get_trips_without_shape_from_route(self, route_id) -> List[str]:
-        return self.trips_no_shapes_per_route.get(route_id, [])
-
-    def _build_trip_to_stops(self):
-        self.trip_to_stops = {}
-        for row in self.get_file(STOP_TIMES_FILE):
-            trip_id = get_safe_value_from_csv(row, "trip_id")
-            stop_id = get_safe_value_from_csv(row, "stop_id")
-            if trip_id and stop_id:
-                trip_to_stops = self.trip_to_stops.setdefault(trip_id, [])
-                trip_to_stops.append(stop_id)
-
-    def clear_stops_from_trip(self):
-        self.trip_to_stops = None
-
-    def get_stops_from_trip(self, trip_id):
-        if self.trip_to_stops is None:
-            self._build_trip_to_stops()
-        return self.trip_to_stops.get(trip_id, [])
-
-    def _build_stop_to_coordinates(self):
-        self.stop_to_coordinates = {}
-        for s in self.get_file(STOPS_FILE):
-            row_stop_id = get_safe_value_from_csv(s, "stop_id")
-            row_stop_lon = get_safe_float_from_csv(s, "stop_lon")
-            row_stop_lat = get_safe_float_from_csv(s, "stop_lat")
-            if row_stop_id is None:
-                self.logger.warning("Missing stop id: %s", s)
-                continue
-            if row_stop_lon is None or row_stop_lat is None:
-                if stop_txt_is_lat_log_required(s):
-                    self.logger.warning("Missing stop latitude and longitude : %s", s)
-                else:
-                    self.logger.debug(
-                        "Missing optional stop latitude and longitude : %s", s
-                    )
-                continue
-            self.stop_to_coordinates[row_stop_id] = (row_stop_lon, row_stop_lat)
-
-    def get_coordinates_for_stop(self, stop_id) -> tuple[float, float] | None:
-        if self.stop_to_coordinates is None:
-            self._build_stop_to_coordinates()
-        return self.stop_to_coordinates.get(stop_id, None)
-
-    def clear_coordinate_for_stops(self):
-        self.stop_to_coordinates = None
-
    def set_workdir(self, workdir):
+        """Update the working directory used to resolve file paths (does not move files)."""
        self.workdir = workdir
+
+    @staticmethod
+    def get_index(columns, column_name):
+        """Return the index of `column_name` in the header list `columns`, or None if absent."""
+        try:
+            return columns.index(column_name)
+        except ValueError:
+            return None
+
+    @staticmethod
+    def get_safe_value_from_index(columns, index, default_value: str = None):
+        """Safely fetch a value from `columns` at `index`, applying default and transform semantics."""
+        return (
+            get_safe_value(columns[index], default_value)
+            if index is not None and index < len(columns)
+            else default_value
+        )
+
+    @staticmethod
+    def get_safe_float_from_index(columns, index):
+        """Fetch a value and coerce to float using standard transform rules (returns None on invalid)."""
+        raw_value = CsvCache.get_safe_value_from_index(columns, index)
+        return get_safe_float(raw_value)
+
+    @staticmethod
+    def get_safe_int_from_index(columns, index):
+        """Fetch a value and coerce to int using standard transform rules (returns None on invalid)."""
+        raw_value = CsvCache.get_safe_value_from_index(columns, index)
+        return get_safe_int(raw_value)
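A short usage sketch of the new index-based helpers (assuming, as these wrappers suggest, that `get_safe_value`/`get_safe_float` return None or the supplied default for missing or invalid input):

import csv

# Hypothetical rows mirroring what AgenciesProcessor does above.
header = next(csv.reader(["agency_id,agency_name,agency_url"]))
row = next(csv.reader(["1,Example Transit Agency,https://example.com"]))

name_idx = CsvCache.get_index(header, "agency_name")  # -> 1
fare_idx = CsvCache.get_index(header, "fare_url")     # -> None (column absent)

CsvCache.get_safe_value_from_index(row, name_idx)          # -> "Example Transit Agency"
CsvCache.get_safe_value_from_index(row, fare_idx, "n/a")   # -> "n/a" (safe default)
CsvCache.get_safe_float_from_index(row, fare_idx)          # -> None, never raises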