Loaders

What is the loader module?

The loader module is responsible for loading geospatial data into UrbanMapper. It provides a unified interface for loading various data formats, including shapefiles, Parquet, and CSV files with geospatial information. UrbanMapper steps support multiple datasets: create one loader instance per dataset, combine them in a single dictionary under suitable keys, and use that dictionary in your pipeline, as sketched below. Geolocation can be read either from latitude/longitude columns or from a geometry column in WKT format.
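
For instance, a minimal sketch combining two loaders documented below (the dictionary keys and file names are illustrative; how the dictionary is consumed depends on your pipeline configuration):

>>> from urban_mapper.modules.loader import CSVLoader, ShapefileLoader
>>>
>>> loaders = {
...     "taxi_trips": CSVLoader(
...         file_path="taxi_trips.csv",
...         latitude_column="pickup_lat",
...         longitude_column="pickup_lng",
...     ),
...     "districts": ShapefileLoader(file_path="districts.shp"),
... }
>>> gdfs = {name: loader.load() for name, loader in loaders.items()}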

In the meantime, we recommend looking through the Loader examples for a more hands-on introduction to the Loader module and its usage.

Documentation Under Alpha Construction

This documentation is in its early stages and still being developed. The API may therefore change, and some parts might be incomplete or inaccurate.

Use at your own risk, and please report anything you find that seems incorrect or outdated.

Open An Issue!

LoaderBase

Bases: ABC

Base Class For Loaders.

This abstract class defines the common interface that all loader implementations must implement. Loaders are responsible for reading spatial data from various file formats and converting it to the GeoDataFrame data structure. They handle coordinate system transformations and validation of required spatial columns.

Attributes:

Name Type Description
latitude_column str

Name of the column containing latitude values.

longitude_column str

Name of the column containing longitude values.

coordinate_reference_system Union[str, Tuple[str, str]]

If a string, it specifies the coordinate reference system to use (default: 'EPSG:4326'). If a tuple (source_crs, target_crs), it defines a conversion from the source CRS to the target CRS (default target CRS: 'EPSG:4326').

additional_loader_parameters Dict[str, Any]

Additional parameters specific to the loader implementation. Consider this as kwargs.

Source code in src/urban_mapper/modules/loader/abc_loader.py
@beartype
class LoaderBase(ABC):
    """Base Class For `Loaders`.

    This abstract class defines the common interface that all loader implementations
    **must implement**. `Loaders` are responsible for reading spatial data from various
    file formats and converting them to `GeoDataFrames` data structure. They handle coordinate system
    transformations and validation of required spatial columns.

    Attributes:
        latitude_column (str): Name of the column containing latitude values.
        longitude_column (str): Name of the column containing longitude values.
        coordinate_reference_system (Union[str, Tuple[str, str]]):
            If a string, it specifies the coordinate reference system to use (default: 'EPSG:4326').
            If a tuple (source_crs, target_crs), it defines a conversion from the source CRS to the target CRS (default target CRS: 'EPSG:4326').
        additional_loader_parameters (Dict[str, Any]): Additional parameters specific to the loader implementation. Consider this as `kwargs`.
    """

    def __init__(
        self,
        latitude_column: Optional[str] = None,
        longitude_column: Optional[str] = None,
        geometry_column: Optional[str] = None,
        coordinate_reference_system: Union[str, Tuple[str, str]] = DEFAULT_CRS,
        **additional_loader_parameters: Any,
    ) -> None:
        self.latitude_column: str = latitude_column or ""
        self.longitude_column: str = longitude_column or ""
        self.geometry_column: str = geometry_column or ""
        self.coordinate_reference_system: Union[str, Tuple[str, str]] = (
            coordinate_reference_system
        )
        self.additional_loader_parameters: Dict[str, Any] = additional_loader_parameters

    @abstractmethod
    def _load(self) -> gpd.GeoDataFrame:
        """Internal implementation method for loading data from a file.

        This method is called by `load()` after validation is performed.

        !!! warning "Method Not Implemented"
            This method must be implemented by subclasses. It should contain the logic
            for reading the file and converting it to a `GeoDataFrame`.

        Returns:
            A `GeoDataFrame` containing the loaded spatial data.

        Raises:
            ValueError: If required columns are missing or the file format is invalid.
            FileNotFoundError: If the file does not exist.
        """
        ...

    @ensure_coordinate_reference_system
    def load(self) -> gpd.GeoDataFrame:
        """Load spatial data from a source.

        This is the main public method for using `loaders`. It performs validation
        on the inputs before delegating to the implementation-specific `_load` method.
        It also ensures the file exists and that the coordinate reference system is properly set.

        Returns:
            A `GeoDataFrame` containing the loaded spatial data.

        Raises:
            FileNotFoundError: If the file does not exist.
            ValueError: If required columns are missing or the file format is invalid.

        Examples:
        """
        loaded_data = self._load()

        if self.additional_loader_parameters.get("map_columns") is not None:
            map_columns = self.additional_loader_parameters.get("map_columns")

            if (
                loaded_data.active_geometry_name is not None
                and loaded_data.active_geometry_name in map_columns.keys()
            ):
                source = loaded_data.active_geometry_name
                loaded_data = loaded_data.rename_geometry(map_columns[source])
                del map_columns[source]

            loaded_data = loaded_data.rename(columns=map_columns)

        return loaded_data

    @abstractmethod
    def preview(self, format: str = "ascii") -> Any:
        """Generate a preview of the instance's `loader`.

        Creates a summary representation of the loader for quick inspection during UrbanMapper's analysis workflow.

        !!! warning "Method Not Implemented"
            This method must be implemented by subclasses. It should provide a preview
            of the loader's configuration and data. Make sure to support all formats.

        Args:
            format: The output format for the preview. Options include:

                - [x] `ascii`: Text-based format for terminal display
                - [x] `json`: JSON-formatted data for programmatic use

        Returns:
            A representation of the `loader` in the requested format.
            Return type varies based on the format parameter.

        Raises:
            ValueError: If an unsupported format is requested.
        """
        pass
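
To make the contract concrete, here is a minimal sketch of a subclass: only _load() and preview() need implementing, while load() (with its validation and optional column mapping) is inherited. InMemoryLoader is a hypothetical example, not a loader shipped with UrbanMapper, and the LoaderBase import path is assumed:

from typing import Any

import geopandas as gpd

from urban_mapper.modules.loader import LoaderBase  # assumed import path


class InMemoryLoader(LoaderBase):  # hypothetical, for illustration only
    """Sketch: wraps an existing GeoDataFrame instead of reading a file."""

    def __init__(self, gdf: gpd.GeoDataFrame, **kwargs: Any) -> None:
        super().__init__(**kwargs)
        self._gdf = gdf

    def _load(self) -> gpd.GeoDataFrame:
        # A real loader would read a file and build the GeoDataFrame here.
        return self._gdf

    def preview(self, format: str = "ascii") -> Any:
        if format == "ascii":
            return f"Loader: InMemoryLoader\n  Rows: {len(self._gdf)}\n"
        elif format == "json":
            return {"loader": "InMemoryLoader", "rows": len(self._gdf)}
        raise ValueError(f"Unsupported format: {format}")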

load()

Load spatial data from a source.

This is the main public method for using loaders. It performs validation on the inputs before delegating to the implementation-specific _load method. It also ensures the file exists and that the coordinate reference system is properly set.

Returns:

Type Description
GeoDataFrame

A GeoDataFrame containing the loaded spatial data.

Raises:

Type Description
FileNotFoundError

If the file does not exist.

ValueError

If required columns are missing or the file format is invalid.

Examples:
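
A minimal sketch, using the CSVLoader documented below. Note from the source underneath that load() also honours an optional map_columns mapping passed through additional_loader_parameters, renaming columns (including the active geometry column) after loading; the file and column names here are illustrative:

>>> loader = CSVLoader(
...     file_path="taxi_trips.csv",
...     latitude_column="pickup_lat",
...     longitude_column="pickup_lng",
...     map_columns={"pickup_lat": "lat", "pickup_lng": "lng"},
... )
>>> gdf = loader.load()  # GeoDataFrame with the two columns renamed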

Source code in src/urban_mapper/modules/loader/abc_loader.py
@ensure_coordinate_reference_system
def load(self) -> gpd.GeoDataFrame:
    """Load spatial data from a source.

    This is the main public method for using `loaders`. It performs validation
    on the inputs before delegating to the implementation-specific `_load` method.
    It also ensures the file exists and that the coordinate reference system is properly set.

    Returns:
        A `GeoDataFrame` containing the loaded spatial data.

    Raises:
        FileNotFoundError: If the file does not exist.
        ValueError: If required columns are missing or the file format is invalid.

    Examples:
    """
    loaded_data = self._load()

    if self.additional_loader_parameters.get("map_columns") is not None:
        map_columns = self.additional_loader_parameters.get("map_columns")

        if (
            loaded_data.active_geometry_name is not None
            and loaded_data.active_geometry_name in map_columns.keys()
        ):
            source = loaded_data.active_geometry_name
            loaded_data = loaded_data.rename_geometry(map_columns[source])
            del map_columns[source]

        loaded_data = loaded_data.rename(columns=map_columns)

    return loaded_data

_load() abstractmethod

Internal implementation method for loading data from a file.

This method is called by load() after validation is performed.

Method Not Implemented

This method must be implemented by subclasses. It should contain the logic for reading the file and converting it to a GeoDataFrame.

Returns:

Type Description
GeoDataFrame

A GeoDataFrame containing the loaded spatial data.

Raises:

Type Description
ValueError

If required columns are missing or the file format is invalid.

FileNotFoundError

If the file does not exist.

Source code in src/urban_mapper/modules/loader/abc_loader.py
@abstractmethod
def _load(self) -> gpd.GeoDataFrame:
    """Internal implementation method for loading data from a file.

    This method is called by `load()` after validation is performed.

    !!! warning "Method Not Implemented"
        This method must be implemented by subclasses. It should contain the logic
        for reading the file and converting it to a `GeoDataFrame`.

    Returns:
        A `GeoDataFrame` containing the loaded spatial data.

    Raises:
        ValueError: If required columns are missing or the file format is invalid.
        FileNotFoundError: If the file does not exist.
    """
    ...

preview(format='ascii') abstractmethod

Generate a preview of the instance's loader.

Creates a summary representation of the loader for quick inspection during UrbanMapper's analysis workflow.

Method Not Implemented

This method must be implemented by subclasses. It should provide a preview of the loader's configuration and data. Make sure to support all formats.

Parameters:

Name Type Description Default
format str

The output format for the preview. Options include:

  • ascii: Text-based format for terminal display
  • json: JSON-formatted data for programmatic use
'ascii'

Returns:

Type Description
Any

A representation of the loader in the requested format; the return type varies based on the format parameter.

Raises:

Type Description
ValueError

If an unsupported format is requested.

Source code in src/urban_mapper/modules/loader/abc_loader.py
@abstractmethod
def preview(self, format: str = "ascii") -> Any:
    """Generate a preview of the instance's `loader`.

    Creates a summary representation of the loader for quick inspection during UrbanMapper's analysis workflow.

    !!! warning "Method Not Implemented"
        This method must be implemented by subclasses. It should provide a preview
        of the loader's configuration and data. Make sure to support all formats.

    Args:
        format: The output format for the preview. Options include:

            - [x] `ascii`: Text-based format for terminal display
            - [x] `json`: JSON-formatted data for programmatic use

    Returns:
        A representation of the `loader` in the requested format.
        Return type varies based on the format parameter.

    Raises:
        ValueError: If an unsupported format is requested.
    """
    pass
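
A quick usage sketch, assuming a concrete loader instance such as the CSVLoader documented below:

>>> print(loader.preview())                  # ASCII summary for the terminal
>>> summary = loader.preview(format="json")  # dict for programmatic use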

FileLoaderBase

Bases: LoaderBase

Base class for file-based Loaders.

This abstract class extends LoaderBase for loaders that read spatial data from files on disk. It stores the path of the file to load and otherwise inherits the common loader responsibilities: converting data to the GeoDataFrame data structure, handling coordinate system transformations, and validating required spatial columns.

Attributes:

Name Type Description
file_path Path

Path to the file to load.

latitude_column str

Name of the column containing latitude values.

longitude_column str

Name of the column containing longitude values.

coordinate_reference_system Union[str, Tuple[str, str]]

If a string, it specifies the coordinate reference system to use (default: 'EPSG:4326'). If a tuple (source_crs, target_crs), it defines a conversion from the source CRS to the target CRS (default target CRS: 'EPSG:4326').

additional_loader_parameters Dict[str, Any]

Additional parameters specific to the loader implementation. Consider this as kwargs.

Source code in src/urban_mapper/modules/loader/loaders/file_loader.py
@beartype
class FileLoaderBase(LoaderBase):
    """FileLoaderBase For `Loaders`.

    This abstract class defines the common interface that all loader implementations
    **must implement**. `Loaders` are responsible for reading spatial data from various
    file formats and converting them to `GeoDataFrames` data structure. They handle coordinate system
    transformations and validation of required spatial columns.

    Attributes:
        file_path (Path): Path to the file to load.
        latitude_column (str): Name of the column containing latitude values.
        longitude_column (str): Name of the column containing longitude values.
        coordinate_reference_system (Union[str, Tuple[str, str]]):
            If a string, it specifies the coordinate reference system to use (default: 'EPSG:4326').
            If a tuple (source_crs, target_crs), it defines a conversion from the source CRS to the target CRS (default target CRS: 'EPSG:4326').
        additional_loader_parameters (Dict[str, Any]): Additional parameters specific to the loader implementation. Consider this as `kwargs`.
    """

    def __init__(
        self,
        file_path: Union[str, Path],
        latitude_column: Optional[str] = None,
        longitude_column: Optional[str] = None,
        geometry_column: Optional[str] = None,
        coordinate_reference_system: Union[str, Tuple[str, str]] = DEFAULT_CRS,
        **additional_loader_parameters: Any,
    ) -> None:
        super().__init__(
            latitude_column=latitude_column,
            longitude_column=longitude_column,
            geometry_column=geometry_column,
            coordinate_reference_system=coordinate_reference_system,
            **additional_loader_parameters,
        )
        self.file_path: Path = Path(file_path)
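
As a sketch of what FileLoaderBase provides, a file-based subclass receives self.file_path from the base constructor and only implements _load() and preview(). GeoJSONLoader is hypothetical, not a loader shipped with UrbanMapper, and the import path is assumed:

from typing import Any

import geopandas as gpd

from urban_mapper.modules.loader.loaders.file_loader import FileLoaderBase  # assumed


class GeoJSONLoader(FileLoaderBase):  # hypothetical, for illustration only
    """Sketch: self.file_path is populated by FileLoaderBase.__init__."""

    def _load(self) -> gpd.GeoDataFrame:
        # GeoJSON files already carry geometries, so gpd.read_file suffices.
        return gpd.read_file(self.file_path)

    def preview(self, format: str = "ascii") -> Any:
        if format == "ascii":
            return f"Loader: GeoJSONLoader\n  File: {self.file_path}\n"
        elif format == "json":
            return {"loader": "GeoJSONLoader", "file": str(self.file_path)}
        raise ValueError(f"Unsupported format: {format}")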

CSVLoader

Bases: FileLoaderBase

Loader for CSV files containing spatial data.

This loader reads data from CSV (or other delimiter-separated) files and converts them to GeoDataFrames. It requires either a latitude column and a longitude column (used to build point geometries for each row) or a geometry column containing WKT.

Attributes:

Name Type Description
file_path Path

Path to the CSV file to load.

latitude_column str

Name of the column containing latitude values.

longitude_column str

Name of the column containing longitude values.

geometry_column str

Name of the column containing geometry data in WKT format.

coordinate_reference_system Union[str, Tuple[str, str]]

If a string, it specifies the coordinate reference system to use (default: 'EPSG:4326'). If a tuple (source_crs, target_crs), it defines a conversion from the source CRS to the target CRS (default target CRS: 'EPSG:4326').

separator str

The delimiter character used in the CSV file. Default: ","

encoding str

The character encoding of the CSV file. Default: "utf-8"

Examples:

>>> from urban_mapper.modules.loader import CSVLoader
>>>
>>> # Basic usage with lat/long
>>> loader = CSVLoader(
...     file_path="taxi_trips.csv",
...     latitude_column="pickup_lat",
...     longitude_column="pickup_lng"
... )
>>> gdf = loader.load()
>>>
>>> # Basic usage with geometry
>>> loader = CSVLoader(
...     file_path="taxi_trips.csv",
...     geometry_column="the_geom"
... )
>>> gdf = loader.load()
>>>
>>> # With custom separator and encoding
>>> loader = CSVLoader(
...     file_path="custom_data.csv",
...     geometry_column="geom",
...     separator=";",
...     encoding="latin-1"
... )
>>> gdf = loader.load()
>>>
>>> # With CRS
>>> loader = CSVLoader(
...     file_path="custom_data.csv",
...     latitude_column="lat",
...     longitude_column="lng",
...     coordinate_reference_system="EPSG:4326"
... )
>>> gdf = loader.load()
>>>
>>> # With source-target CRS
>>> loader = CSVLoader(
...     file_path="custom_data.csv",
...     latitude_column="lat",
...     longitude_column="lng",
...     coordinate_reference_system=("EPSG:4326", "EPSG:3857")
... )
>>> gdf = loader.load()
Source code in src/urban_mapper/modules/loader/loaders/csv_loader.py
@beartype
class CSVLoader(FileLoaderBase):
    """Loader for `CSV` files containing spatial data.

    This loader reads data from `CSV` (or other delimiter-separated) files and
    converts them to `GeoDataFrames`. It requires either latitude and longitude
    columns (used to build point geometries for each row) or a geometry column
    containing WKT.

    Attributes:
        file_path (Path): Path to the `CSV` file to load.
        latitude_column (str): Name of the column containing latitude values.
        longitude_column (str): Name of the column containing longitude values.
        geometry_column (str): Name of the column containing geometry data in WKT format.
        coordinate_reference_system (Union[str, Tuple[str, str]]):
            If a string, it specifies the coordinate reference system to use (default: 'EPSG:4326').
            If a tuple (source_crs, target_crs), it defines a conversion from the source CRS to the target CRS (default target CRS: 'EPSG:4326').
        separator (str): The delimiter character used in the CSV file. Default: `","`
        encoding (str): The character encoding of the CSV file. Default: `"utf-8"`

    Examples:
        >>> from urban_mapper.modules.loader import CSVLoader
        >>>
        >>> # Basic usage with lat/long
        >>> loader = CSVLoader(
        ...     file_path="taxi_trips.csv",
        ...     latitude_column="pickup_lat",
        ...     longitude_column="pickup_lng"
        ... )
        >>> gdf = loader.load()
        >>>
        >>> # Basic usage with geometry
        >>> loader = CSVLoader(
        ...     file_path="taxi_trips.csv",
        ...     geometry_column="the_geom"
        ... )
        >>> gdf = loader.load()
        >>>
        >>> # With custom separator and encoding
        >>> loader = CSVLoader(
        ...     file_path="custom_data.csv",
        ...     geometry_column="geom",
        ...     separator=";",
        ...     encoding="latin-1"
        ... )
        >>> gdf = loader.load()
        >>>
        >>> # With CRS
        >>> loader = CSVLoader(
        ...     file_path="custom_data.csv",
        ...     latitude_column="lat",
        ...     longitude_column="lng",
        ...     coordinate_reference_system="EPSG:4326"
        ... )
        >>> gdf = loader.load()
        >>>
        >>> # With source-target CRS
        >>> loader = CSVLoader(
        ...     file_path="custom_data.csv",
        ...     latitude_column="lat",
        ...     longitude_column="lng",
        ...     coordinate_reference_system=("EPSG:4326", "EPSG:3857")
        ... )
        >>> gdf = loader.load()
    """

    def __init__(
        self,
        file_path: Union[str, Path],
        latitude_column: Optional[str] = None,
        longitude_column: Optional[str] = None,
        geometry_column: Optional[str] = None,
        coordinate_reference_system: Union[str, Tuple[str, str]] = DEFAULT_CRS,
        separator: str = ",",
        encoding: str = "utf-8",
        **additional_loader_parameters: Any,
    ) -> None:
        super().__init__(
            file_path=file_path,
            latitude_column=latitude_column,
            longitude_column=longitude_column,
            geometry_column=geometry_column,
            coordinate_reference_system=coordinate_reference_system,
            **additional_loader_parameters,
        )
        self.separator = separator
        self.encoding = encoding

    @require_either_or_attributes(
        [["latitude_column", "longitude_column"], ["geometry_column"]],
        error_msg="Either both 'latitude_column' and 'longitude_column' must be set, or 'geometry_column' must be set.",
    )
    def _load(self) -> gpd.GeoDataFrame:
        """Load data from a CSV file and convert it to a `GeoDataFrame`.

        This method reads a `CSV` file using pandas, validates the latitude and
        longitude columns, and converts the data to a `GeoDataFrame` with point
        geometries using the specified coordinate reference system.

        Returns:
            A `GeoDataFrame` containing the loaded data with point geometries
            created from the latitude and longitude columns.

        Raises:
            ValueError: If latitude_column, longitude_column, or geometry_column is None.
            ValueError: If latitude_column or longitude_column and geometry_column are defined together.
            ValueError: If the specified columns are not found in the CSV file.
            pd.errors.ParserError: If the CSV file cannot be parsed.
            UnicodeDecodeError: If the file encoding is incorrect.
        """
        dataframe = pd.read_csv(
            self.file_path, sep=self.separator, encoding=self.encoding
        )

        if self.latitude_column != "" and self.longitude_column != "":
            if self.latitude_column not in dataframe.columns:
                raise ValueError(
                    f"Column '{self.latitude_column}' not found in the CSV file."
                )
            if self.longitude_column not in dataframe.columns:
                raise ValueError(
                    f"Column '{self.longitude_column}' not found in the CSV file."
                )

            # Ensure latitude and longitude columns are numeric
            dataframe[self.latitude_column] = pd.to_numeric(
                dataframe[self.latitude_column], errors="coerce"
            )
            dataframe[self.longitude_column] = pd.to_numeric(
                dataframe[self.longitude_column], errors="coerce"
            )
            geometry = gpd.points_from_xy(
                dataframe[self.longitude_column],
                dataframe[self.latitude_column],
            )
        else:
            if self.geometry_column not in dataframe.columns:
                raise ValueError(
                    f"Column '{self.geometry_column}' not found in the CSV file."
                )

            filter_not_na = dataframe[self.geometry_column].notna()
            dataframe.loc[filter_not_na, self.geometry_column] = dataframe.loc[
                filter_not_na, self.geometry_column
            ].apply(wkt.loads)
            geometry = self.geometry_column

        geodataframe = gpd.GeoDataFrame(
            dataframe,
            geometry=geometry,
            crs=self.coordinate_reference_system[0]
            if isinstance(self.coordinate_reference_system, tuple)
            else self.coordinate_reference_system,
        )
        return geodataframe

    def preview(self, format: str = "ascii") -> Any:
        """Generate a preview of this `CSV` loader.

        Creates a summary representation of the loader for quick inspection.

        Args:
            format: The output format for the preview. Options include:

                - [x] "ascii": Text-based format for terminal display
                - [x] "json": JSON-formatted data for programmatic use

        Returns:
            A string or dictionary representing the loader, depending on the format.

        Raises:
            ValueError: If an unsupported format is requested.
        """
        if format == "ascii":
            return (
                f"Loader: CSVLoader\n"
                f"  File: {self.file_path}\n"
                f"  Latitude Column: {self.latitude_column}\n"
                f"  Longitude Column: {self.longitude_column}\n"
                f"  Geometry Column: {self.geometry_column}\n"
                f"  Separator: {self.separator}\n"
                f"  Encoding: {self.encoding}\n"
                f"  CRS: {self.coordinate_reference_system}\n"
                f"  Additional params: {self.additional_loader_parameters}\n"
            )
        elif format == "json":
            return {
                "loader": "CSVLoader",
                "file": self.file_path,
                "latitude_column": self.latitude_column,
                "longitude_column": self.longitude_column,
                "geometry_column": self.geometry_column,
                "separator": self.separator,
                "encoding": self.encoding,
                "crs": self.coordinate_reference_system,
                "additional_params": self.additional_loader_parameters,
            }
        else:
            raise ValueError(f"Unsupported format: {format}")

_load()

Load data from a CSV file and convert it to a GeoDataFrame.

This method reads a CSV file using pandas, validates the latitude and longitude columns, and converts the data to a GeoDataFrame with point geometries using the specified coordinate reference system.

Returns:

Type Description
GeoDataFrame

A GeoDataFrame containing the loaded data, with point geometries created from the latitude and longitude columns.

Raises:

Type Description
ValueError

If latitude_column, longitude_column, or geometry_column is None.

ValueError

If latitude_column or longitude_column and geometry_column are defined together.

ValueError

If the specified columns are not found in the CSV file.

ParserError

If the CSV file cannot be parsed.

UnicodeDecodeError

If the file encoding is incorrect.

Source code in src/urban_mapper/modules/loader/loaders/csv_loader.py
@require_either_or_attributes(
    [["latitude_column", "longitude_column"], ["geometry_column"]],
    error_msg="Either both 'latitude_column' and 'longitude_column' must be set, or 'geometry_column' must be set.",
)
def _load(self) -> gpd.GeoDataFrame:
    """Load data from a CSV file and convert it to a `GeoDataFrame`.

    This method reads a `CSV` file using pandas, validates the latitude and
    longitude columns, and converts the data to a `GeoDataFrame` with point
    geometries using the specified coordinate reference system.

    Returns:
        A `GeoDataFrame` containing the loaded data with point geometries
        created from the latitude and longitude columns.

    Raises:
        ValueError: If latitude_column, longitude_column, or geometry_column is None.
        ValueError: If latitude_column or longitude_column and geometry_column are defined together.
        ValueError: If the specified columns are not found in the CSV file.
        pd.errors.ParserError: If the CSV file cannot be parsed.
        UnicodeDecodeError: If the file encoding is incorrect.
    """
    dataframe = pd.read_csv(
        self.file_path, sep=self.separator, encoding=self.encoding
    )

    if self.latitude_column != "" and self.longitude_column != "":
        if self.latitude_column not in dataframe.columns:
            raise ValueError(
                f"Column '{self.latitude_column}' not found in the CSV file."
            )
        if self.longitude_column not in dataframe.columns:
            raise ValueError(
                f"Column '{self.longitude_column}' not found in the CSV file."
            )

        # Ensure latitude and longitude columns are numeric
        dataframe[self.latitude_column] = pd.to_numeric(
            dataframe[self.latitude_column], errors="coerce"
        )
        dataframe[self.longitude_column] = pd.to_numeric(
            dataframe[self.longitude_column], errors="coerce"
        )
        geometry = gpd.points_from_xy(
            dataframe[self.longitude_column],
            dataframe[self.latitude_column],
        )
    else:
        if self.geometry_column not in dataframe.columns:
            raise ValueError(
                f"Column '{self.geometry_column}' not found in the CSV file."
            )

        filter_not_na = dataframe[self.geometry_column].notna()
        dataframe.loc[filter_not_na, self.geometry_column] = dataframe.loc[
            filter_not_na, self.geometry_column
        ].apply(wkt.loads)
        geometry = self.geometry_column

    geodataframe = gpd.GeoDataFrame(
        dataframe,
        geometry=geometry,
        crs=self.coordinate_reference_system[0]
        if isinstance(self.coordinate_reference_system, tuple)
        else self.coordinate_reference_system,
    )
    return geodataframe

preview(format='ascii')

Generate a preview of this CSV loader.

Creates a summary representation of the loader for quick inspection.

Parameters:

Name Type Description Default
format str

The output format for the preview. Options include:

  • "ascii": Text-based format for terminal display
  • "json": JSON-formatted data for programmatic use
'ascii'

Returns:

Type Description
Any

A string or dictionary representing the loader, depending on the format.

Raises:

Type Description
ValueError

If an unsupported format is requested.

Source code in src/urban_mapper/modules/loader/loaders/csv_loader.py
def preview(self, format: str = "ascii") -> Any:
    """Generate a preview of this `CSV` loader.

    Creates a summary representation of the loader for quick inspection.

    Args:
        format: The output format for the preview. Options include:

            - [x] "ascii": Text-based format for terminal display
            - [x] "json": JSON-formatted data for programmatic use

    Returns:
        A string or dictionary representing the loader, depending on the format.

    Raises:
        ValueError: If an unsupported format is requested.
    """
    if format == "ascii":
        return (
            f"Loader: CSVLoader\n"
            f"  File: {self.file_path}\n"
            f"  Latitude Column: {self.latitude_column}\n"
            f"  Longitude Column: {self.longitude_column}\n"
            f"  Geometry Column: {self.geometry_column}\n"
            f"  Separator: {self.separator}\n"
            f"  Encoding: {self.encoding}\n"
            f"  CRS: {self.coordinate_reference_system}\n"
            f"  Additional params: {self.additional_loader_parameters}\n"
        )
    elif format == "json":
        return {
            "loader": "CSVLoader",
            "file": self.file_path,
            "latitude_column": self.latitude_column,
            "longitude_column": self.longitude_column,
            "geometry_column": self.geometry_column,
            "separator": self.separator,
            "encoding": self.encoding,
            "crs": self.coordinate_reference_system,
            "additional_params": self.additional_loader_parameters,
        }
    else:
        raise ValueError(f"Unsupported format: {format}")

ParquetLoader

Bases: FileLoaderBase

Loader for Parquet files containing spatial data.

This loader reads data from Parquet files and converts them to GeoDataFrames. It requires either a latitude column and a longitude column (used to build point geometries for each row) or a geometry column containing WKT.

Attributes:

Name Type Description
file_path Union[str, Path]

Path to the Parquet file to load.

latitude_column Optional[str]

Name of the column containing latitude values. Default: None

longitude_column Optional[str]

Name of the column containing longitude values. Default: None

coordinate_reference_system Union[str, Tuple[str, str]]

If a string, it specifies the coordinate reference system to use (default: 'EPSG:4326'). If a tuple (source_crs, target_crs), it defines a conversion from the source CRS to the target CRS (default target CRS: 'EPSG:4326').

engine str

The engine to use for reading Parquet files. Default: "pyarrow"

columns Optional[list[str]]

List of columns to read from the Parquet file. Default: None, which reads all columns.

Examples:

>>> from urban_mapper.modules.loader import ParquetLoader
>>>
>>> # Basic usage
>>> loader = ParquetLoader(
...     file_path="data.parquet",
...     latitude_column="lat",
...     longitude_column="lon"
... )
>>> gdf = loader.load()
>>>
>>> # With custom columns and engine
>>> loader = ParquetLoader(
...     file_path="data.parquet",
...     latitude_column="latitude",
...     longitude_column="longitude",
...     engine="fastparquet",
...     columns=["latitude", "longitude", "value"]
... )
>>> gdf = loader.load()
>>>
>>> # With CRS
>>> loader = ParquetLoader(
...     file_path="data.parquet",
...     latitude_column="latitude",
...     longitude_column="longitude",
...     coordinate_reference_system="EPSG:4326"
... )
>>> gdf = loader.load()
>>>
>>> # With source-target CRS
>>> loader = ParquetLoader(
...     file_path="data.parquet",
...     latitude_column="latitude",
...     longitude_column="longitude",
...     coordinate_reference_system=("EPSG:4326", "EPSG:3857")
... )
>>> gdf = loader.load()
Source code in src/urban_mapper/modules/loader/loaders/parquet_loader.py
@beartype
class ParquetLoader(FileLoaderBase):
    """Loader for `Parquet` files containing spatial data.

    This loader reads data from `Parquet` files and converts them to `GeoDataFrames`.
    It requires either latitude and longitude columns (used to build point
    geometries for each row) or a geometry column containing WKT.

    Attributes:
        file_path (Union[str, Path]): Path to the Parquet file to load.
        latitude_column (Optional[str]): Name of the column containing latitude values. Default: `None`
        longitude_column (Optional[str]): Name of the column containing longitude values. Default: `None`
        coordinate_reference_system (Union[str, Tuple[str, str]]):
            If a string, it specifies the coordinate reference system to use (default: 'EPSG:4326').
            If a tuple (source_crs, target_crs), it defines a conversion from the source CRS to the target CRS (default target CRS: 'EPSG:4326').
        engine (str): The engine to use for reading Parquet files. Default: `"pyarrow"`
        columns (Optional[list[str]]): List of columns to read from the Parquet file. Default: `None`, which reads all columns.

    Examples:
        >>> from urban_mapper.modules.loader import ParquetLoader
        >>>
        >>> # Basic usage
        >>> loader = ParquetLoader(
        ...     file_path="data.parquet",
        ...     latitude_column="lat",
        ...     longitude_column="lon"
        ... )
        >>> gdf = loader.load()
        >>>
        >>> # With custom columns and engine
        >>> loader = ParquetLoader(
        ...     file_path="data.parquet",
        ...     latitude_column="latitude",
        ...     longitude_column="longitude",
        ...     engine="fastparquet",
        ...     columns=["latitude", "longitude", "value"]
        ... )
        >>> gdf = loader.load()
        >>>
        >>> # With CRS
        >>> loader = ParquetLoader(
        ...     file_path="data.parquet",
        ...     latitude_column="latitude",
        ...     longitude_column="longitude",
        ...     coordinate_reference_system="EPSG:4326"
        ... )
        >>> gdf = loader.load()
        >>>
        >>> # With source-target CRS
        >>> loader = ParquetLoader(
        ...     file_path="data.parquet",
        ...     latitude_column="latitude",
        ...     longitude_column="longitude",
        ...     coordinate_reference_system=("EPSG:4326", "EPSG:3857")
        ... )
        >>> gdf = loader.load()
    """

    def __init__(
        self,
        file_path: Union[str, Path],
        latitude_column: Optional[str] = None,
        longitude_column: Optional[str] = None,
        geometry_column: Optional[str] = None,
        coordinate_reference_system: Union[str, Tuple[str, str]] = DEFAULT_CRS,
        engine: str = "pyarrow",
        columns: Optional[list[str]] = None,
        **additional_loader_parameters: Any,
    ) -> None:
        super().__init__(
            file_path=file_path,
            latitude_column=latitude_column,
            longitude_column=longitude_column,
            geometry_column=geometry_column,
            coordinate_reference_system=coordinate_reference_system,
            **additional_loader_parameters,
        )
        self.engine = engine
        self.columns = columns

    @require_either_or_attributes(
        [["latitude_column", "longitude_column"], ["geometry_column"]],
        error_msg="Either both 'latitude_column' and 'longitude_column' must be set, or 'geometry_column' must be set.",
    )
    def _load(self) -> gpd.GeoDataFrame:
        """Load data from a `Parquet` file and convert it to a `GeoDataFrame`.

        This method reads a `Parquet` file using `pandas`, validates the latitude and
        longitude columns, and converts the data to a `GeoDataFrame` with point
        geometries using the specified coordinate reference system.

        Returns:
            A `GeoDataFrame` containing the loaded data with point geometries
            created from the latitude and longitude columns.

        Raises:
            ValueError: If `latitude_column`, `longitude_column` or `geometry_column` is `None`.
            ValueError: If `latitude_column`/`longitude_column` and `geometry_column` are defined together.
            ValueError: If the specified latitude or longitude columns are not found in the Parquet file.
            IOError: If the Parquet file cannot be read.
        """
        dataframe = pd.read_parquet(
            self.file_path,
            engine=self.engine,
            columns=self.columns,
        )

        if self.latitude_column != "" and self.longitude_column != "":
            if self.latitude_column not in dataframe.columns:
                raise ValueError(
                    f"Column '{self.latitude_column}' not found in the Parquet file."
                )
            if self.longitude_column not in dataframe.columns:
                raise ValueError(
                    f"Column '{self.longitude_column}' not found in the Parquet file."
                )

            dataframe[self.latitude_column] = pd.to_numeric(
                dataframe[self.latitude_column], errors="coerce"
            )
            dataframe[self.longitude_column] = pd.to_numeric(
                dataframe[self.longitude_column], errors="coerce"
            )
            geometry = gpd.points_from_xy(
                dataframe[self.longitude_column],
                dataframe[self.latitude_column],
            )
        else:
            if self.geometry_column not in dataframe.columns:
                raise ValueError(
                    f"Column '{self.geometry_column}' not found in the Parquet file."
                )

            filter_not_na = dataframe[self.geometry_column].notna()
            dataframe.loc[filter_not_na, self.geometry_column] = dataframe.loc[
                filter_not_na, self.geometry_column
            ].apply(wkt.loads)
            geometry = self.geometry_column

        geodataframe = gpd.GeoDataFrame(
            dataframe,
            geometry=geometry,
            crs=self.coordinate_reference_system[0]
            if isinstance(self.coordinate_reference_system, tuple)
            else self.coordinate_reference_system,
        )
        return geodataframe

    def preview(self, format: str = "ascii") -> Any:
        """Generate a preview of this `Parquet` loader.

        Creates a summary representation of the loader for quick inspection.

        Args:
            format: The output format for the preview. Options include:

                - [x] "ascii": Text-based format for terminal display
                - [x] "json": JSON-formatted data for programmatic use

        Returns:
            A string or dictionary representing the loader, depending on the format.

        Raises:
            ValueError: If an unsupported format is requested.
        """
        cols = self.columns if self.columns else "All columns"

        if format == "ascii":
            return (
                f"Loader: ParquetLoader\n"
                f"  File: {self.file_path}\n"
                f"  Latitude Column: {self.latitude_column}\n"
                f"  Longitude Column: {self.longitude_column}\n"
                f"  Geometry Column: {self.geometry_column}\n"
                f"  Engine: {self.engine}\n"
                f"  Columns: {cols}\n"
                f"  CRS: {self.coordinate_reference_system}\n"
                f"  Additional params: {self.additional_loader_parameters}\n"
            )
        elif format == "json":
            return {
                "loader": "ParquetLoader",
                "file": self.file_path,
                "latitude_column": self.latitude_column,
                "longitude_column": self.longitude_column,
                "geometry_column": self.geometry_column,
                "engine": self.engine,
                "columns": cols,
                "coordinate_reference_system": self.coordinate_reference_system,
                "additional_params": self.additional_loader_parameters,
            }
        else:
            raise ValueError(f"Unsupported format '{format}'")

_load()

Load data from a Parquet file and convert it to a GeoDataFrame.

This method reads a Parquet file using pandas, validates the latitude and longitude columns, and converts the data to a GeoDataFrame with point geometries using the specified coordinate reference system.

Returns:

Type Description
GeoDataFrame

A GeoDataFrame containing the loaded data, with point geometries created from the latitude and longitude columns.

Raises:

Type Description
ValueError

If latitude_column, longitude_column or geometry_column is None.

ValueError

If latitude_column/longitude_column and geometry_column are defined together.

ValueError

If the specified latitude or longitude columns are not found in the Parquet file.

IOError

If the Parquet file cannot be read.

Source code in src/urban_mapper/modules/loader/loaders/parquet_loader.py
@require_either_or_attributes(
    [["latitude_column", "longitude_column"], ["geometry_column"]],
    error_msg="Either both 'latitude_column' and 'longitude_column' must be set, or 'geometry_column' must be set.",
)
def _load(self) -> gpd.GeoDataFrame:
    """Load data from a `Parquet` file and convert it to a `GeoDataFrame`.

    This method reads a `Parquet` file using `pandas`, validates the latitude and
    longitude columns, and converts the data to a `GeoDataFrame` with point
    geometries using the specified coordinate reference system.

    Returns:
        A `GeoDataFrame` containing the loaded data with point geometries
        created from the latitude and longitude columns.

    Raises:
        ValueError: If `latitude_column`, `longitude_column` or `geometry_column` is `None`.
        ValueError: If `latitude_column`/`longitude_column` and `geometry_column` are defined together.
        ValueError: If the specified latitude or longitude columns are not found in the Parquet file.
        IOError: If the Parquet file cannot be read.
    """
    dataframe = pd.read_parquet(
        self.file_path,
        engine=self.engine,
        columns=self.columns,
    )

    if self.latitude_column != "" and self.longitude_column != "":
        if self.latitude_column not in dataframe.columns:
            raise ValueError(
                f"Column '{self.latitude_column}' not found in the Parquet file."
            )
        if self.longitude_column not in dataframe.columns:
            raise ValueError(
                f"Column '{self.longitude_column}' not found in the Parquet file."
            )

        dataframe[self.latitude_column] = pd.to_numeric(
            dataframe[self.latitude_column], errors="coerce"
        )
        dataframe[self.longitude_column] = pd.to_numeric(
            dataframe[self.longitude_column], errors="coerce"
        )
        geometry = gpd.points_from_xy(
            dataframe[self.longitude_column],
            dataframe[self.latitude_column],
        )
    else:
        if self.geometry_column not in dataframe.columns:
            raise ValueError(
                f"Column '{self.geometry_column}' not found in the Parquet file."
            )

        filter_not_na = dataframe[self.geometry_column].notna()
        dataframe.loc[filter_not_na, self.geometry_column] = dataframe.loc[
            filter_not_na, self.geometry_column
        ].apply(wkt.loads)
        geometry = self.geometry_column

    geodataframe = gpd.GeoDataFrame(
        dataframe,
        geometry=geometry,
        crs=self.coordinate_reference_system[0]
        if isinstance(self.coordinate_reference_system, tuple)
        else self.coordinate_reference_system,
    )
    return geodataframe

preview(format='ascii')

Generate a preview of this Parquet loader.

Creates a summary representation of the loader for quick inspection.

Parameters:

Name Type Description Default
format str

The output format for the preview. Options include:

  • "ascii": Text-based format for terminal display
  • "json": JSON-formatted data for programmatic use
'ascii'

Returns:

Type Description
Any

A string or dictionary representing the loader, depending on the format.

Raises:

Type Description
ValueError

If an unsupported format is requested.

Source code in src/urban_mapper/modules/loader/loaders/parquet_loader.py
def preview(self, format: str = "ascii") -> Any:
    """Generate a preview of this `Parquet` loader.

    Creates a summary representation of the loader for quick inspection.

    Args:
        format: The output format for the preview. Options include:

            - [x] "ascii": Text-based format for terminal display
            - [x] "json": JSON-formatted data for programmatic use

    Returns:
        A string or dictionary representing the loader, depending on the format.

    Raises:
        ValueError: If an unsupported format is requested.
    """
    cols = self.columns if self.columns else "All columns"

    if format == "ascii":
        return (
            f"Loader: ParquetLoader\n"
            f"  File: {self.file_path}\n"
            f"  Latitude Column: {self.latitude_column}\n"
            f"  Longitude Column: {self.longitude_column}\n"
            f"  Geometry Column: {self.geometry_column}\n"
            f"  Engine: {self.engine}\n"
            f"  Columns: {cols}\n"
            f"  CRS: {self.coordinate_reference_system}\n"
            f"  Additional params: {self.additional_loader_parameters}\n"
        )
    elif format == "json":
        return {
            "loader": "ParquetLoader",
            "file": self.file_path,
            "latitude_column": self.latitude_column,
            "longitude_column": self.longitude_column,
            "geometry_column": self.geometry_column,
            "engine": self.engine,
            "columns": cols,
            "coordinate_reference_system": self.coordinate_reference_system,
            "additional_params": self.additional_loader_parameters,
        }
    else:
        raise ValueError(f"Unsupported format '{format}'")

ShapefileLoader

Bases: FileLoaderBase

Loader for shapefiles containing spatial data.

This loader reads data from shapefiles and returns a GeoDataFrame. Shapefiles inherently contain geometry information, so explicit latitude and longitude columns are not required. However, if specified, they can be used; otherwise, representative points are generated.

Representative points are a simplified representation of the geometry, which can be useful for visualisations or when the geometry is complex. The loader will automatically create temporary columns for latitude and longitude if they are not provided or if the specified columns contain only NaN values.
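
To make this concrete, here is a small standalone sketch of representative points in plain geopandas; GeoSeries.representative_point() returns a point guaranteed to lie within each geometry:

>>> import geopandas as gpd
>>> from shapely.geometry import Polygon
>>>
>>> square = Polygon([(0, 0), (2, 0), (2, 2), (0, 2)])
>>> gdf = gpd.GeoDataFrame(geometry=[square], crs="EPSG:4326")
>>> points = gdf.geometry.representative_point()
>>> points.x.iloc[0], points.y.iloc[0]  # a point inside the square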

Attributes:

Name Type Description
file_path Union[str, Path]

Path to the shapefile to load.

latitude_column Optional[str]

Name of the column containing latitude values. If not provided or empty, a temporary latitude column is generated from representative points. Default: None

longitude_column Optional[str]

Name of the column containing longitude values. If not provided or empty, a temporary longitude column is generated from representative points. Default: None

coordinate_reference_system Union[str, Tuple[str, str]]

If a string, it specifies the coordinate reference system to use (default: 'EPSG:4326'). If a tuple (source_crs, target_crs), it defines a conversion from the source CRS to the target CRS (default target CRS: 'EPSG:4326').

Examples:

>>> from urban_mapper.modules.loader import ShapefileLoader
>>>
>>> # Basic usage
>>> loader = ShapefileLoader(
...     file_path="data.shp"
... )
>>> gdf = loader.load()
>>>
>>> # With specified latitude and longitude columns
>>> loader = ShapefileLoader(
...     file_path="data.shp",
...     latitude_column="lat",
...     longitude_column="lon"
... )
>>> gdf = loader.load()
Source code in src/urban_mapper/modules/loader/loaders/shapefile_loader.py
@beartype
class ShapefileLoader(FileLoaderBase):
    """Loader for `shapefiles` containing spatial data.

    This loader reads data from `shapefiles` and returns a `GeoDataFrame`. Shapefiles
    inherently contain geometry information, so explicit latitude and longitude
    columns are not required. However, if specified, they can be used; otherwise,
    `representative points` are generated.

    `Representative points` are a simplified representation of the geometry, which can be
    useful for visualisations or when the geometry is complex. The loader will
    automatically create temporary columns for latitude and longitude if they are not
    provided or if the specified columns contain only `NaN` values.

    Attributes:
        file_path (Union[str, Path]): Path to the `shapefile` to load.
        latitude_column (Optional[str]): Name of the column containing latitude values. If not provided or empty,
            a temporary latitude column is generated from representative points. Default: `None`
        longitude_column (Optional[str]): Name of the column containing longitude values. If not provided or empty,
            a temporary longitude column is generated from representative points. Default: `None`
        coordinate_reference_system (Union[str, Tuple[str, str]]):
            If a string, it specifies the coordinate reference system to use (default: 'EPSG:4326').
            If a tuple (source_crs, target_crs), it defines a conversion from the source CRS to the target CRS (default target CRS: 'EPSG:4326').

    Examples:
        >>> from urban_mapper.modules.loader import ShapefileLoader
        >>>
        >>> # Basic usage
        >>> loader = ShapefileLoader(
        ...     file_path="data.shp"
        ... )
        >>> gdf = loader.load()
        >>>
        >>> # With specified latitude and longitude columns
        >>> loader = ShapefileLoader(
        ...     file_path="data.shp",
        ...     latitude_column="lat",
        ...     longitude_column="lon"
        ... )
        >>> gdf = loader.load()
    """

    def _load(self) -> gpd.GeoDataFrame:
        """Load data from a shapefile and return a `GeoDataFrame`.

        This method reads a `shapefile` using geopandas, ensures it has a geometry column,
        reprojects it to the specified `CRS` if necessary, and handles latitude and
        longitude columns. If latitude and longitude columns are not provided or are
        empty, it generates temporary columns using `representative points` of the geometries.

        Returns:
            A `GeoDataFrame` containing the loaded data with geometries and
            latitude/longitude columns as specified or generated.

        Raises:
            ValueError: If no geometry column is found in the shapefile.
            Exception: If the shapefile cannot be read (e.g., file not found or invalid format).
        """
        gdf = gpd.read_file(self.file_path)

        if "geometry" not in gdf.columns:
            raise ValueError(
                "No geometry column found in shapefile. "
                "Standard shapefile format requires a geometry column."
            )

        coord_system = (
            self.coordinate_reference_system[0]
            if isinstance(self.coordinate_reference_system, tuple)
            else self.coordinate_reference_system
        )

        if gdf.crs.to_string() != coord_system:
            gdf = gdf.to_crs(coord_system)

        if (
            not self.latitude_column
            or not self.longitude_column
            or gdf[self.latitude_column].isna().all()
            or gdf[self.longitude_column].isna().all()
        ):
            gdf["representative_points"] = gdf.geometry.representative_point()
            gdf["temporary_longitude"] = gdf["representative_points"].x
            gdf["temporary_latitude"] = gdf["representative_points"].y
            self.latitude_column = "temporary_latitude"
            self.longitude_column = "temporary_longitude"

        return gdf

    def preview(self, format: str = "ascii") -> Any:
        """Generate a preview of this `CSV` loader.

        Creates a summary representation of the loader for quick inspection.

        Args:
            format: The output format for the preview. Options include:

                - [x] "ascii": Text-based format for terminal display
                - [x] "json": JSON-formatted data for programmatic use

        Returns:
            A string or dictionary representing the loader, depending on the format.

        Raises:
            ValueError: If an unsupported format is requested.
        """
        lat_col = self.latitude_column or "temporary_latitude (generated)"
        lon_col = self.longitude_column or "temporary_longitude (generated)"

        if format == "ascii":
            return (
                f"Loader: ShapefileLoader\n"
                f"  File: {self.file_path}\n"
                f"  Latitude Column: {lat_col}\n"
                f"  Longitude Column: {lon_col}\n"
                f"  CRS: {self.coordinate_reference_system}\n"
                f"  Additional params: {self.additional_loader_parameters}\n"
            )
        elif format == "json":
            return {
                "loader": "ShapefileLoader",
                "file": self.file_path,
                "latitude_column": lat_col,
                "longitude_column": lon_col,
                "crs": self.coordinate_reference_system,
                "additional_params": self.additional_loader_parameters,
            }
        else:
            raise ValueError(f"Unsupported format: {format}")

_load()

Load data from a shapefile and return a GeoDataFrame.

This method reads a shapefile using geopandas, ensures it has a geometry column, reprojects it to the specified CRS if necessary, and handles latitude and longitude columns. If latitude and longitude columns are not provided or are empty, it generates temporary columns using representative points of the geometries.

Returns:

Type Description
GeoDataFrame

A GeoDataFrame containing the loaded data, with geometries and latitude/longitude columns as specified or generated.

Raises:

Type Description
ValueError

If no geometry column is found in the shapefile.

Exception

If the shapefile cannot be read (e.g., file not found or invalid format).

Source code in src/urban_mapper/modules/loader/loaders/shapefile_loader.py
def _load(self) -> gpd.GeoDataFrame:
    """Load data from a shapefile and return a `GeoDataFrame`.

    This method reads a `shapefile` using geopandas, ensures it has a geometry column,
    reprojects it to the specified `CRS` if necessary, and handles latitude and
    longitude columns. If latitude and longitude columns are not provided or are
    empty, it generates temporary columns using `representative points` of the geometries.

    Returns:
        A `GeoDataFrame` containing the loaded data with geometries and
        latitude/longitude columns as specified or generated.

    Raises:
        ValueError: If no geometry column is found in the shapefile.
        Exception: If the shapefile cannot be read (e.g., file not found or invalid format).
    """
    gdf = gpd.read_file(self.file_path)

    if "geometry" not in gdf.columns:
        raise ValueError(
            "No geometry column found in shapefile. "
            "Standard shapefile format requires a geometry column."
        )

    coord_system = (
        self.coordinate_reference_system[0]
        if isinstance(self.coordinate_reference_system, tuple)
        else self.coordinate_reference_system
    )

    if gdf.crs.to_string() != coord_system:
        gdf = gdf.to_crs(coord_system)

    if (
        not self.latitude_column
        or not self.longitude_column
        or gdf[self.latitude_column].isna().all()
        or gdf[self.longitude_column].isna().all()
    ):
        gdf["representative_points"] = gdf.geometry.representative_point()
        gdf["temporary_longitude"] = gdf["representative_points"].x
        gdf["temporary_latitude"] = gdf["representative_points"].y
        self.latitude_column = "temporary_latitude"
        self.longitude_column = "temporary_longitude"

    return gdf
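
To make the fallback concrete, here is a minimal sketch of the representative-point logic above using plain geopandas (the polygon data is illustrative):

```python
import geopandas as gpd
from shapely.geometry import Polygon

# A polygon layer with no latitude/longitude columns of its own.
gdf = gpd.GeoDataFrame(
    {"name": ["block_a"]},
    geometry=[Polygon([(0, 0), (2, 0), (2, 1), (0, 1)])],
    crs="EPSG:4326",
)

# Same fallback as _load above: derive point columns from representative
# points, which are guaranteed to lie within each geometry (unlike centroids).
gdf["representative_points"] = gdf.geometry.representative_point()
gdf["temporary_longitude"] = gdf["representative_points"].x
gdf["temporary_latitude"] = gdf["representative_points"].y
print(gdf[["temporary_longitude", "temporary_latitude"]])
```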

preview(format='ascii')

Generate a preview of this Shapefile loader.

Creates a summary representation of the loader for quick inspection.

Parameters:

Name Type Description Default
format str

The output format for the preview. Options include:

  • "ascii": Text-based format for terminal display
  • "json": JSON-formatted data for programmatic use
'ascii'

Returns:

Type Description
Any

A string or dictionary representing the loader, depending on the format.

Raises:

Type Description
ValueError

If an unsupported format is requested.

Source code in src/urban_mapper/modules/loader/loaders/shapefile_loader.py
def preview(self, format: str = "ascii") -> Any:
    """Generate a preview of this `CSV` loader.

    Creates a summary representation of the loader for quick inspection.

    Args:
        format: The output format for the preview. Options include:

            - [x] "ascii": Text-based format for terminal display
            - [x] "json": JSON-formatted data for programmatic use

    Returns:
        A string or dictionary representing the loader, depending on the format.

    Raises:
        ValueError: If an unsupported format is requested.
    """
    lat_col = self.latitude_column or "temporary_latitude (generated)"
    lon_col = self.longitude_column or "temporary_longitude (generated)"

    if format == "ascii":
        return (
            f"Loader: ShapefileLoader\n"
            f"  File: {self.file_path}\n"
            f"  Latitude Column: {lat_col}\n"
            f"  Longitude Column: {lon_col}\n"
            f"  CRS: {self.coordinate_reference_system}\n"
            f"  Additional params: {self.additional_loader_parameters}\n"
        )
    elif format == "json":
        return {
            "loader": "ShapefileLoader",
            "file": self.file_path,
            "latitude_column": lat_col,
            "longitude_column": lon_col,
            "crs": self.coordinate_reference_system,
            "additional_params": self.additional_loader_parameters,
        }
    else:
        raise ValueError(f"Unsupported format: {format}")

DataFrameLoader

Bases: LoaderBase

Loader for a DataFrame object containing spatial data.

This loader reads data from a pandas DataFrame object and converts it to a GeoDataFrame with point geometries. It requires either latitude and longitude columns, or a geometry column in WKT format, to build a geometry for each row.

Attributes:

Name Type Description
input_dataframe DataFrame

Original DataFrame object.

latitude_column str

Name of the column containing latitude values.

longitude_column str

Name of the column containing longitude values.

geometry_column str

Name of the column containing geometry data in WKT format.

coordinate_reference_system Union[str, Tuple[str, str]]

If a string, it specifies the coordinate reference system to use (default: 'EPSG:4326'). If a tuple (source_crs, target_crs), it defines a conversion from the source CRS to the target CRS (default target CRS: 'EPSG:4326').

Examples:

>>> from urban_mapper.modules.loader import DataFrameLoader
>>>
>>> # Load/create a `dataframe` object
...
>>> # Basic usage with lat/long
>>> loader = DataFrameLoader(
...     input_dataframe=dataframe,
...     latitude_column="pickup_lat",
...     longitude_column="pickup_lng"
... )
>>> gdf = loader.load()
>>>
>>> # Basic usage with geometry
>>> loader = DataFrameLoader(
...     input_dataframe=dataframe,
...     geometry_column="the_geom"
... )
>>> gdf = loader.load()
>>>
>>> # With CRS
>>> loader = DataFrameLoader(
...     input_dataframe=dataframe,
...     latitude_column="lat",
...     longitude_column="lng",
...     coordinate_reference_system="EPSG:4326"
... )
>>> gdf = loader.load()
>>>
>>> # With source-target CRS
>>> loader = DataFrameLoader(
...     input_dataframe=dataframe,
...     latitude_column="lat",
...     longitude_column="lng",
...     coordinate_reference_system=("EPSG:4326", "EPSG:3857")
... )
>>> gdf = loader.load()
Source code in src/urban_mapper/modules/loader/loaders/dataframe_loader.py
@beartype
class DataFrameLoader(LoaderBase):
    """Loader for `DataFrame` object containing spatial data.

    This loader reads data from a  pandas `DataFrame` object and
    converts them to `GeoDataFrames` with point geometries. It requires latitude
    and longitude columns to create point geometries for each row.

    Attributes:
        input_dataframe (DataFrame): Original DataFrame object.
        latitude_column (str): Name of the column containing latitude values.
        longitude_column (str): Name of the column containing longitude values.
        geometry_column (str): Name of the column containing geometry data in WKT format.
        coordinate_reference_system (Union[str, Tuple[str, str]]):
            If a string, it specifies the coordinate reference system to use (default: 'EPSG:4326').
            If a tuple (source_crs, target_crs), it defines a conversion from the source CRS to the target CRS (default target CRS: 'EPSG:4326').

    Examples:
        >>> from urban_mapper.modules.loader import DataFrameLoader
        >>>
        >>> # Load/create a `dataframe` object
        ...
        >>> # Basic usage with lat/long
        >>> loader = DataFrameLoader(
        ...     input_dataframe=dataframe,
        ...     latitude_column="pickup_lat",
        ...     longitude_column="pickup_lng"
        ... )
        >>> gdf = loader.load()
        >>>
        >>> # Basic usage with geometry
        >>> loader = DataFrameLoader(
        ...     input_dataframe=dataframe,
        ...     geometry_column="the_geom"
        ... )
        >>> gdf = loader.load()
        >>>
        >>> # With CRS
        >>> loader = DataFrameLoader(
        ...     input_dataframe=dataframe,
        ...     latitude_column="lat",
        ...     longitude_column="lng",
        ...     coordinate_reference_system="EPSG:4326"
        ... )
        >>> gdf = loader.load()
        >>>
        >>> # With source-target CRS
        >>> loader = DataFrameLoader(
        ...     input_dataframe=dataframe,
        ...     latitude_column="lat",
        ...     longitude_column="lng",
        ...     coordinate_reference_system=("EPSG:4326", "EPSG:3857")
        ... )
        >>> gdf = loader.load()
    """

    def __init__(
        self,
        input_dataframe: Union[pd.DataFrame, gpd.GeoDataFrame],
        latitude_column: Optional[str] = None,
        longitude_column: Optional[str] = None,
        geometry_column: Optional[str] = None,
        coordinate_reference_system: Union[str, Tuple[str, str]] = DEFAULT_CRS,
        **additional_loader_parameters: Any,
    ) -> None:
        super().__init__(
            latitude_column=latitude_column,
            longitude_column=longitude_column,
            geometry_column=geometry_column,
            coordinate_reference_system=coordinate_reference_system,
            **additional_loader_parameters,
        )
        self.dataframe = input_dataframe.copy()

    def _load(self) -> gpd.GeoDataFrame:
        """Load spatial data from a dataframe.

        This is the main public method for using `loaders`. It performs validation
        on the inputs before delegating to the implementation-specific `_load` method.
        It also ensures the file exists and that the coordinate reference system is properly set.

        Returns:
            A `GeoDataFrame` containing the loaded spatial data.

        Raises:
            FileNotFoundError: If the file does not exist.
            ValueError: If required columns are missing or the file format is invalid.

        Examples:
            >>> from urban_mapper.modules.loader import DataFrameLoader
            >>> loader = DataFrameLoader(dataframe, latitude_column="pickup_lat", longitude_column="pickup_lng")
            >>> gdf = loader.load()
        """
        if isinstance(self.dataframe, gpd.GeoDataFrame):
            geo_dataframe: gpd.GeoDataFrame = self.dataframe
        else:
            if self.latitude_column != "" and self.longitude_column != "":
                # Ensure latitude and longitude columns are numeric
                self.dataframe[self.latitude_column] = pd.to_numeric(
                    self.dataframe[self.latitude_column], errors="coerce"
                )
                self.dataframe[self.longitude_column] = pd.to_numeric(
                    self.dataframe[self.longitude_column], errors="coerce"
                )                
                geometry = gpd.points_from_xy(
                    self.dataframe[self.longitude_column],
                    self.dataframe[self.latitude_column],
                )
            else:
                filter_not_na = self.dataframe[self.geometry_column].notna()
                self.dataframe.loc[filter_not_na, self.geometry_column] = (
                    self.dataframe.loc[filter_not_na, self.geometry_column].apply(
                        wkt.loads
                    )
                )
                geometry = self.geometry_column

            geo_dataframe = gpd.GeoDataFrame(
                self.dataframe,
                geometry=geometry,
                crs=self.coordinate_reference_system[0]
                if isinstance(self.coordinate_reference_system, tuple)
                else self.coordinate_reference_system,
            )

        target_coordinate_reference_system = (
            self.coordinate_reference_system[1]
            if isinstance(self.coordinate_reference_system, tuple)
            else self.coordinate_reference_system
        )

        if geo_dataframe.crs is None:
            geo_dataframe.set_crs(target_coordinate_reference_system, inplace=True)
        elif geo_dataframe.crs.to_string() != target_coordinate_reference_system:
            geo_dataframe = geo_dataframe.to_crs(target_coordinate_reference_system)

        return geo_dataframe

    def preview(self, format: str = "ascii") -> Any:
        """Generate a preview of this `DataFrameLoader` loader.

        Creates a summary representation of the loader for quick inspection.

        Args:
            format: The output format for the preview. Options include:

                - [x] "ascii": Text-based format for terminal display
                - [x] "json": JSON-formatted data for programmatic use

        Returns:
            A string or dictionary representing the loader, depending on the format.

        Raises:
            ValueError: If an unsupported format is requested.
        """
        if format == "ascii":
            return (
                f"Loader: DataFrameLoader\n"
                f"  Latitude Column: {self.latitude_column}\n"
                f"  Longitude Column: {self.longitude_column}\n"
                f"  Geometry Column: {self.geometry_column}\n"
                f"  CRS: {self.coordinate_reference_system}\n"
                f"  Additional params: {self.additional_loader_parameters}\n"
            )
        elif format == "json":
            return {
                "loader": "DataFrameLoader",
                "latitude_column": self.latitude_column,
                "longitude_column": self.longitude_column,
                "geometry_column": self.geometry_column,
                "crs": self.coordinate_reference_system,
                "additional_params": self.additional_loader_parameters,
            }
        else:
            raise ValueError(f"Unsupported format: {format}")

_load()

Load spatial data from a dataframe.

This is the implementation-specific loading step invoked by the public load() method. It converts the input DataFrame to a GeoDataFrame: point geometries are built from the latitude/longitude columns when both are set, otherwise the geometry column is parsed from WKT. The result is assigned or converted to the target coordinate reference system.

Returns:

Type Description
GeoDataFrame

A GeoDataFrame containing the loaded spatial data.

Raises:

Type Description
KeyError

If a configured latitude/longitude or geometry column is missing.

Examples:

>>> from urban_mapper.modules.loader import DataFrameLoader
>>> loader = DataFrameLoader(dataframe, latitude_column="pickup_lat", longitude_column="pickup_lng")
>>> gdf = loader.load()
Source code in src/urban_mapper/modules/loader/loaders/dataframe_loader.py
def _load(self) -> gpd.GeoDataFrame:
    """Load spatial data from a dataframe.

    This is the main public method for using `loaders`. It performs validation
    on the inputs before delegating to the implementation-specific `_load` method.
    It also ensures the file exists and that the coordinate reference system is properly set.

    Returns:
        A `GeoDataFrame` containing the loaded spatial data.

    Raises:
        FileNotFoundError: If the file does not exist.
        ValueError: If required columns are missing or the file format is invalid.

    Examples:
        >>> from urban_mapper.modules.loader import DataFrameLoader
        >>> loader = DataFrameLoader(dataframe, latitude_column="pickup_lat", longitude_column="pickup_lng")
        >>> gdf = loader.load()
    """
    if isinstance(self.dataframe, gpd.GeoDataFrame):
        geo_dataframe: gpd.GeoDataFrame = self.dataframe
    else:
        if self.latitude_column != "" and self.longitude_column != "":
            # Ensure latitude and longitude columns are numeric
            self.dataframe[self.latitude_column] = pd.to_numeric(
                self.dataframe[self.latitude_column], errors="coerce"
            )
            self.dataframe[self.longitude_column] = pd.to_numeric(
                self.dataframe[self.longitude_column], errors="coerce"
            )                
            geometry = gpd.points_from_xy(
                self.dataframe[self.longitude_column],
                self.dataframe[self.latitude_column],
            )
        else:
            filter_not_na = self.dataframe[self.geometry_column].notna()
            self.dataframe.loc[filter_not_na, self.geometry_column] = (
                self.dataframe.loc[filter_not_na, self.geometry_column].apply(
                    wkt.loads
                )
            )
            geometry = self.geometry_column

        geo_dataframe = gpd.GeoDataFrame(
            self.dataframe,
            geometry=geometry,
            crs=self.coordinate_reference_system[0]
            if isinstance(self.coordinate_reference_system, tuple)
            else self.coordinate_reference_system,
        )

    target_coordinate_reference_system = (
        self.coordinate_reference_system[1]
        if isinstance(self.coordinate_reference_system, tuple)
        else self.coordinate_reference_system
    )

    if geo_dataframe.crs is None:
        geo_dataframe.set_crs(target_coordinate_reference_system, inplace=True)
    elif geo_dataframe.crs.to_string() != target_coordinate_reference_system:
        geo_dataframe = geo_dataframe.to_crs(target_coordinate_reference_system)

    return geo_dataframe
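
For the geometry-column branch above, a short self-contained sketch with WKT strings (the data is illustrative):

```python
import pandas as pd

from urban_mapper.modules.loader import DataFrameLoader

# Two rows whose geometries are encoded as WKT strings rather than lat/lon.
df = pd.DataFrame(
    {
        "station": ["a", "b"],
        "the_geom": ["POINT (-73.99 40.73)", "POINT (-73.98 40.75)"],
    }
)

loader = DataFrameLoader(input_dataframe=df, geometry_column="the_geom")
gdf = loader.load()
print(gdf.crs)               # EPSG:4326 by default
print(gdf.geometry.iloc[0])  # POINT (-73.99 40.73)
```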

preview(format='ascii')

Generate a preview of this DataFrameLoader loader.

Creates a summary representation of the loader for quick inspection.

Parameters:

Name Type Description Default
format str

The output format for the preview. Options include:

  • "ascii": Text-based format for terminal display
  • "json": JSON-formatted data for programmatic use
'ascii'

Returns:

Type Description
Any

A string or dictionary representing the loader, depending on the format.

Raises:

Type Description
ValueError

If an unsupported format is requested.

Source code in src/urban_mapper/modules/loader/loaders/dataframe_loader.py
def preview(self, format: str = "ascii") -> Any:
    """Generate a preview of this `DataFrameLoader` loader.

    Creates a summary representation of the loader for quick inspection.

    Args:
        format: The output format for the preview. Options include:

            - [x] "ascii": Text-based format for terminal display
            - [x] "json": JSON-formatted data for programmatic use

    Returns:
        A string or dictionary representing the loader, depending on the format.

    Raises:
        ValueError: If an unsupported format is requested.
    """
    if format == "ascii":
        return (
            f"Loader: DataFrameLoader\n"
            f"  Latitude Column: {self.latitude_column}\n"
            f"  Longitude Column: {self.longitude_column}\n"
            f"  Geometry Column: {self.geometry_column}\n"
            f"  CRS: {self.coordinate_reference_system}\n"
            f"  Additional params: {self.additional_loader_parameters}\n"
        )
    elif format == "json":
        return {
            "loader": "DataFrameLoader",
            "latitude_column": self.latitude_column,
            "longitude_column": self.longitude_column,
            "geometry_column": self.geometry_column,
            "crs": self.coordinate_reference_system,
            "additional_params": self.additional_loader_parameters,
        }
    else:
        raise ValueError(f"Unsupported format: {format}")

HuggingFaceLoader

Bases: LoaderBase

Load a dataset from Hugging Face's Hub using the datasets library.

What Are Hugging Face Datasets?

🤗 Hugging Face Datasets is your gateway to a vast collection of datasets tailored for various application domains such as urban computing. In a nutshell, this library simplifies data access, letting you load datasets with a single line of code.

How to Find and Use Datasets: Head to the Hugging Face Datasets Hub, where you can search anything you like (e.g., "PLUTO" for NYC buildings information).

For from_huggingface, you need the repo_id of the dataset you want to load. To find the repo_id, look for the <namespace>/<dataset_name> format in the dataset card or in the dataset's URL. For example, open a dataset of interest and inspect the page URL: for https://huggingface.co/datasets/oscur/pluto, the repo_id is oscur/pluto. The namespace is the organisation or user who created the dataset, and the dataset_name is the specific dataset's name. In this case, oscur is the namespace and pluto is the dataset name.

OSCUR: Pioneering Urban Science

🌍 OSCUR (Open-Source Cyberinfrastructure for Urban Computing) integrates tools for data exploration, analytics, and machine learning, all while fostering a collaborative community to advance urban science.

All datasets used by any of the initiatives under OSCUR are open-source and available on Hugging Face Datasets Hub. As UrbanMapper is one of the initiatives under OSCUR, all datasets throughout our examples and case studies are available under the oscur namespace.

Feel free to explore our datasets, at https://huggingface.co/oscur.

Load them easily:

loader = mapper.loader.from_huggingface("oscur/taxisvis1M")

Dive deeper at oscur.org for other open-source initiatives and tools.
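
If you prefer to browse the namespace programmatically, huggingface_hub can list its datasets; a small sketch (output truncated):

```python
import huggingface_hub

# List dataset ids published under the oscur namespace on the Hub.
for dataset in huggingface_hub.list_datasets(author="oscur", limit=10):
    print(dataset.id)  # e.g. oscur/pluto, oscur/NYC_311, ...
```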

Potential Errors Explained

Mistakes happen; here's what might go wrong and how we help:

If repo_id is invalid, a ValueError pops up with smart suggestions powered by TheFuzz, a fuzzy matching library. We compare your input to existing datasets and offer the closest matches:

  • No Slash (e.g., plutoo): Assumes it's a dataset name and suggests full repo_ids (e.g., oscur/pluto), or the closest matches.
  • Bad Namespace (e.g., oscurq/pluto): If the namespace doesn't exist, we suggest similar ones (e.g., oscur).
  • Bad Dataset Name (e.g., oscur/plutoo): If the namespace is valid but the dataset isn't, we suggest close matches.

Errors come with context, like the available datasets in a namespace, so you can fix it fast.
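
The snippet below is a minimal sketch of this suggestion logic using TheFuzz directly; the candidate list is illustrative, not the real Hub index:

```python
from thefuzz import process

# Fuzzy-match a mistyped repo_id against known dataset ids, keeping
# only strong candidates (score > 80), as the loader's error handler does.
known_datasets = ["oscur/pluto", "oscur/NYC_311", "oscur/taxisvis1M"]
matches = process.extract("oscur/plutoo", known_datasets, limit=10)
suggestions = [(name, score) for name, score in matches if score > 80]
print(suggestions)  # e.g. [('oscur/pluto', 92)]
```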

Parameters:

Name Type Description Default
repo_id str

The dataset repository ID on Hugging Face.

required
number_of_rows Optional[int]

Number of rows to load. Defaults to None.

None
streaming Optional[bool]

Whether to use streaming mode. Defaults to False.

False
debug_limit_list_datasets Optional[int]

Limit on datasets fetched for error handling. Defaults to None.

None

Raises:

Type Description
ValueError

If the dataset cannot be loaded due to an invalid repo_id or other issues.

Examples:

>>> # Load a full dataset
>>> loader = mapper.loader.from_huggingface("oscur/pluto")
>>> gdf = loader.load()
>>> print(gdf.head())  # Next steps: analyze or visualize the data
>>> # Load 500 rows with streaming (i.e., without loading the entire dataset)
>>> loader = mapper.loader.from_huggingface("oscur/NYC_311", number_of_rows=500, streaming=True)
>>> gdf = loader.load()
>>> print(gdf.head())  # Next steps: process the loaded subset
>>> # Load 1000 rows without streaming
>>> loader = mapper.loader.from_huggingface("oscur/taxisvis1M", number_of_rows=1000)
>>> gdf = loader.load()
>>> print(gdf.head())  # Next steps: explore the sliced data
>>> # Handle typo in namespace
>>> try:
...     loader = mapper.loader.from_huggingface("oscurq/pluto")
... except ValueError as e:
...     print(e)
ValueError: The repository 'oscurq' does not exist on Hugging Face. Maybe you meant one of these:
- oscur (similarity: 90%)
- XXX (similarity: 85%)
>>> # Handle typo in dataset name
>>> try:
...     loader = mapper.loader.from_huggingface("oscur/plutoo")
... except ValueError as e:
...     print(e)
ValueError: The dataset 'plutoo' does not exist in repository 'oscur'. Maybe you meant one of these:
- oscur/pluto (similarity: 90%)
- XXX (similarity: 80%)
>>> # Handle input without namespace
>>> try:
...     loader = mapper.loader.from_huggingface("plutoo")
... except ValueError as e:
...     print(e)
ValueError: The dataset 'plutoo' does not exist on Hugging Face. Maybe you meant one of these:
- oscur/pluto (similarity: 90%)
- XXX (similarity: 85%)
Source code in src/urban_mapper/modules/loader/loaders/huggingface_loader.py
@beartype
class HuggingFaceLoader(LoaderBase):
    """
    Load a dataset from `Hugging Face's Hub` using the `datasets` library.

    !!! info "What Are Hugging Face Datasets?"
        🤗 **Hugging Face Datasets** is your gateway to a vast collection of datasets tailored for various application domains
        such as urban computing. In a nutshell, this library simplifies data access, letting you load datasets
        with a single line of code.

        **How to Find and Use Datasets**: Head to the [Hugging Face Datasets Hub](https://huggingface.co/datasets),
        where you can search anything you like (e.g., "PLUTO" for NYC buildings information).

        For `from_huggingface`, you need the `repo_id` of the dataset you want to load. To find the `repo_id`, look for the
        `<namespace>/<dataset_name>` format in the dataset card or in the dataset's URL.
        For example, open a dataset of interest and inspect the page URL: for `https://huggingface.co/datasets/oscur/pluto`,
        the `repo_id` is `oscur/pluto`. The `namespace` is the organisation or user who created the dataset,
        and the `dataset_name` is the specific dataset's name.
        In this case, `oscur` is the namespace and `pluto` is the dataset name.

    !!! success "OSCUR: Pioneering Urban Science"
        🌍 **OSCUR** (Open-Source Cyberinfrastructure for Urban Computing) integrates tools for data exploration,
        analytics, and machine learning, all while fostering a collaborative community to advance urban science.

        All datasets used by any of the initiatives under OSCUR are open-source and available on Hugging Face
        Datasets Hub. As `UrbanMapper` is one of the initiatives under OSCUR, all datasets throughout our examples
        and case studies are available under the `oscur` namespace.

        Feel free to explore our datasets, at [https://huggingface.co/oscur](https://huggingface.co/oscur).

        Load them easily:
        ```python
        loader = mapper.loader.from_huggingface("oscur/taxisvis1M")
        ```

        Dive deeper at [oscur.org](https://oscur.org/) for other open-source initiatives and tools.

    !!! warning "Potential Errors Explained"
        Mistakes happen; here's what might go wrong and how we help:

        If `repo_id` is invalid, a `ValueError` pops up with smart suggestions powered by
        [TheFuzz](https://github.com/seatgeek/thefuzz), a fuzzy matching library. We compare your input to
        existing datasets and offer the closest matches:

        - **No Slash (e.g., `plutoo`)**: Assumes it's a dataset name and suggests full `repo_id`s (e.g., `oscur/pluto`), or the closest matches.
        - **Bad Namespace (e.g., `oscurq/pluto`)**: If the namespace doesn't exist, we suggest similar ones (e.g., `oscur`).
        - **Bad Dataset Name (e.g., `oscur/plutoo`)**: If the namespace is valid but the dataset isn't, we suggest close matches.

        Errors come with context, like the available datasets in a namespace, so you can fix it fast.

    Args:
        repo_id (str): The dataset repository ID on Hugging Face.
        number_of_rows (Optional[int]): Number of rows to load. Defaults to None.
        streaming (Optional[bool]): Whether to use streaming mode. Defaults to False.
        debug_limit_list_datasets (Optional[int]): Limit on datasets fetched for error handling. Defaults to None.

    Raises:
        ValueError: If the dataset cannot be loaded due to an invalid `repo_id` or other issues.

    Examples:
        >>> # Load a full dataset
        >>> loader = mapper.loader.from_huggingface("oscur/pluto")
        >>> gdf = loader.load()
        >>> print(gdf.head())  # Next steps: analyze or visualize the data

        >>> # Load 500 rows with streaming (i.e., without loading the entire dataset)
        >>> loader = mapper.loader.from_huggingface("oscur/NYC_311", number_of_rows=500, streaming=True)
        >>> gdf = loader.load()
        >>> print(gdf.head())  # Next steps: process the loaded subset

        >>> # Load 1000 rows without streaming
        >>> loader = mapper.loader.from_huggingface("oscur/taxisvis1M", number_of_rows=1000)
        >>> gdf = loader.load()
        >>> print(gdf.head())  # Next steps: explore the sliced data

        >>> # Handle typo in namespace
        >>> try:
        ...     loader = mapper.loader.from_huggingface("oscurq/pluto")
        ... except ValueError as e:
        ...     print(e)
        ValueError: The repository 'oscurq' does not exist on Hugging Face. Maybe you meant one of these:
        - oscur (similarity: 90%)
        - XXX (similarity: 85%)

        >>> # Handle typo in dataset name
        >>> try:
        ...     loader = mapper.loader.from_huggingface("oscur/plutoo")
        ... except ValueError as e:
        ...     print(e)
        ValueError: The dataset 'plutoo' does not exist in repository 'oscur'. Maybe you meant one of these:
        - oscur/pluto (similarity: 90%)
        - XXX (similarity: 80%)

        >>> # Handle input without namespace
        >>> try:
        ...     loader = mapper.loader.from_huggingface("plutoo")
        ... except ValueError as e:
        ...     print(e)
        ValueError: The dataset 'plutoo' does not exist on Hugging Face. Maybe you meant one of these:
        - oscur/pluto (similarity: 90%)
        - XXX (similarity: 85%)

    """

    def __init__(
        self,
        repo_id: str,
        number_of_rows: Optional[int] = None,
        streaming: Optional[bool] = False,
        debug_limit_list_datasets: Optional[int] = None,
        latitude_column: Optional[str] = None,
        longitude_column: Optional[str] = None,
        geometry_column: Optional[str] = None,
        coordinate_reference_system: Union[str, Tuple[str, str]] = DEFAULT_CRS,
        **additional_loader_parameters: Any,
    ) -> None:
        super().__init__(
            latitude_column=latitude_column,
            longitude_column=longitude_column,
            geometry_column=geometry_column,
            coordinate_reference_system=coordinate_reference_system,
            **additional_loader_parameters,
        )
        self.repo_id = repo_id
        self.number_of_rows = number_of_rows
        self.streaming = streaming
        self.debug_limit_list_datasets = debug_limit_list_datasets
        self.source_data = None

    def _load(self) -> gpd.GeoDataFrame:
        try:
            if self.number_of_rows:
                if self.streaming:
                    # Use streaming mode to fetch only the required rows
                    dataset = datasets.load_dataset(
                        self.repo_id, split="train", streaming=True
                    )
                    limited_rows = list(islice(dataset, self.number_of_rows))
                    self.source_data = pd.DataFrame(limited_rows)
                    logger.log(
                        "DEBUG_LOW",
                        f"Loaded {self.number_of_rows} rows in streaming mode from {self.repo_id}.",
                    )
                else:
                    # Use slicing with split for non-streaming mode
                    dataset = datasets.load_dataset(
                        self.repo_id, split=f"train[:{self.number_of_rows}]"
                    )
                    self.source_data = pd.DataFrame(dataset)
                    logger.log(
                        "DEBUG_LOW",
                        f"Loaded {self.number_of_rows} rows from {self.repo_id}.",
                    )
            else:
                dataset = datasets.load_dataset(self.repo_id, split="train")
                self.source_data = pd.DataFrame(dataset)
                logger.log("DEBUG_LOW", f"Loaded dataset {self.repo_id}.")

            self.additional_loader_parameters.pop("input_dataframe", None)
            dataframe_loader = DataFrameLoader(
                input_dataframe=self.source_data,
                latitude_column=self.latitude_column,
                longitude_column=self.longitude_column,
                geometry_column=self.geometry_column,
                coordinate_reference_system=self.coordinate_reference_system,
                **self.additional_loader_parameters,
            )

            return dataframe_loader.load()

        except datasets.exceptions.DatasetNotFoundError as e:
            dataset_dict = self._build_dataset_dict(
                limit=self.debug_limit_list_datasets
            )
            if "/" not in self.repo_id:
                all_datasets = [
                    f"{repo}/{ds}"
                    for repo, ds_list in dataset_dict.items()
                    for ds in ds_list
                ]
                matches = process.extract(
                    self.repo_id,
                    all_datasets,
                    processor=lambda x: x.split("/")[-1] if "/" in x else x,
                )
                filtered_matches = [
                    (match, score) for match, score in matches if score > 80
                ]
                top_matches = filtered_matches[:10]
                suggestions = [
                    f"{match} (similarity: {score}%)" for match, score in top_matches
                ]
                suggestion_text = (
                    " Maybe you meant one of these:\n" + "\n".join(suggestions)
                    if suggestions
                    else ""
                )
                raise ValueError(
                    f"The dataset '{self.repo_id}' does not exist on Hugging Face. "
                    f"Please verify the dataset ID.{suggestion_text}"
                ) from e
            else:
                repo_name, dataset_name = self.repo_id.split("/", 1)
                if repo_name not in dataset_dict:
                    all_repos = list(dataset_dict.keys())
                    matches = process.extract(repo_name, all_repos, limit=1000)
                    filtered_matches = [
                        (match, score) for match, score in matches if score > 80
                    ]
                    top_matches = filtered_matches[:10]
                    suggestions = [
                        f"{match} (similarity: {score}%)"
                        for match, score in top_matches
                    ]
                    suggestion_text = (
                        " Maybe you meant one of these:\n" + "\n".join(suggestions)
                        if suggestions
                        else ""
                    )
                    raise ValueError(
                        f"The repository '{repo_name}' does not exist on Hugging Face. "
                        f"Please verify the repository name.{suggestion_text}"
                    ) from e
                else:
                    available_datasets = dataset_dict[repo_name]
                    matches = process.extract(
                        dataset_name, available_datasets, limit=None
                    )
                    filtered_matches = [
                        (match, score) for match, score in matches if score > 80
                    ]
                    top_matches = filtered_matches[:10]
                    suggestions = [
                        f"{repo_name}/{match} (similarity: {score}%)"
                        for match, score in top_matches
                    ]
                    suggestion_text = (
                        " Maybe you meant one of these:\n" + "\n".join(suggestions)
                        if suggestions
                        else ""
                    )
                    raise ValueError(
                        f"The dataset '{dataset_name}' does not exist in repository '{repo_name}'. "
                        f"Available datasets: {', '.join(available_datasets)}.{suggestion_text}"
                    ) from e

        except Exception as e:
            raise ValueError(f"Error loading dataset '{self.repo_id}': {str(e)}") from e

    def preview(self, format: str = "ascii") -> Any:
        """Generate a preview of this `DataFrameLoader` loader.

        Creates a summary representation of the loader for quick inspection.

        Args:
            format: The output format for the preview. Options include:

                - [x] "ascii": Text-based format for terminal display
                - [x] "json": JSON-formatted data for programmatic use

        Returns:
            A string or dictionary representing the loader, depending on the format.

        Raises:
            ValueError: If an unsupported format is requested.
        """
        if format == "ascii":
            return (
                f"Loader: DataFrameLoader\n"
                f"  Latitude Column: {self.latitude_column}\n"
                f"  Longitude Column: {self.longitude_column}\n"
                f"  Geometry Column: {self.geometry_column}\n"
                f"  CRS: {self.coordinate_reference_system}\n"
                f"  Additional params: {self.additional_loader_parameters}\n"
            )
        elif format == "json":
            return {
                "loader": "DataFrameLoader",
                "latitude_column": self.latitude_column,
                "longitude_column": self.longitude_column,
                "geometry_column": self.geometry_column,
                "crs": self.coordinate_reference_system,
                "additional_params": self.additional_loader_parameters,
            }
        else:
            raise ValueError(f"Unsupported format: {format}")

_load()

Load the dataset from Hugging Face, convert it to a pandas DataFrame, and delegate GeoDataFrame conversion to a DataFrameLoader. On failure, raise a ValueError carrying fuzzy-matched repo_id suggestions.

Source code in src/urban_mapper/modules/loader/loaders/huggingface_loader.py
def _load(self) -> gpd.GeoDataFrame:
    try:
        if self.number_of_rows:
            if self.streaming:
                # Use streaming mode to fetch only the required rows
                dataset = datasets.load_dataset(
                    self.repo_id, split="train", streaming=True
                )
                limited_rows = list(islice(dataset, self.number_of_rows))
                self.source_data = pd.DataFrame(limited_rows)
                logger.log(
                    "DEBUG_LOW",
                    f"Loaded {self.number_of_rows} rows in streaming mode from {self.repo_id}.",
                )
            else:
                # Use slicing with split for non-streaming mode
                dataset = datasets.load_dataset(
                    self.repo_id, split=f"train[:{self.number_of_rows}]"
                )
                self.source_data = pd.DataFrame(dataset)
                logger.log(
                    "DEBUG_LOW",
                    f"Loaded {self.number_of_rows} rows from {self.repo_id}.",
                )
        else:
            dataset = datasets.load_dataset(self.repo_id, split="train")
            self.source_data = pd.DataFrame(dataset)
            logger.log("DEBUG_LOW", f"Loaded dataset {self.repo_id}.")

        self.additional_loader_parameters.pop("input_dataframe", None)
        dataframe_loader = DataFrameLoader(
            input_dataframe=self.source_data,
            latitude_column=self.latitude_column,
            longitude_column=self.longitude_column,
            geometry_column=self.geometry_column,
            coordinate_reference_system=self.coordinate_reference_system,
            **self.additional_loader_parameters,
        )

        return dataframe_loader.load()

    except datasets.exceptions.DatasetNotFoundError as e:
        dataset_dict = self._build_dataset_dict(
            limit=self.debug_limit_list_datasets
        )
        if "/" not in self.repo_id:
            all_datasets = [
                f"{repo}/{ds}"
                for repo, ds_list in dataset_dict.items()
                for ds in ds_list
            ]
            matches = process.extract(
                self.repo_id,
                all_datasets,
                processor=lambda x: x.split("/")[-1] if "/" in x else x,
            )
            filtered_matches = [
                (match, score) for match, score in matches if score > 80
            ]
            top_matches = filtered_matches[:10]
            suggestions = [
                f"{match} (similarity: {score}%)" for match, score in top_matches
            ]
            suggestion_text = (
                " Maybe you meant one of these:\n" + "\n".join(suggestions)
                if suggestions
                else ""
            )
            raise ValueError(
                f"The dataset '{self.repo_id}' does not exist on Hugging Face. "
                f"Please verify the dataset ID.{suggestion_text}"
            ) from e
        else:
            repo_name, dataset_name = self.repo_id.split("/", 1)
            if repo_name not in dataset_dict:
                all_repos = list(dataset_dict.keys())
                matches = process.extract(repo_name, all_repos, limit=1000)
                filtered_matches = [
                    (match, score) for match, score in matches if score > 80
                ]
                top_matches = filtered_matches[:10]
                suggestions = [
                    f"{match} (similarity: {score}%)"
                    for match, score in top_matches
                ]
                suggestion_text = (
                    " Maybe you meant one of these:\n" + "\n".join(suggestions)
                    if suggestions
                    else ""
                )
                raise ValueError(
                    f"The repository '{repo_name}' does not exist on Hugging Face. "
                    f"Please verify the repository name.{suggestion_text}"
                ) from e
            else:
                available_datasets = dataset_dict[repo_name]
                matches = process.extract(
                    dataset_name, available_datasets, limit=None
                )
                filtered_matches = [
                    (match, score) for match, score in matches if score > 80
                ]
                top_matches = filtered_matches[:10]
                suggestions = [
                    f"{repo_name}/{match} (similarity: {score}%)"
                    for match, score in top_matches
                ]
                suggestion_text = (
                    " Maybe you meant one of these:\n" + "\n".join(suggestions)
                    if suggestions
                    else ""
                )
                raise ValueError(
                    f"The dataset '{dataset_name}' does not exist in repository '{repo_name}'. "
                    f"Available datasets: {', '.join(available_datasets)}.{suggestion_text}"
                ) from e

    except Exception as e:
        raise ValueError(f"Error loading dataset '{self.repo_id}': {str(e)}") from e

preview(format='ascii')

Generate a preview of this HuggingFaceLoader loader.

Creates a summary representation of the loader for quick inspection.

Parameters:

Name Type Description Default
format str

The output format for the preview. Options include:

  • "ascii": Text-based format for terminal display
  • "json": JSON-formatted data for programmatic use
'ascii'

Returns:

Type Description
Any

A string or dictionary representing the loader, depending on the format.

Raises:

Type Description
ValueError

If an unsupported format is requested.

Source code in src/urban_mapper/modules/loader/loaders/huggingface_loader.py
def preview(self, format: str = "ascii") -> Any:
    """Generate a preview of this `DataFrameLoader` loader.

    Creates a summary representation of the loader for quick inspection.

    Args:
        format: The output format for the preview. Options include:

            - [x] "ascii": Text-based format for terminal display
            - [x] "json": JSON-formatted data for programmatic use

    Returns:
        A string or dictionary representing the loader, depending on the format.

    Raises:
        ValueError: If an unsupported format is requested.
    """
    if format == "ascii":
        return (
            f"Loader: DataFrameLoader\n"
            f"  Latitude Column: {self.latitude_column}\n"
            f"  Longitude Column: {self.longitude_column}\n"
            f"  Geometry Column: {self.geometry_column}\n"
            f"  CRS: {self.coordinate_reference_system}\n"
            f"  Additional params: {self.additional_loader_parameters}\n"
        )
    elif format == "json":
        return {
            "loader": "DataFrameLoader",
            "latitude_column": self.latitude_column,
            "longitude_column": self.longitude_column,
            "geometry_column": self.geometry_column,
            "crs": self.coordinate_reference_system,
            "additional_params": self.additional_loader_parameters,
        }
    else:
        raise ValueError(f"Unsupported format: {format}")

LoaderFactory

Factory class for creating and configuring data loaders.

This class implements a fluent, method-chaining interface for creating and configuring data loaders.

The factory manages the details of loader instantiation, coordinate reference system conversion, column mapping, and other data loading concerns, providing a consistent interface regardless of the underlying data source.

Attributes:

Name Type Description
source_type Optional[str]

The type of data source ("file" or "dataframe").

source_data Optional[Union[str, DataFrame, GeoDataFrame]]

The actual data source (file path or dataframe).

latitude_column Optional[str]

The name of the column containing latitude values.

longitude_column Optional[str]

The name of the column containing longitude values.

crs Union[str, Tuple[str, str]]

The coordinate reference system to use for the loaded data.

_instance Optional[LoaderBase]

The underlying loader instance (internal use only).

_preview Optional[dict]

Preview configuration (internal use only).

Examples:

>>> from urban_mapper import UrbanMapper
>>> 
>>> # Initialise UrbanMapper
>>> mapper = UrbanMapper()
>>> 
>>> # Load data from a CSV file with coordinate columns
>>> gdf = (
...         mapper.loader\
...         .from_file("your_file_path.csv")\
...         .with_columns(longitude_column="lon", latitude_column="lat")\
...         .load()
...     )
>>>
>>> # Load data from a GeoDataFrame
>>> import geopandas as gpd
>>> existing_data = gpd.read_file("data/some_shapefile.shp")
>>> gdf = mapper.loader.from_dataframe(existing_data).load() # Concise inline manner
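
When the source data is in a projected CRS, a (source, target) tuple converts it on load. A minimal sketch reusing the mapper from above (file path and column names illustrative):

```python
gdf = (
    mapper.loader
    .from_file("data/ny_points.csv")
    .with_columns(longitude_column="x", latitude_column="y")
    .with_crs(("EPSG:2263", "EPSG:4326"))  # read as NY State Plane, convert to WGS84
    .load()
)
```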
Source code in src/urban_mapper/modules/loader/loader_factory.py
@beartype
class LoaderFactory:
    """Factory class for creating and configuring data loaders.

    This class implements a fluent, method-chaining interface for creating and configuring data loaders.

    The factory manages the details of `loader instantiation`, `coordinate reference system`
    conversion, `column mapping`, and other data loading concerns, providing a consistent
    interface regardless of the underlying data source.

    Attributes:
        source_type: The type of data source ("file" or "dataframe").
        source_data: The actual data source (file path or dataframe).
        latitude_column: The name of the column containing latitude values.
        longitude_column: The name of the column containing longitude values.
        crs: The coordinate reference system to use for the loaded data.
        _instance: The underlying loader instance (internal use only).
        _preview: Preview configuration (internal use only).

    Examples:
        >>> from urban_mapper import UrbanMapper
        >>> 
        >>> # Initialise UrbanMapper
        >>> mapper = UrbanMapper()
        >>> 
        >>> # Load data from a CSV file with coordinate columns
        >>> gdf = (
        ...         mapper.loader\\
        ...         .from_file("your_file_path.csv")\\
        ...         .with_columns(longitude_column="lon", latitude_column="lat")\\
        ...         .load()
        ...     )
        >>>
        >>> # Load data from a GeoDataFrame
        >>> import geopandas as gpd
        >>> existing_data = gpd.read_file("data/some_shapefile.shp")
        >>> gdf = mapper.loader.from_dataframe(existing_data).load() # Concise inline manner
    """

    def __init__(self):
        self.source_type: Optional[str] = None
        self.source_data: Optional[Union[str, pd.DataFrame, gpd.GeoDataFrame]] = None
        self.latitude_column: Optional[str] = None
        self.longitude_column: Optional[str] = None
        self.map_columns: Optional[Dict[str, str]] = None
        self.geometry_column: Optional[str] = None
        self.crs: Union[str, Tuple[str, str]] = DEFAULT_CRS
        self._instance: Optional[LoaderBase] = None
        self._preview: Optional[dict] = None

    def _reset(self):
        self.source_type = None
        self.source_data = None
        self.latitude_column = None
        self.longitude_column = None
        self.map_columns = None
        self.geometry_column = None
        self.crs = DEFAULT_CRS
        self.repo_id = None
        self.number_of_row = None
        self.streaming = False
        self.debug_limit_list_datasets = None
        self._instance = None
        self._preview = None

    def from_file(self, file_path: str) -> "LoaderFactory":
        """Configure the factory to load data from a file.

        This method sets up the factory to load data from a file path. The file format
        is determined by the file extension. Supported formats include `CSV`, `shapefile`,
        and `Parquet`.

        Args:
            file_path: Path to the data file to load.

        Returns:
            The LoaderFactory instance for method chaining.

        Examples:
            >>> loader = mapper.loader.from_file("data/points.csv")
            >>> # Next steps would typically be to call with_columns() and load()
        """
        self._reset()
        self.source_type = "file"
        self.source_data = file_path
        logger.log(
            "DEBUG_LOW",
            f"FROM_FILE: Initialised LoaderFactory with file_path={file_path}",
        )
        return self

    def from_dataframe(
        self, dataframe: Union[pd.DataFrame, gpd.GeoDataFrame]
    ) -> "LoaderFactory":
        """Configure the factory to load data from an existing dataframe.

        This method sets up the factory to load data from a pandas `DataFrame` or
        geopandas `GeoDataFrame`. For `DataFrames` without geometry, you will need
        to call `with_columns()` to specify the latitude and longitude columns.

        Args:
            dataframe: The pandas DataFrame or geopandas GeoDataFrame to load.

        Returns:
            The LoaderFactory instance for method chaining.

        Examples:
            >>> import pandas as pd
            >>> df = pd.read_csv("data/points.csv")
            >>> loader = mapper.loader.from_dataframe(df)
            >>> # For regular DataFrames, you must specify coordinate columns:
            >>> loader.with_columns(longitude_column="lon", latitude_column="lat")
        """
        self._reset()
        self.source_type = "dataframe"
        self.source_data = dataframe
        logger.log(
            "DEBUG_LOW",
            f"FROM_DATAFRAME: Initialised LoaderFactory with dataframe={dataframe}",
        )
        return self

    def _build_dataset_dict(self, limit: Optional[int] = None):
        all_datasets = [
            dataset.id
            for dataset in (
                huggingface_hub.list_datasets(limit=limit)
                if limit
                else huggingface_hub.list_datasets()
            )
        ]
        dataset_dict = defaultdict(list)
        for dataset_id in all_datasets:
            if "/" in dataset_id:
                repo_name, dataset_name = dataset_id.split("/", 1)
                dataset_dict[repo_name].append(dataset_name)
        return dataset_dict

    def from_huggingface(
        self,
        repo_id: str,
        number_of_rows: Optional[int] = None,
        streaming: Optional[bool] = False,
        debug_limit_list_datasets: Optional[int] = None,
    ) -> "LoaderFactory":
        self._reset()
        self.source_type = "huggingface"
        self.source_data = repo_id
        self.repo_id = repo_id
        self.number_of_row = number_of_rows
        self.streaming = streaming
        self.debug_limit_list_datasets = debug_limit_list_datasets

        logger.log(
            "DEBUG_LOW",
            f"FROM_HUGGINGFACE: Loaded dataset {repo_id} with "
            f"{'all rows' if number_of_rows is None else number_of_rows} rows "
            f"{'(streaming mode)' if streaming else '(non-streaming mode)'}.",
        )
        return self

    def with_columns(
        self,
        longitude_column: Optional[str] = None,
        latitude_column: Optional[str] = None,
        geometry_column: Optional[str] = None,
    ) -> "LoaderFactory":
        """Specify either the latitude and longitude columns or a single geometry column in the data source.

        This method configures which columns in the data source contain the latitude,
        longitude coordinates, or geometry data. Either both `latitude_column` and
        `longitude_column` must be set, or `geometry_column` must be set.

        Args:
            longitude_column: Name of the column containing longitude values (optional).
            latitude_column: Name of the column containing latitude values (optional).
            geometry_column: Name of the column containing geometry data (optional).

        Returns:
            The LoaderFactory instance for method chaining.

        Examples:
            >>> loader = mapper.loader.from_file("data/points.csv")\
            ...     .with_columns(longitude_column="lon", latitude_column="lat")
            >>> loader = mapper.loader.from_file("data/points.csv")\
            ...     .with_columns(geometry_column="geom")
        """
        self.latitude_column = latitude_column
        self.longitude_column = longitude_column
        self.geometry_column = geometry_column
        logger.log(
            "DEBUG_LOW",
            f"WITH_COLUMNS: Initialised LoaderFactory "
            f"with either latitude_column={latitude_column} and longitude_column={longitude_column} or geometry_column={geometry_column}",
        )
        return self

    def with_crs(
        self, crs: Union[str, Tuple[str, str]] = DEFAULT_CRS
    ) -> "LoaderFactory":
        """Specify the coordinate reference system for the loaded data.

        This method configures the `coordinate reference system (CRS)` to use for the loaded
        data. If the source data already has a `CRS`, it will be converted to the specified `CRS`.

        Args:
            crs: The coordinate reference system to use, in any format accepted by geopandas
                (default: `EPSG:4326`, which is standard `WGS84` coordinates).
                If a string, it specifies the coordinate reference system to use (default: 'EPSG:4326').
                If a tuple (source_crs, target_crs), it defines a conversion from the source CRS to the target CRS (default target CRS: 'EPSG:4326').


        Returns:
            The LoaderFactory instance for method chaining.

        Examples:
            >>> loader = mapper.loader.from_file("data/points.csv")\
            ...     .with_columns(longitude_column="lon", latitude_column="lat")\
            ...     .with_crs("EPSG:3857")  # Use Web Mercator projection
            >>> loader = mapper.loader.from_file("data/points.csv")\
            ...     .with_columns(longitude_column="lon", latitude_column="lat")\
            ...     .with_crs( ("EPSG:2263", "EPSG:3857") )  # Use NY State Plane to load data and convert them to Web Mercator projection
        """
        self.crs = crs
        logger.log(
            "DEBUG_LOW",
            f"WITH_CRS: Initialised LoaderFactory with crs={crs}",
        )
        return self

    def with_map(
        self,
        map_columns: Dict[str, str],
    ) -> "LoaderFactory":
        """Specify a set of source-target to map column names.

        This method configures which columns in the data source should have column names changed.

        Args:
            map_columns: dictionary with source-target (key-value) columns to map from source to target names.

        Returns:
            The LoaderFactory instance for method chaining.

        Examples:
            >>> loader = mapper.loader.from_file("data/points.csv")\
            ...     .with_map(map_columns={"long": "longitude", "lat": "latitude"})
        """
        self.map_columns = map_columns
        logger.log(
            "DEBUG_LOW",
            f"WITH_MAP: Initialised LoaderFactory with map_columns={map_columns}",
        )
        return self

    @require_attributes(["source_type", "source_data"])
    def load(self) -> gpd.GeoDataFrame:
        """Load the data and return it as a `GeoDataFrame`.

        This method loads the data from the configured source and returns it as a
        geopandas `GeoDataFrame`. It handles the details of loading from different
        source types and formats.

        Returns:
            A GeoDataFrame containing the loaded data.

        Raises:
            ValueError: If the source type is invalid, the file format is unsupported,
                or required parameters (like latitude/longitude columns) are missing.

        Examples:
            >>> # Load CSV data
            >>> gdf = mapper.loader.from_file("data/points.csv")\
            ...     .with_columns(longitude_column="lon", latitude_column="lat")\
            ...     .load()
            >>> 
            >>> # Load shapefile data
            >>> gdf = mapper.loader.from_file("data/boundaries.shp").load()
        """
        self.build()
        return self._instance.load()

    def build(self) -> LoaderBase:
        """Build and return a `loader` instance without loading the data.

        This method creates and returns a loader instance without immediately loading
        the data. It is primarily intended for use in the `UrbanPipeline`, where the
        actual loading is deferred until pipeline execution.

        Returns:
            A LoaderBase instance configured to load the data when needed.

        Raises:
            ValueError: If the source type is not supported, the file format is unsupported,
                or required parameters (like latitude/longitude columns) are missing.

        Note:
            For most use cases outside of pipelines, using load() is preferred as it
            directly returns the loaded data.

        Examples:
            >>> # Creating a pipeline component
            >>> loader = mapper.loader.from_file("data/points.csv")\
            ...     .with_columns(longitude_column="lon", latitude_column="lat")\
            ...     .build()
            >>> step_loader_for_pipeline = ("My Loader", loader) # Add this in the list of steps in the `UrbanPipeline`.
        """
        logger.log(
            "DEBUG_MID",
            "WARNING: build() should only be used in UrbanPipeline. "
            "In other cases, using .load() is a better option.",
        )
        has_geometry = self.geometry_column is not None
        has_lat_or_long = (
            self.latitude_column is not None or self.longitude_column is not None
        )
        has_lat_and_long = (
            self.latitude_column is not None and self.longitude_column is not None
        )
        file_path = ""
        loader_class = None
        input_data = None

        if self.source_type == "file":
            file_path = self.source_data
            file_ext = Path(self.source_data).suffix.lower()
            if file_ext not in LOADER_FACTORY:
                raise ValueError(f"Unsupported file format: {file_ext}")
            loader_info = LOADER_FACTORY[file_ext]
            if loader_info["requires_columns"] and (
                (has_geometry and has_lat_or_long)
                or (not has_geometry and not has_lat_and_long)
            ):
                raise ValueError(
                    f"Loader for {file_ext} requires latitude and longitude columns or only geometry column. Call with_columns() with valid column names."
                )
            loader_class = loader_info["class"]
        elif self.source_type == "dataframe":
            if (has_geometry and has_lat_or_long) or (
                not has_geometry and not has_lat_and_long
            ):
                raise ValueError(
                    "DataFrame loading requires latitude and longitude columns or only geometry column. Call with_columns() with valid column names."
                )
            loader_class = LOADER_FACTORY[self.source_type]["class"]
            input_data = self.source_data.copy()
        elif self.source_type == "huggingface":
            if (has_geometry and has_lat_or_long) or (
                not has_geometry and not has_lat_and_long
            ):
                raise ValueError(
                    "Hugging Face dataset loading requires latitude and longitude columns or only geometry column. "
                    "Call with_columns() with valid column names."
                )
            loader_class = LOADER_FACTORY[self.source_type]["class"]
        else:
            raise ValueError("Invalid source type.")

        self._instance = loader_class(
            latitude_column=self.latitude_column,
            longitude_column=self.longitude_column,
            geometry_column=self.geometry_column,
            coordinate_reference_system=self.crs,
            map_columns=self.map_columns,
            ## specific to FileLoaders (CSVLoader, ParquetLoader, and ShapefileLoader)
            file_path=file_path,
            ## specific to DataFrameLoader
            input_dataframe=input_data,
            ## specific to HuggingFaceLoader
            repo_id=self.repo_id,
            number_of_rows=self.number_of_row,
            streaming=self.streaming,
            debug_limit_list_datasets=self.debug_limit_list_datasets,
        )
        if self._preview is not None:
            self.preview(format=self._preview["format"])
        return self._instance

    def preview(self, format="ascii") -> None:
        """Display a preview of the `loader` configuration and settings.

        This method generates and displays a preview of the `loader`, showing its
        `configuration`, `settings`, and `other metadata`. The preview can be displayed
        in different formats.

        Args:
            format: The format to display the preview in (default: "ascii").

                - [x] "ascii": Text-based format for terminal display
                - [x] "json": JSON-formatted data for programmatic use

        Raises:
            ValueError: If an unsupported format is specified.

        Note:
            This method requires a loader instance to be available. Call load()
            or build() first to create an instance.

        Examples:
            >>> loader = mapper.loader.from_file("data/points.csv")\
            ...     .with_columns(longitude_column="lon", latitude_column="lat")
            >>> # Preview after loading data
            >>> loader.load()
            >>> loader.preview()
            >>> # Or JSON format
            >>> loader.preview(format="json")
        """
        if self._instance is None:
            logger.log(
                "DEBUG_LOW",
                "No loader instance available to preview. Call load() first.",
            )
            return

        if hasattr(self._instance, "preview"):
            preview_data = self._instance.preview(format=format)
            if format == "ascii":
                print(preview_data)
            elif format == "json":
                print(json.dumps(preview_data, indent=2))
            else:
                raise ValueError(f"Unsupported format '{format}'.")
        else:
            logger.log("DEBUG_LOW", "Preview not supported for this loader's instance.")

    def with_preview(self, format="ascii") -> "LoaderFactory":
        """Configure the factory to display a preview after loading or building.

        This method configures the factory to automatically display a preview after
        loading data with `load()` or building a loader with `build()`. It's a convenient
        way to inspect the loader configuration and the loaded data.

        Args:
            format: The format to display the preview in (default: "ascii").

                - [x] "ascii": Text-based format for terminal display
                - [x] "json": JSON-formatted data for programmatic use

        Returns:
            The LoaderFactory instance for method chaining.

        Examples:
            >>> # Auto-preview after loading
            >>> gdf = mapper.loader.from_file("data/points.csv")\
            ...     .with_columns(longitude_column="lon", latitude_column="lat")\
            ...     .with_preview(format="json")\
            ...     .load()
        """
        self._preview = {
            "format": format,
        }
        return self
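
Taken together, the factory reads as a small fluent API. The sketch below (file name and raw column names are hypothetical) chains the methods documented in the following sections, including `with_map()` for renaming raw columns before the coordinate columns are declared:

```python
# A sketch, assuming a CSV with raw columns "long" and "lat" at this path.
gdf = (
    mapper.loader.from_file("data/points.csv")           # .csv selects the CSV loader
    .with_map({"long": "longitude", "lat": "latitude"})  # rename raw columns first
    .with_columns(longitude_column="longitude", latitude_column="latitude")
    .with_crs("EPSG:4326")                               # default WGS84, shown for completeness
    .with_preview(format="ascii")                        # auto-preview once loaded
    .load()                                              # build the loader and load the data
)
```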

from_file(file_path)

Configure the factory to load data from a file.

This method sets up the factory to load data from a file path. The file format is determined by the file extension. Supported formats include CSV, shapefile, and Parquet.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `file_path` | `str` | Path to the data file to load. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `LoaderFactory` | The LoaderFactory instance for method chaining. |

Examples:

>>> loader = mapper.loader.from_file("data/points.csv")
>>> # Next steps would typically be to call with_columns() and load()
Source code in src/urban_mapper/modules/loader/loader_factory.py
def from_file(self, file_path: str) -> "LoaderFactory":
    """Configure the factory to load data from a file.

    This method sets up the factory to load data from a file path. The file format
    is determined by the file extension. Supported formats include `CSV`, `shapefile`,
    and `Parquet`.

    Args:
        file_path: Path to the data file to load.

    Returns:
        The LoaderFactory instance for method chaining.

    Examples:
        >>> loader = mapper.loader.from_file("data/points.csv")
        >>> # Next steps would typically be to call with_columns() and load()
    """
    self._reset()
    self.source_type = "file"
    self.source_data = file_path
    logger.log(
        "DEBUG_LOW",
        f"FROM_FILE: Initialised LoaderFactory with file_path={file_path}",
    )
    return self
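
The same chain works for the other file-based formats; only the extension changes. A minimal sketch for Parquet (file and column names hypothetical):

```python
# A sketch: Parquet files go through the same column configuration as CSV.
gdf = (
    mapper.loader.from_file("data/trips.parquet")
    .with_columns(longitude_column="lon", latitude_column="lat")
    .load()
)
```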

from_dataframe(dataframe)

Configure the factory to load data from an existing dataframe.

This method sets up the factory to load data from a pandas DataFrame or geopandas GeoDataFrame. For DataFrames without geometry, you will need to call with_columns() to specify the latitude and longitude columns.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `dataframe` | `Union[DataFrame, GeoDataFrame]` | The pandas DataFrame or geopandas GeoDataFrame to load. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `LoaderFactory` | The LoaderFactory instance for method chaining. |

Examples:

>>> import pandas as pd
>>> df = pd.read_csv("data/points.csv")
>>> loader = mapper.loader.from_dataframe(df)
>>> # For regular DataFrames, you must specify coordinate columns:
>>> loader.with_columns(longitude_column="lon", latitude_column="lat")
Source code in src/urban_mapper/modules/loader/loader_factory.py
def from_dataframe(
    self, dataframe: Union[pd.DataFrame, gpd.GeoDataFrame]
) -> "LoaderFactory":
    """Configure the factory to load data from an existing dataframe.

    This method sets up the factory to load data from a pandas `DataFrame` or
    geopandas `GeoDataFrame`. For `DataFrames` without geometry, you will need
    to call `with_columns()` to specify the latitude and longitude columns.

    Args:
        dataframe: The pandas DataFrame or geopandas GeoDataFrame to load.

    Returns:
        The LoaderFactory instance for method chaining.

    Examples:
        >>> import pandas as pd
        >>> df = pd.read_csv("data/points.csv")
        >>> loader = mapper.loader.from_dataframe(df)
        >>> # For regular DataFrames, you must specify coordinate columns:
        >>> loader.with_columns(longitude_column="lon", latitude_column="lat")
    """
    self._reset()
    self.source_type = "dataframe"
    self.source_data = dataframe
    logger.log(
        "DEBUG_LOW",
        f"FROM_DATAFRAME: Initialised LoaderFactory with dataframe={dataframe}",
    )
    return self
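
For a GeoDataFrame that already carries geometry, a sketch (file name hypothetical) would point `with_columns()` at the geometry column rather than at coordinate pairs:

```python
import geopandas as gpd

# A sketch, assuming a polygon layer with a standard 'geometry' column.
boundaries = gpd.read_file("data/boundaries.shp")

loader = (
    mapper.loader.from_dataframe(boundaries)
    .with_columns(geometry_column="geometry")
)
gdf = loader.load()
```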

from_huggingface(repo_id, number_of_rows=None, streaming=False, debug_limit_list_datasets=None)

Configure the factory to load data from a Hugging Face dataset.

This method sets up the factory to load data from a dataset hosted on the Hugging Face Hub, identified by its repository id. The number of rows can optionally be capped, and the dataset can be streamed rather than downloaded in full.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `repo_id` | `str` | Identifier of the Hugging Face dataset repository to load. | *required* |
| `number_of_rows` | `Optional[int]` | Maximum number of rows to load; `None` loads all rows. | `None` |
| `streaming` | `Optional[bool]` | Whether to stream the dataset instead of downloading it in full. | `False` |
| `debug_limit_list_datasets` | `Optional[int]` | Debug option limiting how many datasets are listed. | `None` |

Returns:

| Type | Description |
| --- | --- |
| `LoaderFactory` | The LoaderFactory instance for method chaining. |

Source code in src/urban_mapper/modules/loader/loader_factory.py
def from_huggingface(
    self,
    repo_id: str,
    number_of_rows: Optional[int] = None,
    streaming: Optional[bool] = False,
    debug_limit_list_datasets: Optional[int] = None,
) -> "LoaderFactory":
    self._reset()
    self.source_type = "huggingface"
    self.source_data = repo_id
    self.repo_id = repo_id
    self.number_of_row = number_of_rows
    self.streaming = streaming
    self.debug_limit_list_datasets = debug_limit_list_datasets

    logger.log(
        "DEBUG_LOW",
        f"FROM_HUGGINGFACE: Loaded dataset {repo_id} with "
        f"{'all rows' if number_of_rows is None else number_of_rows} rows "
        f"{'(streaming mode)' if streaming else '(non-streaming mode)'}.",
    )
    return self
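
A hedged usage sketch; the repository id and column names below are hypothetical and must match a real dataset on the Hugging Face Hub:

```python
# A sketch: stream the first 10,000 rows of a hypothetical Hub dataset.
gdf = (
    mapper.loader.from_huggingface(
        "some-org/taxi-trips",    # hypothetical repo_id
        number_of_rows=10_000,    # None would load all rows
        streaming=True,           # stream instead of downloading in full
    )
    .with_columns(longitude_column="pickup_lon", latitude_column="pickup_lat")
    .load()
)
```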

with_columns(longitude_column=None, latitude_column=None, geometry_column=None)

Specify either the latitude and longitude columns or a single geometry column in the data source.

This method configures which columns in the data source contain the latitude, longitude coordinates, or geometry data. Either both latitude_column and longitude_column must be set, or geometry_column must be set.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `longitude_column` | `Optional[str]` | Name of the column containing longitude values (optional). | `None` |
| `latitude_column` | `Optional[str]` | Name of the column containing latitude values (optional). | `None` |
| `geometry_column` | `Optional[str]` | Name of the column containing geometry data (optional). | `None` |

Returns:

| Type | Description |
| --- | --- |
| `LoaderFactory` | The LoaderFactory instance for method chaining. |

Examples:

>>> loader = mapper.loader.from_file("data/points.csv")\
...     .with_columns(longitude_column="lon", latitude_column="lat")
>>> loader = mapper.loader.from_file("data/points.csv")\
...     .with_columns(geometry_column="geom")
Source code in src/urban_mapper/modules/loader/loader_factory.py
def with_columns(
    self,
    longitude_column: Optional[str] = None,
    latitude_column: Optional[str] = None,
    geometry_column: Optional[str] = None,
) -> "LoaderFactory":
    """Specify either the latitude and longitude columns or a single geometry column in the data source.

    This method configures which columns in the data source contain the latitude,
    longitude coordinates, or geometry data. Either both `latitude_column` and
    `longitude_column` must be set, or `geometry_column` must be set.

    Args:
        longitude_column: Name of the column containing longitude values (optional).
        latitude_column: Name of the column containing latitude values (optional).
        geometry_column: Name of the column containing geometry data (optional).

    Returns:
        The LoaderFactory instance for method chaining.

    Examples:
        >>> loader = mapper.loader.from_file("data/points.csv")\
        ...     .with_columns(longitude_column="lon", latitude_column="lat")
        >>> loader = mapper.loader.from_file("data/points.csv")\
        ...     .with_columns(geometry_column="geom")
    """
    self.latitude_column = latitude_column
    self.longitude_column = longitude_column
    self.geometry_column = geometry_column
    logger.log(
        "DEBUG_LOW",
        f"WITH_COLUMNS: Initialised LoaderFactory "
        f"with either latitude_column={latitude_column} and longitude_column={longitude_column} or geometry_column={geometry_column}",
    )
    return self
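
As noted in the module overview, geometry can also arrive as WKT strings in a single column. A sketch with an inline DataFrame (column names hypothetical):

```python
import pandas as pd

# A sketch: WKT strings in a single 'geom' column stand in for lat/long pairs.
df = pd.DataFrame(
    {
        "name": ["louvre", "big_ben"],
        "geom": ["POINT (2.3376 48.8606)", "POINT (-0.1246 51.5007)"],
    }
)

gdf = (
    mapper.loader.from_dataframe(df)
    .with_columns(geometry_column="geom")
    .load()
)
```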

with_crs(crs=DEFAULT_CRS)

Specify the coordinate reference system for the loaded data.

This method configures the coordinate reference system (CRS) to use for the loaded data. If the source data already has a CRS, it will be converted to the specified CRS.

Parameters:

Name Type Description Default
crs Union[str, Tuple[str, str]]

The coordinate reference system to use, in any format accepted by geopandas (default: EPSG:4326, which is standard WGS84 coordinates). If a string, it specifies the coordinate reference system to use (default: 'EPSG:4326'). If a tuple (source_crs, target_crs), it defines a conversion from the source CRS to the target CRS (default target CRS: 'EPSG:4326').

DEFAULT_CRS

Returns:

Type Description
LoaderFactory

The LoaderFactory instance for method chaining.

Examples:

>>> loader = mapper.loader.from_file("data/points.csv")\
...     .with_columns(longitude_column="lon", latitude_column="lat")\
...     .with_crs("EPSG:3857")  # Use Web Mercator projection
>>> loader = mapper.loader.from_file("data/points.csv")\
...     .with_columns(longitude_column="lon", latitude_column="lat")\
...     .with_crs( ("EPSG:2263", "EPSG:3857") )  # Use NY State Plane to load data and convert them to Web Mercator projection
Source code in src/urban_mapper/modules/loader/loader_factory.py
def with_crs(
    self, crs: Union[str, Tuple[str, str]] = DEFAULT_CRS
) -> "LoaderFactory":
    """Specify the coordinate reference system for the loaded data.

    This method configures the `coordinate reference system (CRS)` to use for the loaded
    data. If the source data already has a `CRS`, it will be converted to the specified `CRS`.

    Args:
        crs: The coordinate reference system to use, in any format accepted by geopandas
            (default: `EPSG:4326`, which is standard `WGS84` coordinates).
            If a string, it specifies the coordinate reference system to use (default: 'EPSG:4326').
            If a tuple (source_crs, target_crs), it defines a conversion from the source CRS to the target CRS (default target CRS: 'EPSG:4326').


    Returns:
        The LoaderFactory instance for method chaining.

    Examples:
        >>> loader = mapper.loader.from_file("data/points.csv")\
        ...     .with_columns(longitude_column="lon", latitude_column="lat")\
        ...     .with_crs("EPSG:3857")  # Use Web Mercator projection
        >>> loader = mapper.loader.from_file("data/points.csv")\
        ...     .with_columns(longitude_column="lon", latitude_column="lat")\
        ...     .with_crs( ("EPSG:2263", "EPSG:3857") )  # Use NY State Plane to load data and convert them to Web Mercator projection
    """
    self.crs = crs
    logger.log(
        "DEBUG_LOW",
        f"WITH_CRS: Initialised LoaderFactory with crs={crs}",
    )
    return self
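
To confirm the tuple form behaves as described, a quick sketch checking the resulting CRS (file and column names hypothetical):

```python
# A sketch: source data in NY State Plane, reprojected on load to Web Mercator.
gdf = (
    mapper.loader.from_file("data/ny_points.csv")
    .with_columns(longitude_column="lon", latitude_column="lat")
    .with_crs(("EPSG:2263", "EPSG:3857"))
    .load()
)
print(gdf.crs)  # expected to report EPSG:3857 if the conversion applies as documented
```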

with_preview(format='ascii')

Configure the factory to display a preview after loading or building.

This method configures the factory to automatically display a preview after loading data with load() or building a loader with build(). It's a convenient way to inspect the loader configuration and the loaded data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `format` | | The format to display the preview in: `"ascii"` (text-based, for terminal display) or `"json"` (JSON-formatted, for programmatic use). | `'ascii'` |

Returns:

| Type | Description |
| --- | --- |
| `LoaderFactory` | The LoaderFactory instance for method chaining. |

Examples:

>>> # Auto-preview after loading
>>> gdf = mapper.loader.from_file("data/points.csv")\
...     .with_columns(longitude_column="lon", latitude_column="lat")\
...     .with_preview(format="json")\
...     .load()
Source code in src/urban_mapper/modules/loader/loader_factory.py
def with_preview(self, format="ascii") -> "LoaderFactory":
    """Configure the factory to display a preview after loading or building.

    This method configures the factory to automatically display a preview after
    loading data with `load()` or building a loader with `build()`. It's a convenient
    way to inspect the loader configuration and the loaded data.

    Args:
        format: The format to display the preview in (default: "ascii").

            - [x] "ascii": Text-based format for terminal display
            - [x] "json": JSON-formatted data for programmatic use

    Returns:
        The LoaderFactory instance for method chaining.

    Examples:
        >>> # Auto-preview after loading
        >>> gdf = mapper.loader.from_file("data/points.csv")\
        ...     .with_columns(longitude_column="lon", latitude_column="lat")\
        ...     .with_preview(format="json")\
        ...     .load()
    """
    self._preview = {
        "format": format,
    }
    return self

load()

Load the data and return it as a GeoDataFrame.

This method loads the data from the configured source and returns it as a geopandas GeoDataFrame. It handles the details of loading from different source types and formats.

Returns:

| Type | Description |
| --- | --- |
| `GeoDataFrame` | A GeoDataFrame containing the loaded data. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If the source type is invalid, the file format is unsupported, or required parameters (like latitude/longitude columns) are missing. |

Examples:

>>> # Load CSV data
>>> gdf = mapper.loader.from_file("data/points.csv")\
...     .with_columns(longitude_column="lon", latitude_column="lat")\
...     .load()
>>> 
>>> # Load shapefile data
>>> gdf = mapper.loader.from_file("data/boundaries.shp").load()
Source code in src/urban_mapper/modules/loader/loader_factory.py
@require_attributes(["source_type", "source_data"])
def load(self) -> gpd.GeoDataFrame:
    """Load the data and return it as a `GeoDataFrame`.

    This method loads the data from the configured source and returns it as a
    geopandas `GeoDataFrame`. It handles the details of loading from different
    source types and formats.

    Returns:
        A GeoDataFrame containing the loaded data.

    Raises:
        ValueError: If the source type is invalid, the file format is unsupported,
            or required parameters (like latitude/longitude columns) are missing.

    Examples:
        >>> # Load CSV data
        >>> gdf = mapper.loader.from_file("data/points.csv")\
        ...     .with_columns(longitude_column="lon", latitude_column="lat")\
        ...     .load()
        >>> 
        >>> # Load shapefile data
        >>> gdf = mapper.loader.from_file("data/boundaries.shp").load()
    """
    self.build()
    return self._instance.load()
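
As noted in the module overview, several loaders can be combined in a dictionary, one per dataset, for later use in a pipeline. A sketch of that pattern (keys, file names, and column names hypothetical; the exact pipeline wiring may differ):

```python
# A sketch of the multi-dataset pattern: one configured loader per dataset.
loaders = {
    "trips": mapper.loader.from_file("data/trips.csv")
        .with_columns(longitude_column="lon", latitude_column="lat")
        .build(),
    "districts": mapper.loader.from_file("data/districts.shp").build(),
}
```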

build()

Build and return a loader instance without loading the data.

This method creates and returns a loader instance without immediately loading the data. It is primarily intended for use in the UrbanPipeline, where the actual loading is deferred until pipeline execution.

Returns:

| Type | Description |
| --- | --- |
| `LoaderBase` | A LoaderBase instance configured to load the data when needed. |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If the source type is not supported, the file format is unsupported, or required parameters (like latitude/longitude columns) are missing. |

Note

For most use cases outside of pipelines, using load() is preferred as it directly returns the loaded data.

Examples:

>>> # Creating a pipeline component
>>> loader = mapper.loader.from_file("data/points.csv")\
...     .with_columns(longitude_column="lon", latitude_column="lat")\
...     .build()
>>> step_loader_for_pipeline = ("My Loader", loader) # Add this in the list of steps in the `UrbanPipeline`.
Source code in src/urban_mapper/modules/loader/loader_factory.py
def build(self) -> LoaderBase:
    """Build and return a `loader` instance without loading the data.

    This method creates and returns a loader instance without immediately loading
    the data. It is primarily intended for use in the `UrbanPipeline`, where the
    actual loading is deferred until pipeline execution.

    Returns:
        A LoaderBase instance configured to load the data when needed.

    Raises:
        ValueError: If the source type is not supported, the file format is unsupported,
            or required parameters (like latitude/longitude columns) are missing.

    Note:
        For most use cases outside of pipelines, using load() is preferred as it
        directly returns the loaded data.

    Examples:
        >>> # Creating a pipeline component
        >>> loader = mapper.loader.from_file("data/points.csv")\
        ...     .with_columns(longitude_column="lon", latitude_column="lat")\
        ...     .build()
        >>> step_loader_for_pipeline = ("My Loader", loader) # Add this in the list of steps in the `UrbanPipeline`.
    """
    logger.log(
        "DEBUG_MID",
        "WARNING: build() should only be used in UrbanPipeline. "
        "In other cases, using .load() is a better option.",
    )
    has_geometry = self.geometry_column is not None
    has_lat_or_long = (
        self.latitude_column is not None or self.longitude_column is not None
    )
    has_lat_and_long = (
        self.latitude_column is not None and self.longitude_column is not None
    )
    file_path = ""
    loader_class = None
    input_data = None

    if self.source_type == "file":
        file_path = self.source_data
        file_ext = Path(self.source_data).suffix.lower()
        if file_ext not in LOADER_FACTORY:
            raise ValueError(f"Unsupported file format: {file_ext}")
        loader_info = LOADER_FACTORY[file_ext]
        if loader_info["requires_columns"] and (
            (has_geometry and has_lat_or_long)
            or (not has_geometry and not has_lat_and_long)
        ):
            raise ValueError(
                f"Loader for {file_ext} requires latitude and longitude columns or only geometry column. Call with_columns() with valid column names."
            )
        loader_class = loader_info["class"]
    elif self.source_type == "dataframe":
        if (has_geometry and has_lat_or_long) or (
            not has_geometry and not has_lat_and_long
        ):
            raise ValueError(
                "DataFrame loading requires latitude and longitude columns or only geometry column. Call with_columns() with valid column names."
            )
        loader_class = LOADER_FACTORY[self.source_type]["class"]
        input_data = self.source_data.copy()
    elif self.source_type == "huggingface":
        if (has_geometry and has_lat_or_long) or (
            not has_geometry and not has_lat_and_long
        ):
            raise ValueError(
                "Hugging Face dataset loading requires latitude and longitude columns or only geometry column. "
                "Call with_columns() with valid column names."
            )
        loader_class = LOADER_FACTORY[self.source_type]["class"]
    else:
        raise ValueError("Invalid source type.")

    self._instance = loader_class(
        latitude_column=self.latitude_column,
        longitude_column=self.longitude_column,
        geometry_column=self.geometry_column,
        coordinate_reference_system=self.crs,
        map_columns=self.map_columns,
        ## specific to FileLoaders (CSVLoader, ParquetLoader, and ShapefileLoader)
        file_path=file_path,
        ## specific to DataFrameLoader
        input_dataframe=input_data,
        ## specific to HuggingFaceLoader
        repo_id=self.repo_id,
        number_of_rows=self.number_of_row,
        streaming=self.streaming,
        debug_limit_list_datasets=self.debug_limit_list_datasets,
    )
    if self._preview is not None:
        self.preview(format=self._preview["format"])
    return self._instance
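
The validation above is driven by the LOADER_FACTORY registry, keyed by file extension or source type. Its actual definition is not shown on this page; the sketch below is inferred from how build() consumes it, and the `requires_columns` values are assumptions:

```python
# Inferred shape of the registry; a sketch, not the actual definition.
# Loader classes are shown by name only; real entries hold the classes
# themselves (CSVLoader, ParquetLoader, ShapefileLoader, DataFrameLoader,
# HuggingFaceLoader, per the comments in build()).
LOADER_FACTORY = {
    ".csv": {"class": "CSVLoader", "requires_columns": True},
    ".parquet": {"class": "ParquetLoader", "requires_columns": True},
    ".shp": {"class": "ShapefileLoader", "requires_columns": False},  # geometry is intrinsic
    "dataframe": {"class": "DataFrameLoader", "requires_columns": True},
    "huggingface": {"class": "HuggingFaceLoader", "requires_columns": True},
}
```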

preview(format='ascii')

Display a preview of the loader configuration and settings.

This method generates and displays a preview of the loader, showing its configuration, settings, and other metadata. The preview can be displayed in different formats.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `format` | | The format to display the preview in: `"ascii"` (text-based, for terminal display) or `"json"` (JSON-formatted, for programmatic use). | `'ascii'` |

Raises:

| Type | Description |
| --- | --- |
| `ValueError` | If an unsupported format is specified. |

Note

This method requires a loader instance to be available. Call load() or build() first to create an instance.

Examples:

>>> loader = mapper.loader.from_file("data/points.csv")\
...     .with_columns(longitude_column="lon", latitude_column="lat")
>>> # Preview after loading data
>>> loader.load()
>>> loader.preview()
>>> # Or JSON format
>>> loader.preview(format="json")
Source code in src/urban_mapper/modules/loader/loader_factory.py
def preview(self, format="ascii") -> None:
    """Display a preview of the `loader` configuration and settings.

    This method generates and displays a preview of the `loader`, showing its
    `configuration`, `settings`, and `other metadata`. The preview can be displayed
    in different formats.

    Args:
        format: The format to display the preview in (default: "ascii").

            - [x] "ascii": Text-based format for terminal display
            - [x] "json": JSON-formatted data for programmatic use

    Raises:
        ValueError: If an unsupported format is specified.

    Note:
        This method requires a loader instance to be available. Call load()
        or build() first to create an instance.

    Examples:
        >>> loader = mapper.loader.from_file("data/points.csv")\
        ...     .with_columns(longitude_column="lon", latitude_column="lat")
        >>> # Preview after loading data
        >>> loader.load()
        >>> loader.preview()
        >>> # Or JSON format
        >>> loader.preview(format="json")
    """
    if self._instance is None:
        logger.log(
            "DEBUG_LOW",
            "No loader instance available to preview. Call load() first.",
        )
        return

    if hasattr(self._instance, "preview"):
        preview_data = self._instance.preview(format=format)
        if format == "ascii":
            print(preview_data)
        elif format == "json":
            print(json.dumps(preview_data, indent=2))
        else:
            raise ValueError(f"Unsupported format '{format}'.")
    else:
        logger.log("DEBUG_LOW", "Preview not supported for this loader's instance.")