Source code for egon.data.datasets.emobility.heavy_duty_transport.create_h2_buses

"""
Map demand to H2 buses and write to DB.
"""
from __future__ import annotations

from loguru import logger
import geopandas as gpd
import numpy as np
import pandas as pd

from egon.data import config, db
from egon.data.datasets.emobility.heavy_duty_transport.db_classes import (
    EgonHeavyDutyTransportVoronoi,
)

DATASET_CFG = config.datasets()["mobility_hgv"]
CARRIER = DATASET_CFG["constants"]["carrier"]
SCENARIOS = DATASET_CFG["constants"]["scenarios"]
ENERGY_VALUE = DATASET_CFG["constants"]["energy_value_h2"]
FAC = DATASET_CFG["constants"]["fac"]
HOURS_PER_YEAR = DATASET_CFG["constants"]["hours_per_year"]


[docs]def insert_hgv_h2_demand(): """ Insert list of hgv H2 demand (one per NUTS3) in database. """ for scenario in SCENARIOS: delete_old_entries(scenario) hgv_gdf = assign_h2_buses(scenario=scenario) hgv_gdf = insert_new_entries(hgv_gdf) ts_df = kg_per_year_to_mega_watt(hgv_gdf) ts_df.to_sql( "egon_etrago_load_timeseries", schema="grid", con=db.engine(), if_exists="append", index=False, )
[docs]def kg_per_year_to_mega_watt(df: pd.DataFrame | gpd.GeoDataFrame): df = df.assign( p_set=df.hydrogen_consumption * ENERGY_VALUE * FAC / HOURS_PER_YEAR, q_set=np.nan, temp_id=1, ) df.p_set = [[p_set] * HOURS_PER_YEAR for p_set in df.p_set] logger.debug(str(df.columns)) df = ( df.rename(columns={"scenario": "scn_name"}) .drop( columns=[ "hydrogen_consumption", "geometry", "bus", "carrier", ] ) .reset_index(drop=True) ) return pd.DataFrame(df)
[docs]def insert_new_entries(hgv_h2_demand_gdf: gpd.GeoDataFrame): """ Insert loads. Parameters ---------- hgv_h2_demand_gdf : geopandas.GeoDataFrame Load data to insert. """ new_id = db.next_etrago_id("load") hgv_h2_demand_gdf["load_id"] = range( new_id, new_id + len(hgv_h2_demand_gdf) ) # Add missing columns c = {"sign": -1, "type": np.nan, "p_set": np.nan, "q_set": np.nan} rename = {"scenario": "scn_name"} drop = ["hydrogen_consumption", "geometry"] hgv_h2_demand_df = pd.DataFrame( hgv_h2_demand_gdf.assign(**c) .rename(columns=rename) .drop(columns=drop) .reset_index(drop=True) ) engine = db.engine() # Insert data to db hgv_h2_demand_df.to_sql( "egon_etrago_load", engine, schema="grid", index=False, if_exists="append", ) return hgv_h2_demand_gdf
[docs]def delete_old_entries(scenario: str): """ Delete loads and load timeseries. Parameters ---------- scenario : str Name of the scenario. """ # Clean tables db.execute_sql( f""" DELETE FROM grid.egon_etrago_load_timeseries WHERE "load_id" IN ( SELECT load_id FROM grid.egon_etrago_load WHERE carrier = '{CARRIER}' AND scn_name = '{scenario}' ) """ ) db.execute_sql( f""" DELETE FROM grid.egon_etrago_load WHERE carrier = '{CARRIER}' AND scn_name = '{scenario}' """ )
[docs]def assign_h2_buses(scenario: str = "eGon2035"): hgv_h2_demand_gdf = read_hgv_h2_demand(scenario=scenario) hgv_h2_demand_gdf = db.assign_gas_bus_id( hgv_h2_demand_gdf, scenario, "H2_grid" ) # Add carrier c = {"carrier": CARRIER} hgv_h2_demand_gdf = hgv_h2_demand_gdf.assign(**c) # Remove useless columns hgv_h2_demand_gdf = hgv_h2_demand_gdf.drop( columns=["geom", "NUTS0", "NUTS1", "bus_id"], errors="ignore" ) return hgv_h2_demand_gdf
[docs]def read_hgv_h2_demand(scenario: str = "eGon2035"): with db.session_scope() as session: query = session.query( EgonHeavyDutyTransportVoronoi.nuts3, EgonHeavyDutyTransportVoronoi.scenario, EgonHeavyDutyTransportVoronoi.hydrogen_consumption, ).filter(EgonHeavyDutyTransportVoronoi.scenario == scenario) df = pd.read_sql(query.statement, query.session.bind, index_col="nuts3") sql_vg250 = """ SELECT nuts as nuts3, geometry as geom FROM boundaries.vg250_krs WHERE gf = 4 """ srid = DATASET_CFG["tables"]["srid"] gdf_vg250 = db.select_geodataframe(sql_vg250, index_col="nuts3", epsg=srid) gdf_vg250["geometry"] = gdf_vg250.geom.centroid srid_buses = DATASET_CFG["tables"]["srid_buses"] return gpd.GeoDataFrame( df.merge(gdf_vg250[["geometry"]], left_index=True, right_index=True), crs=gdf_vg250.crs, ).to_crs(epsg=srid_buses)