Pipeline
This notebook demonstrates a streamlined UrbanMapper workflow using the UrbanPipeline class, replicating the step-by-step example with PLUTO data in Downtown Brooklyn. We'll define all steps upfront, execute them in one go, and visualise the results.
Essentially, this notebook covers the Basics/[7]urban_pipeline.ipynb example.
Data source used:
- PLUTO data from NYC Open Data. https://www.nyc.gov/content/planning/pages/resources/datasets/mappluto-pluto-change
from urban_mapper import UrbanMapper
from urban_mapper.pipeline import UrbanPipeline
# Initialise UrbanMapper
um = UrbanMapper()
Step 1: Define the Pipeline
Goal: Set up all components of the workflow in a single pipeline.
Input: Configurations for each UrbanMapper module.
Output: An UrbanPipeline object ready to process data.
We define each step (urban layer, loader, imputer, filter, enricher, and visualiser) with its specific role:
- Urban Layer: Street intersections in Downtown Brooklyn.
- Loader: PLUTO data from the oscur/pluto dataset on Hugging Face (the first 5,000 rows, streamed).
- Imputer: Drops rows with a missing longitude or latitude.
- Filter: Trims data to the bounding box of the urban layer.
- Enricher: Adds average floors per intersection.
- Visualiser: Prepares an interactive map.
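The imputer, filter and enricher roles listed above come down to simple record operations. According to the pipeline preview further down, SimpleGeoImputer drops rows with missing coordinates and BoundingBoxFilter keeps only the data inside the urban layer's bounding box.

The plain-Python sketch below mimics those two behaviours on a few made-up rows. It also mimics the enricher's mean-per-group aggregation. Several things in it are hypothetical:
- the bounding box;
- the row values;
- the helper names (drop_missing_coordinates, filter_bbox, mean_by_group).

UrbanMapper itself works on GeoDataFrames rather than lists of dicts, so treat this as an illustration of the idea only.

```python
from collections import defaultdict
from statistics import mean

# Made-up rows standing in for PLUTO records. The nearest_intersection ids are
# assumed to have been assigned already by the mapping step.
records = [
    {"longitude": -73.985, "latitude": 40.693, "numfloors": 4, "nearest_intersection": 1},
    {"longitude": -73.986, "latitude": 40.694, "numfloors": 10, "nearest_intersection": 1},
    {"longitude": None, "latitude": 40.690, "numfloors": 3, "nearest_intersection": 2},
    {"longitude": -73.900, "latitude": 40.600, "numfloors": 2, "nearest_intersection": 2},
    {"longitude": -73.984, "latitude": 40.692, "numfloors": 6, "nearest_intersection": 2},
]


def drop_missing_coordinates(rows, lon="longitude", lat="latitude"):
    """Imputer-like step (hypothetical): drop rows whose longitude or latitude is missing."""
    return [r for r in rows if r.get(lon) is not None and r.get(lat) is not None]


def filter_bbox(rows, bbox, lon="longitude", lat="latitude"):
    """Filter-like step (hypothetical): keep rows inside (min_lon, min_lat, max_lon, max_lat)."""
    min_lon, min_lat, max_lon, max_lat = bbox
    return [
        r for r in rows
        if min_lon <= r[lon] <= max_lon and min_lat <= r[lat] <= max_lat
    ]


def mean_by_group(rows, group_by, values_from):
    """Enricher-like step (hypothetical): average values_from for each group_by key."""
    groups = defaultdict(list)
    for r in rows:
        groups[r[group_by]].append(r[values_from])
    return {key: mean(values) for key, values in groups.items()}


# Hypothetical bounding box: (min_lon, min_lat, max_lon, max_lat).
bbox = (-73.995, 40.685, -73.975, 40.700)

imputed = drop_missing_coordinates(records)  # the row with longitude=None is dropped
filtered = filter_bbox(imputed, bbox)        # the row at (-73.900, 40.600) falls outside
avg_floors = mean_by_group(filtered, "nearest_intersection", "numfloors")
print(avg_floors)
```

The order matters here. Dropping rows with missing coordinates first means the bounding-box comparison never sees a None value.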
urban_layer = (
um.urban_layer.with_type("streets_intersections")
.from_place("Downtown Brooklyn, New York City, USA", network_type="drive")
.with_mapping(
longitude_column="longitude",
latitude_column="latitude",
# geometry_column="<geometry_column_name>",  # Replace <geometry_column_name> with the name of your geometry column to use it instead of the latitude and longitude columns.
output_column="nearest_intersection",
threshold_distance=50,
)  # with_mapping tells map_nearest_layer how to match the urban data to the urban layer.
.build()
)
# Note: for the interactive documentation, we only load 5,000 records from the dataset. Remove the number_of_rows limit for a more realistic analysis.
loader = (
um
.loader
.from_huggingface("oscur/pluto", number_of_rows=5000, streaming=True)
.with_columns(longitude_column="longitude", latitude_column="latitude")
# .with_columns(geometry_column="<geometry_column_name>")  # Replace <geometry_column_name> with the name of your geometry column to use it instead of the latitude and longitude columns.
.build()
)
imputer = (
um.imputer.with_type("SimpleGeoImputer")
.on_columns(longitude_column="longitude", latitude_column="latitude")
# .on_columns(geometry_column="<geometry_column_name>")  # Replace <geometry_column_name> with the name of your geometry column to use it instead of the latitude and longitude columns.
.build()
)
filter_step = um.filter.with_type("BoundingBoxFilter").build()
enricher = (
um.enricher.with_data(group_by="nearest_intersection", values_from="numfloors")
.aggregate_by(method="mean", output_column="avg_floors")
.build()
)
visualiser = (
um.visual.with_type("Interactive")
.with_style({"tiles": "CartoDB Positron", "colorbar_text_color": "gray"})
.build()
)
# Assemble the pipeline
pipeline = UrbanPipeline(
[
("urban_layer", urban_layer),
("loader", loader),
("imputer", imputer),
("filter", filter_step),
("enricher", enricher),
("visualiser", visualiser),
]
)
# Let's preview the urban pipeline we just created
pipeline.preview()
Urban Pipeline Preview:
Step 1: urban_layer
  Urban Layer: OSMNXIntersections
  CRS: EPSG:4326
  Mappings:
    Mapping:
      - lon=longitude, lat=latitude, output=nearest_intersection
Step 2: loader
  Loader: DataFrameLoader
  Latitude Column: latitude
  Longitude Column: longitude
  Geometry Column:
  CRS: EPSG:4326
  Additional params: {'map_columns': None, 'file_path': '', 'input_dataframe': None}
Step 3: imputer
  Imputer: SimpleGeoImputer
  Action: Drop rows with missing 'latitude' or 'longitude'
Step 4: filter
  Filter: BoundingBoxFilter
  Action: Filter data to the bounding box of the urban layer
Step 5: enricher
  Enricher Workflow:
  ├── Step 1: Data Input
  │   ├── Group By: nearest_intersection
  │   └── Values From: numfloors
  ├── Step 2: Action
  │   ├── Type: Aggregate
  │   ├── Aggregator: SimpleAggregator
  │   ├── Method: mean
  │   └── Output Column: avg_floors
  └── Step 3: Enricher
      ├── Type: SingleAggregatorEnricher
      └── Status: Ready
Step 6: visualiser
  Visualiser: InteractiveVisualiser using Folium
  Style: tiles: CartoDB Positron, colorbar_text_color: gray
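The preview's Mapping entry shows that each record will be assigned a nearest_intersection. Conceptually, with_mapping(..., threshold_distance=50) asks for the closest intersection, but only within that threshold.

The sketch below illustrates the idea with a brute-force haversine search in plain Python. It is not UrbanMapper's implementation, and it makes several assumptions:
- the threshold is in metres;
- records beyond the threshold get no match (None);
- the intersection coordinates and the helper names (haversine_m, nearest_within) are made up.

```python
import math


def haversine_m(lon1, lat1, lon2, lat2):
    """Great-circle distance in metres between two (lon, lat) points."""
    earth_radius_m = 6_371_000  # mean Earth radius
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = math.radians(lat2 - lat1)
    d_lambda = math.radians(lon2 - lon1)
    a = (math.sin(d_phi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2)
    return 2 * earth_radius_m * math.asin(math.sqrt(a))


def nearest_within(point, nodes, threshold_m):
    """Return the id of the closest node, or None if it is farther than threshold_m (assumed metres)."""
    lon, lat = point
    best_id, best_dist = None, math.inf
    for node_id, (node_lon, node_lat) in nodes.items():
        dist = haversine_m(lon, lat, node_lon, node_lat)
        if dist < best_dist:
            best_id, best_dist = node_id, dist
    return best_id if best_dist <= threshold_m else None


# Made-up intersection coordinates as (lon, lat).
intersections = {"A": (-73.9850, 40.6930), "B": (-73.9800, 40.6900)}

print(nearest_within((-73.9852, 40.6931), intersections, threshold_m=50))  # A (roughly 20 m away)
print(nearest_within((-73.9900, 40.6960), intersections, threshold_m=50))  # None (nearest is roughly 540 m away)
```

Scanning every intersection for every record costs O(records × intersections). That is fine for a toy example, but larger datasets would normally use a spatial index instead.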
Step 2: Execute the Pipeline
Goal: Process the data through all defined steps in one operation.
Input: The UrbanPipeline object from Step 1.
Output: A mapped GeoDataFrame and an enriched UrbanLayer with processed data.
The compose_transform method runs the entire workflow (loading, imputing, filtering, mapping, and enriching) in a single call, passing each step's output on to the next.
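At its core, running a pipeline means threading data through each step in order, with each step consuming the previous step's output. The sketch below shows only that idea, using a hypothetical run_steps helper and toy list-based steps.

The real compose_transform returns both the mapped data and the enriched layer, so it does more bookkeeping than this.

```python
from typing import Any, Callable, List, Sequence, Tuple

# A step is a (name, function) pair; each function receives the previous step's output.
Step = Tuple[str, Callable[[Any], Any]]


def run_steps(steps: Sequence[Step], data: Any = None) -> Tuple[Any, List[str]]:
    """Hypothetical helper: thread data through each named step in order."""
    executed = []
    for name, step in steps:
        data = step(data)
        executed.append(name)
    return data, executed


# Toy steps loosely mirroring loader -> imputer -> filter -> enricher.
toy_steps = [
    ("loader", lambda _: [3, None, 8, 50]),
    ("imputer", lambda xs: [x for x in xs if x is not None]),
    ("filter", lambda xs: [x for x in xs if x < 10]),
    ("enricher", lambda xs: sum(xs) / len(xs)),
]

result, executed = run_steps(toy_steps)
print(result, executed)  # 5.5 ['loader', 'imputer', 'filter', 'enricher']
```

Keeping the steps as an ordered list of named pairs lets the runner report which steps ran, much like the progress output shown after the call.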
mapped_data, enriched_layer = pipeline.compose_transform()
🗺️ Successfully composed pipeline with 5 steps!
Step 3: Visualise Results
Goal: Present the enriched data on an interactive map.
Input: The enriched layer from Step 2 and the columns to display (avg_floors).
Output: An interactive Folium map showing average floors per intersection.
The pipeline's visualise method uses the pre-configured visualiser to generate the map directly from the enriched layer.
fig = pipeline.visualise(["avg_floors"])
fig # Display the interactive map
Step 4: Save and Load Pipeline
Goal: Preserve the pipeline for future use or sharing.
Input: A file path (./my_pipeline.dill) for saving.
Output: A saved pipeline file and a reloaded UrbanPipeline object.
Saving with save and loading with load lets you reuse or share your workflow.
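The .dill extension suggests that the pipeline is serialised with the third-party dill library. dill extends Python's standard pickle module to handle more object types, such as lambdas.

The sketch below shows the same dump/load round trip using plain pickle on a hypothetical ToyPipeline dataclass. It illustrates the pattern only, not how UrbanPipeline.save is implemented; the save and load helpers are likewise hypothetical.

```python
import os
import pickle
import tempfile
from dataclasses import dataclass, field


@dataclass
class ToyPipeline:
    """Hypothetical stand-in for a pipeline: an ordered list of (name, config) pairs."""
    steps: list = field(default_factory=list)


def save(obj, path):
    """Serialise obj to path in binary mode."""
    with open(path, "wb") as f:
        pickle.dump(obj, f)


def load(path):
    """Deserialise and return the object stored at path."""
    with open(path, "rb") as f:
        return pickle.load(f)


toy_pipeline = ToyPipeline(steps=[
    ("loader", {"dataset": "oscur/pluto", "number_of_rows": 5000}),
    ("enricher", {"group_by": "nearest_intersection", "method": "mean"}),
])

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "toy_pipeline.pkl")
    save(toy_pipeline, path)
    restored = load(path)

print(restored == toy_pipeline, restored is toy_pipeline)  # True False
```

The reloaded object is equal to the original but is a new object. Loading a pickle or dill file can execute arbitrary code, so only load pipeline files from sources you trust.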
# Save the pipeline
pipeline.save("./my_pipeline.dill")
# Load it back
loaded_pipeline = UrbanPipeline.load("./my_pipeline.dill")
# Preview the loaded pipeline
loaded_pipeline.preview()
# Visualise with the loaded pipeline
fig = loaded_pipeline.visualise(["avg_floors"])
Urban Pipeline Preview:
Step 1: urban_layer
  Urban Layer: OSMNXIntersections
  CRS: EPSG:4326
  Mappings:
    Mapping:
      - lon=longitude, lat=latitude, output=nearest_intersection
Step 2: loader
  Loader: DataFrameLoader
  Latitude Column: latitude
  Longitude Column: longitude
  Geometry Column:
  CRS: EPSG:4326
  Additional params: {'map_columns': None, 'file_path': ''}
Step 3: imputer
  Imputer: SimpleGeoImputer
  Action: Drop rows with missing 'latitude' or 'longitude'
Step 4: filter
  Filter: BoundingBoxFilter
  Action: Filter data to the bounding box of the urban layer
Step 5: enricher
  Enricher Workflow:
  ├── Step 1: Data Input
  │   ├── Group By: nearest_intersection
  │   └── Values From: numfloors
  ├── Step 2: Action
  │   ├── Type: Aggregate
  │   ├── Aggregator: SimpleAggregator
  │   ├── Method: mean
  │   └── Output Column: avg_floors
  └── Step 3: Enricher
      ├── Type: SingleAggregatorEnricher
      └── Status: Ready
Step 6: visualiser
  Visualiser: InteractiveVisualiser using Folium
  Style: tiles: CartoDB Positron, colorbar_text_color: gray
Conclusion
Well done! Using UrbanPipeline, you've processed and visualised PLUTO data with less code than the step-by-step approach, and the whole workflow now lives in a single object that you can preview, run, save, and reload. Compare it with the Step-by-Step notebook for a detailed breakdown of each stage!