Source code for egon.data.datasets.emobility.heavy_duty_transport.h2_demand_distribution

"""
Calculation of hydrogen demand based on a Voronoi partition of counted truck traffic
used to allocate it to NUTS3 regions and finally aggregate it on NUTS3 level.
"""
from geovoronoi import points_to_coords, voronoi_regions_from_coords
from loguru import logger
from shapely import wkt
from shapely.geometry.multipolygon import MultiPolygon
from shapely.geometry.polygon import Polygon
from shapely.ops import cascaded_union
import geopandas as gpd

from egon.data import config, db
from egon.data.datasets.emobility.heavy_duty_transport.data_io import get_data
from egon.data.datasets.emobility.heavy_duty_transport.db_classes import (
    EgonHeavyDutyTransportVoronoi,
)

DATASET_CFG = config.datasets()["mobility_hgv"]


[docs]def run_egon_truck(): boundary_gdf, bast_gdf, nuts3_gdf = get_data() bast_gdf_within = bast_gdf.dropna().loc[ bast_gdf.within(boundary_gdf.geometry.iat[0]) ] voronoi_gdf = voronoi(bast_gdf_within, boundary_gdf) nuts3_gdf = geo_intersect(voronoi_gdf, nuts3_gdf) nuts3_gdf = nuts3_gdf.assign( normalized_truck_traffic=( nuts3_gdf.truck_traffic / nuts3_gdf.truck_traffic.sum() ) ) scenarios = DATASET_CFG["constants"]["scenarios"] for scenario in scenarios: total_hydrogen_consumption = calculate_total_hydrogen_consumption( scenario=scenario ) nuts3_gdf = nuts3_gdf.assign( hydrogen_consumption=( nuts3_gdf.normalized_truck_traffic * total_hydrogen_consumption ), scenario=scenario, ) nuts3_gdf.reset_index().to_postgis( name=EgonHeavyDutyTransportVoronoi.__table__.name, con=db.engine(), schema=EgonHeavyDutyTransportVoronoi.__table__.schema, if_exists="append", index=False, )
[docs]def calculate_total_hydrogen_consumption(scenario: str = "eGon2035"): """Calculate the total hydrogen demand for trucking in Germany.""" constants = DATASET_CFG["constants"] hgv_mileage = DATASET_CFG["hgv_mileage"] leakage = constants["leakage"] leakage_rate = constants["leakage_rate"] hydrogen_consumption = constants["hydrogen_consumption"] # kg/100km fcev_share = constants["fcev_share"] hgv_mileage = hgv_mileage[scenario] # km hydrogen_consumption_per_km = hydrogen_consumption / 100 # kg/km # calculate total hydrogen consumption kg/a if leakage: return ( hgv_mileage * hydrogen_consumption_per_km * fcev_share / (1 - leakage_rate) ) else: return hgv_mileage * hydrogen_consumption_per_km * fcev_share
[docs]def geo_intersect( voronoi_gdf: gpd.GeoDataFrame, nuts3_gdf: gpd.GeoDataFrame, mode: str = "intersection", ): """Calculate Intersections between two GeoDataFrames and distribute truck traffic""" logger.info( "Calculating Intersections between Voronoi Field and Grid Districts " "and distributing truck traffic accordingly to the area share." ) voronoi_gdf = voronoi_gdf.assign(voronoi_id=voronoi_gdf.index.tolist()) # Find Intersections between both GeoDataFrames intersection_gdf = gpd.overlay( voronoi_gdf, nuts3_gdf.reset_index()[["nuts3", "geometry"]], how=mode ) # Calc Area of Intersections intersection_gdf = intersection_gdf.assign( surface_area=intersection_gdf.geometry.area / 10**6 ) # km² # Initialize results column nuts3_gdf = nuts3_gdf.assign(truck_traffic=0) for voronoi_id in intersection_gdf.voronoi_id.unique(): voronoi_id_intersection_gdf = intersection_gdf.loc[ intersection_gdf.voronoi_id == voronoi_id ] total_area = voronoi_id_intersection_gdf.surface_area.sum() truck_traffic = voronoi_id_intersection_gdf.truck_traffic.iat[0] for idx, row in voronoi_id_intersection_gdf.iterrows(): traffic_share = truck_traffic * row["surface_area"] / total_area nuts3_gdf.at[row["nuts3"], "truck_traffic"] += traffic_share logger.info("Done.") return nuts3_gdf
[docs]def voronoi( points: gpd.GeoDataFrame, boundary: gpd.GeoDataFrame, ): """Building a Voronoi Field from points and a boundary.""" logger.info("Building Voronoi Field.") sources = DATASET_CFG["original_data"]["sources"] relevant_columns = sources["BAST"]["relevant_columns"] truck_col = relevant_columns[0] srid = DATASET_CFG["tables"]["srid"] # convert the boundary geometry into a union of the polygon # convert the Geopandas GeoSeries of Point objects to NumPy array of coordinates. boundary_shape = cascaded_union(boundary.geometry) coords = points_to_coords(points.geometry) # calculate Voronoi regions poly_shapes, pts, unassigned_pts = voronoi_regions_from_coords( coords, boundary_shape, return_unassigned_points=True ) multipoly_shapes = {} for key, shape in poly_shapes.items(): if isinstance(shape, Polygon): shape = wkt.loads(str(shape)) shape = MultiPolygon([shape]) multipoly_shapes[key] = [shape] poly_gdf = gpd.GeoDataFrame.from_dict( multipoly_shapes, orient="index", columns=["geometry"] ) # match points to old index # FIXME: This seems overcomplicated poly_gdf.index = [v[0] for v in pts.values()] poly_gdf = poly_gdf.sort_index() unmatched = [points.index[idx] for idx in unassigned_pts] points_matched = points.drop(unmatched) poly_gdf.index = points_matched.index # match truck traffic to new polys poly_gdf = poly_gdf.assign( truck_traffic=points.loc[poly_gdf.index][truck_col] ) poly_gdf = poly_gdf.set_crs(epsg=srid, inplace=True) logger.info("Done.") return poly_gdf