Source code for ocr.datasets

import typing

import duckdb
import geopandas as gpd
import icechunk as ic
import pydantic
import xarray as xr
from shapely import wkt


[docs] class Dataset(pydantic.BaseModel): """ Base class for datasets. """ name: str description: str bucket: str prefix: str data_format: typing.Literal['geoparquet', 'zarr'] version: str = 'v1' license: str | None = None
[docs] def to_xarray( self, *, is_icechunk: bool | None = None, xarray_open_kwargs: dict | None = None, xarray_storage_options: dict | None = None, ) -> xr.Dataset: """ Convert the dataset to an xarray.Dataset. Parameters ---------- is_icechunk : bool | None, default None Whether to use icechunk to access the data. - If True: only try using icechunk - If None: try icechunk first, fall back to direct S3 access if it fails - If False: only use direct S3 access xarray_open_kwargs : dict, optional Additional keyword arguments to pass to xarray.open_dataset. xarray_storage_options : dict, optional Storage options for S3 access when not using icechunk. Returns ------- xr.Dataset The opened dataset. Raises ------ ValueError If the dataset is not in 'zarr' format. FileNotFoundError If the dataset cannot be found or accessed. """ if self.data_format != 'zarr': raise ValueError("Dataset must be in 'zarr' format to convert to xarray.") xarray_open_kwargs = xarray_open_kwargs or {} xarray_storage_options = xarray_storage_options or {} # Try icechunk if is_icechunk is True or None if is_icechunk is not False: try: storage = ic.s3_storage(bucket=self.bucket, prefix=self.prefix) repo = ic.Repository.open(storage=storage) session = repo.readonly_session('main') icechunk_kwargs = { 'consolidated': False, 'engine': 'zarr', 'chunks': {}, **xarray_open_kwargs, } ds = xr.open_dataset(session.store, **icechunk_kwargs) return ds except Exception as exc: # If is_icechunk=True but icechunk failed, raise the error if is_icechunk is True: raise FileNotFoundError( f"Failed to open icechunk repository: '{self.bucket}/{self.prefix}'" ) from exc # Otherwise, if is_icechunk=None, we'll try the fallback method # Direct S3 access (either is_icechunk=False or icechunk failed with is_icechunk=None) try: direct_s3_kwargs = {'engine': 'zarr', 'chunks': {}, **xarray_open_kwargs} ds = xr.open_dataset( f's3://{self.bucket}/{self.prefix}', **direct_s3_kwargs, storage_options=xarray_storage_options, ) return ds except Exception as exc: raise FileNotFoundError( f"No such file or directory: 's3://{self.bucket}/{self.prefix}'" ) from exc
[docs] def query_geoparquet( self, query: str | None = None, *, install_extensions: bool = True, ) -> 'duckdb.DuckDBPyRelation': """ Query a geoparquet file using DuckDB. Parameters ---------- query : str, optional SQL query to execute. If not provided, returns all data. install_extensions : bool, default True Whether to install and load the spatial and httpfs extensions. Returns ------- duckdb.DuckDBPyRelation Result of the DuckDB query. Raises ------ ValueError If dataset is not in 'geoparquet' format. Example ------- Example of querying buildings with a converted geometry column: >>> buildings = catalog.get_dataset('conus-overture-buildings', 'v2025-03-19.1') >>> result = buildings.query_geoparquet(\"\"\" ... SELECT ... id, ... roof_material, ... geometry ... FROM read_parquet('{s3_path}') ... WHERE roof_material = 'concrete' ... \"\"\") >>> # Then convert to GeoDataFrame >>> gdf = buildings.to_geopandas(\"\"\" ... SELECT ... id, ... roof_material, ... geometry ... FROM read_parquet('{s3_path}') ... WHERE roof_material = 'concrete' ... \"\"\") """ from ocr.utils import install_load_extensions if self.data_format != 'geoparquet': raise ValueError("Dataset must be in 'geoparquet' format to query with DuckDB.") if install_extensions: install_load_extensions() s3_path = f's3://{self.bucket}/{self.prefix}' if query is None: return duckdb.sql(f"SELECT * FROM read_parquet('{s3_path}')") else: # Replace placeholder in query if present if '{s3_path}' in query: query = query.format(s3_path=s3_path) return duckdb.sql(query)
[docs] def to_geopandas( self, query: str | None = None, geometry_column='geometry', crs: str = 'EPSG:4326', target_crs: str | None = None, **kwargs, ) -> gpd.GeoDataFrame: """Convert query results to a GeoPandas GeoDataFrame. Parameters ---------- query : str, optional SQL query to execute. If not provided, returns all data. geometry_column : str, default 'geometry' The name of the geometry column in the query result. crs : str, default 'EPSG:4326' The coordinate reference system to use for the geometries. target_crs : str, optional The target coordinate reference system to convert the geometries to. **kwargs : dict Additional keyword arguments passed to `query_geoparquet`. Returns ------- gpd.GeoDataFrame A GeoPandas GeoDataFrame containing the queried data with geometries. Raises ------ ValueError If dataset is not in 'geoparquet' format or if the geometry column is not found. Example ------- Example of converting buildings to GeoPandas GeoDataFrame - no need for ST_AsText(): >>> buildings = catalog.get_dataset('conus-overture-buildings', 'v2025-03-19.1') >>> gdf = buildings.to_geopandas(\"\"\" ... SELECT ... id, ... roof_material, ... geometry ... FROM read_parquet('{s3_path}') ... WHERE roof_material = 'concrete' ... \"\"\") >>> gdf.head() """ if query is not None: # Only add the conversion if the geometry column doesn't already have a transformation if f'ST_AsText({geometry_column})' not in query: # Construct new query that preserves all columns but converts the geometry modified_query = query # Check if the query has a SELECT clause we can modify if 'SELECT' in query.upper() and 'FROM' in query.upper(): select_part, _ = query.upper().split('FROM', 1) # case 1: SELECT * query if 'SELECT *' in query.upper(): # get the column names to build an explicit query columns_query = "SELECT * FROM read_parquet('{s3_path}') LIMIT 0" columns_result = self.query_geoparquet(columns_query, **kwargs) columns_df = columns_result.df() # Create new query with explicit columns col_list = [] for col in columns_df.columns: if col.lower() == geometry_column.lower(): col_list.append(f'ST_AsText({col}) as {col}') else: col_list.append(col) # Replace SELECT * with explicit column list modified_query = query.replace('*', ', '.join(col_list), 1) # If the geometry column is directly selected (not already transformed) elif geometry_column.upper() in select_part: original_query = query modified_query = original_query.replace( geometry_column, f'ST_AsText({geometry_column}) as {geometry_column}', ) query = modified_query result = self.query_geoparquet(query, **kwargs) df = result.df() if geometry_column not in df.columns: raise ValueError(f"Geometry column '{geometry_column}' not found in results") try: geometry_series = df[geometry_column].apply( lambda g: wkt.loads(g) if g is not None else None ) gdf = gpd.GeoDataFrame(df, geometry=geometry_series, crs=crs) if target_crs is not None: gdf = gdf.to_crs(target_crs) return gdf except Exception as e: raise ValueError(f'Failed to convert geometry column: {geometry_column}') from e
[docs] class Catalog(pydantic.BaseModel): """ Base class for datasets catalog. """ datasets: list[Dataset]
[docs] def get_dataset( self, name: str, version: str | None = None, *, case_sensitive: bool = True, latest: bool = False, ) -> Dataset: """ Get a dataset by name and optionally version. Parameters ---------- name : str Name of the dataset to retrieve version : str, optional Specific version of the dataset. If not provided, returns the dataset if only one version exists, or raises an error if multiple versions exist, unless get_latest=True. case_sensitive : bool, default True Whether to match dataset names case-sensitively latest : bool, default False If True and version=None, returns the latest version instead of raising an error when multiple versions exist Returns ------- Dataset The matched dataset Raises ------ ValueError If multiple versions exist and version is not specified (and latest=False) KeyError If no matching dataset is found Examples -------- >>> # Get a dataset with a specific version >>> catalog.get_dataset('conus-overture-buildings', 'v2025-03-19.1') >>> >>> # Get latest version of a dataset >>> catalog.get_dataset('conus-overture-buildings', get_latest=True) """ found_datasets = [] name_matches = [] for dataset in self.datasets: dataset_name = dataset.name if case_sensitive else dataset.name.lower() search_name = name if case_sensitive else name.lower() if dataset_name == search_name: name_matches.append(dataset.name) if version is None or dataset.version == version: found_datasets.append(dataset) if version is None: if len(found_datasets) == 1: return found_datasets[0] elif len(found_datasets) > 1: if latest: try: return sorted(found_datasets, key=lambda x: x.version, reverse=True)[0] except Exception as e: found_versions = {dataset.version for dataset in found_datasets} raise ValueError( f'Could not determine the latest version from {found_versions}. ' f'Please specify a version explicitly.' ) from e else: found_versions = {dataset.version for dataset in found_datasets} raise ValueError( f"Multiple versions found for dataset '{name}'. " f'Please specify a version: {sorted(found_versions)} ' f'or use get_latest=True to automatically select the latest version.' ) if found_datasets: return found_datasets[0] if name_matches: # We found the name but not the specific version found_versions = { dataset.version for dataset in self.datasets if ( dataset.name == name if case_sensitive else dataset.name.lower() == name.lower() ) } raise KeyError( f"Dataset '{name}' exists, but version '{version}' was not found. " f'Available versions: {sorted(found_versions)}' ) raise KeyError(f"Dataset '{name}' not found in the catalog.")
def __iter__(self): return iter(self.datasets) def __str__(self) -> str: """ Return a string representation of the catalog. """ return self.__repr__() def __repr__(self) -> str: """ Return a string representation of the catalog. """ try: from io import StringIO from rich.console import Console from rich.table import Table output = StringIO() console = Console(file=output) # Create the table table = Table( title=f'📊 OCR Dataset Catalog ({len(self.datasets)} datasets)', show_lines=True, expand=True, ) # Add columns table.add_column('Name', style='cyan bold', ratio=3, overflow='fold') table.add_column('Description', style='green', ratio=4, overflow='fold') table.add_column('Format', style='magenta', ratio=1, overflow='fold') table.add_column('Version', style='blue', ratio=2, overflow='fold') table.add_column('Storage Location', style='yellow', ratio=5, overflow='fold') # Add rows for ds in self.datasets: table.add_row( ds.name, ds.description, f'{ds.data_format}', ds.version, f's3://{ds.bucket}/{ds.prefix}', ) console.print(table) return output.getvalue() except ImportError: # Fallback if rich is not available result = [f'📊 OCR Dataset Catalog ({len(self.datasets)} datasets)'] for ds in self.datasets: result.append(f'- {ds.name}: {ds.description} [{ds.data_format}]') return '\n'.join(result)
datasets = [ Dataset( name='conus-overture-addresses', description='CONUS Overture Addresses', bucket='carbonplan-ocr', prefix='input/fire-risk/vector/overture-maps/CONUS-overture-addresses-2025-09-24.0.parquet', data_format='geoparquet', version='v2025-09-24.0', ), Dataset( name='conus-overture-buildings', description='CONUS Overture Buildings', bucket='carbonplan-ocr', prefix='input/fire-risk/vector/overture-maps/CONUS-overture-buildings-2025-09-24.0.parquet', data_format='geoparquet', version='v2025-09-24.0', ), Dataset( name='conus-overture-region-id-tagged-buildings', description='CONUS Overture Buildings with census region identifiers added', bucket='carbonplan-ocr', prefix='input/fire-risk/vector/overture-maps/CONUS-overture-region-tagged-buildings-2025-09-24.0.parquet', data_format='geoparquet', ), Dataset( name='conus-overture-buildings-5070', description='CONUS Overture Buildings in EPSG 5070. Columns are: bbox, bbox_5070, geometry, geometry_5070', bucket='carbonplan-ocr', prefix='input/fire-risk/vector/CONUS_overture_buildings_5070_2025-03-19.1.parquet', data_format='geoparquet', version='v2025-03-19.1', ), Dataset( name='conus404-hourly-Q2', description='Q2 variable from CONUS404 hourly data in Icechunk format', bucket='carbonplan-ocr', prefix='input/conus404-hourly-icechunk/Q2', data_format='zarr', ), Dataset( name='conus404-hourly-TD2', description='TD2 variable from CONUS404 hourly data in Icechunk format', bucket='carbonplan-ocr', prefix='input/conus404-hourly-icechunk/TD2', data_format='zarr', ), Dataset( name='conus404-hourly-PSFC', description='PSFC variable from CONUS404 hourly data in Icechunk format', bucket='carbonplan-ocr', prefix='input/conus404-hourly-icechunk/PSFC', data_format='zarr', ), Dataset( name='conus404-hourly-T2', description='T2 variable from CONUS404 hourly data in Icechunk format', bucket='carbonplan-ocr', prefix='input/conus404-hourly-icechunk/T2', data_format='zarr', ), Dataset( name='conus404-hourly-V10', description='V10 variable from CONUS404 hourly data in Icechunk format', bucket='carbonplan-ocr', prefix='input/conus404-hourly-icechunk/V10', data_format='zarr', ), Dataset( name='conus404-hourly-U10', description='U10 variable from CONUS404 hourly data in Icechunk format', bucket='carbonplan-ocr', prefix='input/conus404-hourly-icechunk/U10', data_format='zarr', ), Dataset( name='us-census-blocks', description='US Census Blocks', bucket='carbonplan-ocr', prefix='input/fire-risk/vector/census-tiger/blocks/blocks.parquet', data_format='geoparquet', ), Dataset( name='us-census-tracts', description='US Census Tracts', bucket='carbonplan-ocr', prefix='input/fire-risk/vector/census-tiger/tracts/tracts.parquet', data_format='geoparquet', ), Dataset( name='us-census-counties', description='US Census Counties', bucket='carbonplan-ocr', prefix='input/fire-risk/vector/census-tiger/counties/counties.parquet', data_format='geoparquet', ), Dataset( name='us-census-states', description='US Census States', bucket='carbonplan-ocr', prefix='input/fire-risk/vector/census-tiger/states/states.parquet', data_format='geoparquet', ), Dataset( name='us-census-nation', description='US Census CONUS States', bucket='carbonplan-ocr', prefix='input/fire-risk/vector/census-tiger/nation/nation.parquet', data_format='geoparquet', ), # CONUS404 Fosberg Fire Weather Index (FFWI) datasets Dataset( name='conus404-ffwi', description='Fosberg Fire Weather Index (FFWI) on CONUS404 native grid', bucket='carbonplan-ocr', prefix='input/fire-risk/tensor/conus404-ffwi/fosberg-fire-weather-index.icechunk', data_format='zarr', ), Dataset( name='conus404-ffwi-p99', description='FFWI p99 (99th percentile) on CONUS404 native grid', bucket='carbonplan-ocr', prefix='input/fire-risk/tensor/conus404-ffwi/fosberg-fire-weather-index-p99.icechunk', data_format='zarr', ), Dataset( name='conus404-ffwi-p99-wind-direction-distribution', description='Wind direction distribution during FFWI p99 conditions on CONUS404 native grid', bucket='carbonplan-ocr', prefix='input/fire-risk/tensor/conus404-ffwi/fosberg-fire-weather-index-p99-wind-direction-distribution.icechunk', data_format='zarr', ), Dataset( name='conus404-ffwi-winds', description='Wind variables associated with FFWI computations on CONUS404 native grid', bucket='carbonplan-ocr', prefix='input/fire-risk/tensor/conus404-ffwi/winds.icechunk', data_format='zarr', ), Dataset( name='conus404-ffwi-p99-wind-direction-distribution-30m-4326', description='Wind direction distribution during FFWI p99 conditions reprojected to USFS wildfire risk geobox (EPSG:4326)', bucket='carbonplan-ocr', prefix='input/fire-risk/tensor/conus404-ffwi/fosberg-fire-weather-index-p99-wind-direction-distribution-30m-4326.icechunk', data_format='zarr', ), # USFS Scott et al. 2024 (RDS-2020-0016-02) Dataset( name='scott-et-al-2024', description='USFS Wildfire Risk to Communities (2nd Edition, RDS-2020-0016-02, EPSG:5070)', bucket='carbonplan-ocr', prefix='input/fire-risk/tensor/USFS/scott-et-al-2024/processed.icechunk', data_format='zarr', ), Dataset( name='scott-et-al-2024-30m-4326', description='USFS Wildfire Risk to Communities (2nd Edition, RDS-2020-0016-02, EPSG:4326, 30m)', bucket='carbonplan-ocr', prefix='input/fire-risk/tensor/USFS/scott-et-al-2024/processed-30m-4326.icechunk', data_format='zarr', ), # USFS Riley et al. 2025 (RDS-2025-0006) Dataset( name='riley-et-al-2025-2011-270m-5070', description='USFS Riley et al. 2025 2011 Climate Run (EPSG:5070)', bucket='carbonplan-ocr', prefix='input/fire-risk/tensor/USFS/riley-et-al-2025/2011-climate-run-270m-5070.icechunk', data_format='zarr', ), Dataset( name='riley-et-al-2025-2047-270m-5070', description='USFS Riley et al. 2025 2047 Climate Run (EPSG:5070)', bucket='carbonplan-ocr', prefix='input/fire-risk/tensor/USFS/riley-et-al-2025/2047-climate-run-270m-5070.icechunk', data_format='zarr', ), Dataset( name='riley-et-al-2025-2011-30m-4326', description='USFS Riley et al. 2025 2011 Climate Run (EPSG:4326, 30m)', bucket='carbonplan-ocr', prefix='input/fire-risk/tensor/USFS/riley-et-al-2025/2011-climate-run-30m-4326.icechunk', data_format='zarr', ), Dataset( name='riley-et-al-2025-2047-30m-4326', description='USFS Riley et al. 2025 2047 Climate Run (EPSG:4326, 30m)', bucket='carbonplan-ocr', prefix='input/fire-risk/tensor/USFS/riley-et-al-2025/2047-climate-run-30m-4326.icechunk', data_format='zarr', ), Dataset( name='dillon-et-al-2023-270m-5070', description='USFS Dillon et al. 2023 Wildfire Risk to Communities (EPSG:5070)', bucket='carbonplan-ocr', prefix='input/fire-risk/tensor/USFS/dillon-et-al-2023/processed-270m-5070.icechunk', data_format='zarr', ), Dataset( name='dillon-et-al-2023-30m-4326', description='USFS Dillon et al. 2023 Wildfire Risk to Communities (EPSG:4326, 30m)', bucket='carbonplan-ocr', prefix='input/fire-risk/tensor/USFS/dillon-et-al-2023/processed-30m-4326.icechunk', data_format='zarr', ), Dataset( name='calfire-fhsz-3310', description='California Fire Hazard Severity Zones (FHSZ) in EPSG 3310', bucket='carbonplan-ocr', prefix='input/fire-risk/tensor/calfire-fhsz/calfire-risk-raster-3310.icechunk', data_format='zarr', ), Dataset( name='calfire-fhsz-4326', description='California Fire Hazard Severity Zones (FHSZ) in EPSG 4326', bucket='carbonplan-ocr', prefix='input/fire-risk/tensor/calfire-fhsz/calfire-risk-raster-4326.icechunk', data_format='zarr', ), ] catalog = Catalog(datasets=sorted(datasets, key=lambda x: x.name))