Source code for egon.data.datasets.electricity_demand

"""The central module containing all code dealing with processing
 data from demandRegio

"""
from sqlalchemy import Column, Float, ForeignKey, Integer, String
from sqlalchemy.ext.declarative import declarative_base
import pandas as pd

from egon.data import db
from egon.data.datasets import Dataset
from egon.data.datasets.electricity_demand.temporal import insert_cts_load
from egon.data.datasets.electricity_demand_timeseries.hh_buildings import (
    HouseholdElectricityProfilesOfBuildings,
    get_iee_hh_demand_profiles_raw,
)
from egon.data.datasets.electricity_demand_timeseries.hh_profiles import (
    HouseholdElectricityProfilesInCensusCells,
)
from egon.data.datasets.zensus_vg250 import DestatisZensusPopulationPerHa
import egon.data.config

# will later be imported from another file
Base = declarative_base()
engine = db.engine()


class HouseholdElectricityDemand(Dataset):
    def __init__(self, dependencies):
        super().__init__(
            name="HouseholdElectricityDemand",
            version="0.0.3",
            dependencies=dependencies,
            tasks=(create_tables, get_annual_household_el_demand_cells),
        )

class CtsElectricityDemand(Dataset):
    def __init__(self, dependencies):
        super().__init__(
            name="CtsElectricityDemand",
            version="0.0.2",
            dependencies=dependencies,
            tasks=(distribute_cts_demands, insert_cts_load),
        )

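# A minimal sketch (not part of this module) of how the two Dataset classes
# above are typically wired into the egon-data pipeline: each one receives the
# upstream datasets it depends on. The dependency names used here are purely
# hypothetical placeholders; the actual dependency objects are defined in the
# pipeline module.
#
#     household_demand = HouseholdElectricityDemand(
#         dependencies=[zensus_population, hh_demand_profiles]
#     )
#     cts_demand = CtsElectricityDemand(
#         dependencies=[household_demand, heat_demand]
#     )
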
class EgonDemandRegioZensusElectricity(Base):
    __tablename__ = "egon_demandregio_zensus_electricity"
    __table_args__ = {"schema": "demand", "extend_existing": True}

    zensus_population_id = Column(
        Integer, ForeignKey(DestatisZensusPopulationPerHa.id), primary_key=True
    )
    scenario = Column(String(50), primary_key=True)
    sector = Column(String, primary_key=True)
    demand = Column(Float)

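# A minimal usage sketch (not part of the original module), assuming the table
# defined above has already been populated: query the annual demand per census
# cell for one scenario and sector via the ORM model. The scenario and sector
# literals are example values taken from this module.
def _example_query_zensus_demand():
    with db.session_scope() as session:
        query = session.query(
            EgonDemandRegioZensusElectricity.zensus_population_id,
            EgonDemandRegioZensusElectricity.demand,
        ).filter(
            EgonDemandRegioZensusElectricity.scenario == "eGon2035",
            EgonDemandRegioZensusElectricity.sector == "residential",
        )
        return pd.read_sql(query.statement, session.bind)
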
def create_tables():
    """Create tables for demandregio data

    Returns
    -------
    None.
    """
    db.execute_sql("CREATE SCHEMA IF NOT EXISTS demand;")
    db.execute_sql("CREATE SCHEMA IF NOT EXISTS society;")
    engine = db.engine()
    EgonDemandRegioZensusElectricity.__table__.drop(
        bind=engine, checkfirst=True
    )
    EgonDemandRegioZensusElectricity.__table__.create(
        bind=engine, checkfirst=True
    )

def get_annual_household_el_demand_cells():
    """Determine the annual electricity demand per census cell.

    The demand time series of all profiles within a cell are accumulated
    and scaled with the respective nuts3 factor for the 2035 and 2050
    scenarios.

    Notes
    -----
    In test-mode 'SH' the iteration takes place by 'cell_id' to avoid
    intensive RAM usage. For whole Germany 'nuts3' is used and
    RAM > 32GB is necessary.
    """
    with db.session_scope() as session:
        cells_query = (
            session.query(
                HouseholdElectricityProfilesOfBuildings,
                HouseholdElectricityProfilesInCensusCells.nuts3,
                HouseholdElectricityProfilesInCensusCells.factor_2035,
                HouseholdElectricityProfilesInCensusCells.factor_2050,
            )
            .filter(
                HouseholdElectricityProfilesOfBuildings.cell_id
                == HouseholdElectricityProfilesInCensusCells.cell_id
            )
            .order_by(HouseholdElectricityProfilesOfBuildings.id)
        )

    df_buildings_and_profiles = pd.read_sql(
        cells_query.statement, cells_query.session.bind, index_col="id"
    )

    # Read demand profiles from egon-data-bundle
    df_profiles = get_iee_hh_demand_profiles_raw()

    def ve(s):
        raise ValueError(s)

    dataset = egon.data.config.settings()["egon-data"]["--dataset-boundary"]
    iterate_over = (
        "nuts3"
        if dataset == "Everything"
        else "cell_id"
        if dataset == "Schleswig-Holstein"
        else ve(f"'{dataset}' is not a valid dataset boundary.")
    )

    df_annual_demand = pd.DataFrame(
        columns=["eGon2035", "eGon100RE", "zensus_population_id"]
    )

    for _, df in df_buildings_and_profiles.groupby(by=iterate_over):
        df_annual_demand_iter = pd.DataFrame(
            columns=["eGon2035", "eGon100RE", "zensus_population_id"]
        )
        df_annual_demand_iter["eGon2035"] = (
            df_profiles.loc[:, df["profile_id"]].sum(axis=0)
            * df["factor_2035"].values
        )
        df_annual_demand_iter["eGon100RE"] = (
            df_profiles.loc[:, df["profile_id"]].sum(axis=0)
            * df["factor_2050"].values
        )
        df_annual_demand_iter["zensus_population_id"] = df["cell_id"].values
        df_annual_demand = df_annual_demand.append(df_annual_demand_iter)

    df_annual_demand = (
        df_annual_demand.groupby("zensus_population_id").sum().reset_index()
    )
    df_annual_demand["sector"] = "residential"
    df_annual_demand = df_annual_demand.melt(
        id_vars=["zensus_population_id", "sector"],
        var_name="scenario",
        value_name="demand",
    )

    # convert from Wh to MWh
    df_annual_demand["demand"] = df_annual_demand["demand"] / 1e6

    # delete all cells for residentials
    with db.session_scope() as session:
        session.query(EgonDemandRegioZensusElectricity).filter(
            EgonDemandRegioZensusElectricity.sector == "residential"
        ).delete()

    # Insert data to target table
    df_annual_demand.to_sql(
        name=EgonDemandRegioZensusElectricity.__table__.name,
        schema=EgonDemandRegioZensusElectricity.__table__.schema,
        con=db.engine(),
        index=False,
        if_exists="append",
    )

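# A minimal, self-contained sketch (not part of the original module) of the
# core arithmetic in get_annual_household_el_demand_cells: each building's
# profile is summed over all timesteps, scaled with the scenario factor of its
# census cell, aggregated per cell and converted from Wh to MWh. All numbers
# and profile ids below are made up for illustration.
def _example_annual_cell_demand():
    import pandas as pd

    # Two hypothetical demand profiles in Wh per timestep (columns = profile ids)
    df_profiles = pd.DataFrame(
        {"HH01": [100.0, 120.0, 90.0], "HH02": [80.0, 95.0, 110.0]}
    )
    # Two buildings of one census cell, each mapped to a profile and a 2035 factor
    df = pd.DataFrame(
        {
            "profile_id": ["HH01", "HH02"],
            "factor_2035": [1.2, 1.2],
            "cell_id": [42, 42],
        }
    )

    # Annual demand per building: profile sum scaled by the nuts3 factor
    annual_wh = (
        df_profiles.loc[:, df["profile_id"]].sum(axis=0)
        * df["factor_2035"].values
    )
    # Aggregate per cell and convert from Wh to MWh
    return annual_wh.groupby(df["cell_id"].values).sum() / 1e6
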
def distribute_cts_demands():
    """Distribute electrical demands for CTS to zensus cells.

    The demands on nuts3-level from demandregio are linearly distributed
    according to the heat demand of CTS in each zensus cell.

    Returns
    -------
    None.
    """
    sources = egon.data.config.datasets()["electrical_demands_cts"]["sources"]

    target = egon.data.config.datasets()["electrical_demands_cts"]["targets"][
        "cts_demands_zensus"
    ]

    db.execute_sql(
        f"""DELETE FROM {target['schema']}.{target['table']}
        WHERE sector = 'service'"""
    )

    # Select match between zensus cells and nuts3 regions of vg250
    map_nuts3 = db.select_dataframe(
        f"""SELECT zensus_population_id, vg250_nuts3 as nuts3 FROM
        {sources['map_zensus_vg250']['schema']}.
        {sources['map_zensus_vg250']['table']}""",
        index_col="zensus_population_id",
    )

    # Insert data per scenario
    for scn in sources["demandregio"]["scenarios"]:

        # Select heat_demand per zensus cell
        peta = db.select_dataframe(
            f"""SELECT zensus_population_id, demand as heat_demand,
            sector, scenario FROM
            {sources['heat_demand_cts']['schema']}.
            {sources['heat_demand_cts']['table']}
            WHERE scenario = '{scn}'
            AND sector = 'service'""",
            index_col="zensus_population_id",
        )

        # Add nuts3 key to zensus cells
        peta["nuts3"] = map_nuts3.nuts3

        # Calculate share of nuts3 heat demand per zensus cell
        peta["share"] = peta.heat_demand.groupby(peta.nuts3).apply(
            lambda grp: grp / grp.sum()
        )

        # Select forecasted electrical demands from demandregio table
        demand_nuts3 = db.select_dataframe(
            f"""SELECT nuts3, SUM(demand) as demand FROM
            {sources['demandregio']['schema']}.
            {sources['demandregio']['table']}
            WHERE scenario = '{scn}'
            AND wz IN (
                SELECT wz FROM
                {sources['demandregio_wz']['schema']}.
                {sources['demandregio_wz']['table']}
                WHERE sector = 'CTS')
            GROUP BY nuts3""",
            index_col="nuts3",
        )

        # Scale demands on nuts3 level linearly with the heat demand share
        peta["demand"] = peta["share"].mul(
            demand_nuts3.demand[peta["nuts3"]].values
        )

        # Rename index
        peta.index = peta.index.rename("zensus_population_id")

        # Insert data to target table
        peta[["scenario", "demand", "sector"]].to_sql(
            target["table"],
            schema=target["schema"],
            con=db.engine(),
            if_exists="append",
        )

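# A minimal, self-contained sketch (not part of the original module) of the
# distribution step in distribute_cts_demands: every cell receives the share
# of the nuts3 electricity demand that corresponds to its share of the nuts3
# CTS heat demand. The nuts3 code and all values are made up for illustration;
# transform() mirrors the groupby/apply step of the module.
def _example_cts_distribution():
    import pandas as pd

    # Hypothetical CTS heat demand of three cells in one nuts3 region
    peta = pd.DataFrame(
        {"heat_demand": [10.0, 30.0, 60.0], "nuts3": ["DEF01"] * 3},
        index=pd.Index([1, 2, 3], name="zensus_population_id"),
    )
    # Hypothetical total CTS electricity demand of that nuts3 region in MWh
    demand_nuts3 = pd.Series({"DEF01": 500.0}, name="demand")

    # Share of the regional heat demand located in each cell
    peta["share"] = peta.groupby("nuts3")["heat_demand"].transform(
        lambda grp: grp / grp.sum()
    )
    # Scale the regional electricity demand linearly with that share
    peta["demand"] = peta["share"].mul(demand_nuts3[peta["nuts3"]].values)
    return peta  # yields 50, 150 and 300 MWh for the three cells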