Source code for egon.data.datasets.demandregio

"""The central module containing all code dealing with importing and
adjusting data from demandRegio

"""

from pathlib import Path
import os
import zipfile

from sqlalchemy import ARRAY, Column, Float, ForeignKey, Integer, String
from sqlalchemy.ext.declarative import declarative_base
import numpy as np
import pandas as pd

from egon.data import db, logger
from egon.data.datasets import Dataset, DatasetSources, DatasetTargets
from egon.data.datasets.scenario_parameters import (
    EgonScenario,
    get_sector_parameters,
)
import egon.data.config
import egon.data.datasets.scenario_parameters.parameters as scenario_parameters

try:
    from disaggregator import config, data, spatial, temporal

except ImportError:
    pass

# will be later imported from another file ###
Base = declarative_base()


[docs] class DemandRegio(Dataset): """ Extract and adjust data from DemandRegio Demand data for the sectors households, CTS and industry are calculated using DemandRegio's diaggregator and input data. To bring the resulting data in line with other data used in eGon-data and the eGon project in general some data needed to be adjusted or extended, e.g. in function :py:func:`adjust_ind_pes` or function :py:func:`adjust_cts_ind_nep`. The resulting data is written into newly created tables. *Dependencies* * :py:class:`DataBundle <egon.data.datasets.data_bundle.DataBundle>` * :py:class:`ScenarioParameters <egon.data.datasets.scenario_parameters.ScenarioParameters>` * :py:class:`ZensusVg250 <egon.data.datasets.zensus_vg250.ZensusVg250>` *Resulting tables* * :py:class:`demand.egon_demandregio_hh <egon.data.datasets.demandregio.EgonDemandRegioHH>` is created and filled * :py:class:`demand.egon_demandregio_cts_ind <egon.data.datasets.demandregio.EgonDemandRegioCtsInd>` is created and filled * :py:class:`society.egon_demandregio_population <egon.data.datasets.demandregio.EgonDemandRegioPopulation>` is created and filled * :py:class:`society.egon_demandregio_household <egon.data.datasets.demandregio.EgonDemandRegioHouseholds>` is created and filled * :py:class:`demand.egon_demandregio_wz <egon.data.datasets.demandregio.EgonDemandRegioWz>` is created and filled * :py:class:`demand.egon_demandregio_timeseries_cts_ind <egon.data.datasets.demandregio.EgonDemandRegioTimeseriesCtsInd>` is created and filled """ sources = DatasetSources( files={ "wz_cts": "WZ_definition/CTS_WZ_definition.csv", "wz_industry": "WZ_definition/ind_WZ_definition.csv", "new_consumers_2035": "new_largescale_consumers_nep.csv", "cache_zip": "demand_regio_backup/cache.zip", "dbdump_zip": "demand_regio_backup/status2019-egon-demandregio-cts-ind.zip", }, tables={ "vg250_krs": "boundaries.vg250_krs", }, ) targets = DatasetTargets( files={ "cache_dir": "demandregio/cache", "dbdump_dir": "demandregio/dbdump", }, tables={ "hh_demand": "demand.egon_demandregio_hh", "cts_ind_demand": "demand.egon_demandregio_cts_ind", "population": "society.egon_demandregio_population", "households": "society.egon_demandregio_household", "wz_definitions": "demand.egon_demandregio_wz", "timeseries_cts_ind": "demand.egon_demandregio_timeseries_cts_ind", }, ) #: name: str = "DemandRegio" #: version: str = "0.0.19" def __init__(self, dependencies): super().__init__( name=self.name, version=self.version, dependencies=dependencies, tasks=( get_cached_tables, # adhoc workaround #180 create_tables, { insert_household_demand, insert_society_data, insert_cts_ind_demands, }, ), )
[docs] class DemandRegioLoadProfiles(Base): __tablename__ = "demandregio_household_load_profiles" __table_args__ = {"schema": "demand"} id = Column(Integer, primary_key=True) year = Column(Integer) nuts3 = Column(String) load_in_mwh = Column(ARRAY(Float()))
[docs] class EgonDemandRegioHH(Base): __tablename__ = "egon_demandregio_hh" __table_args__ = {"schema": "demand"} nuts3 = Column(String(5), primary_key=True) hh_size = Column(Integer, primary_key=True) scenario = Column(String, ForeignKey(EgonScenario.name), primary_key=True) year = Column(Integer) demand = Column(Float)
[docs] class EgonDemandRegioCtsInd(Base): __tablename__ = "egon_demandregio_cts_ind" __table_args__ = {"schema": "demand"} nuts3 = Column(String(5), primary_key=True) wz = Column(Integer, primary_key=True) scenario = Column(String, ForeignKey(EgonScenario.name), primary_key=True) year = Column(Integer) demand = Column(Float)
[docs] class EgonDemandRegioPopulation(Base): __tablename__ = "egon_demandregio_population" __table_args__ = {"schema": "society"} nuts3 = Column(String(5), primary_key=True) year = Column(Integer, primary_key=True) population = Column(Float)
[docs] class EgonDemandRegioHouseholds(Base): __tablename__ = "egon_demandregio_household" __table_args__ = {"schema": "society"} nuts3 = Column(String(5), primary_key=True) hh_size = Column(Integer, primary_key=True) year = Column(Integer, primary_key=True) households = Column(Integer)
[docs] class EgonDemandRegioWz(Base): __tablename__ = "egon_demandregio_wz" __table_args__ = {"schema": "demand"} wz = Column(Integer, primary_key=True) sector = Column(String(50)) definition = Column(String(150))
[docs] class EgonDemandRegioTimeseriesCtsInd(Base): __tablename__ = "egon_demandregio_timeseries_cts_ind" __table_args__ = {"schema": "demand"} wz = Column(Integer, primary_key=True) year = Column(Integer, primary_key=True) slp = Column(String(50)) load_curve = Column(ARRAY(Float()))
[docs] def create_tables(): """Create tables for demandregio data Returns ------- None. """ db.execute_sql("CREATE SCHEMA IF NOT EXISTS demand;") db.execute_sql("CREATE SCHEMA IF NOT EXISTS society;") engine = db.engine() EgonDemandRegioHH.__table__.create(bind=engine, checkfirst=True) EgonDemandRegioCtsInd.__table__.create(bind=engine, checkfirst=True) EgonDemandRegioPopulation.__table__.create(bind=engine, checkfirst=True) EgonDemandRegioHouseholds.__table__.create(bind=engine, checkfirst=True) EgonDemandRegioWz.__table__.create(bind=engine, checkfirst=True) DemandRegioLoadProfiles.__table__.create(bind=db.engine(), checkfirst=True) EgonDemandRegioTimeseriesCtsInd.__table__.drop( bind=engine, checkfirst=True ) EgonDemandRegioTimeseriesCtsInd.__table__.create( bind=engine, checkfirst=True )
[docs] def data_in_boundaries(df): """Select rows with nuts3 code within boundaries, used for testmode Parameters ---------- df : pandas.DataFrame Data for all nuts3 regions Returns ------- pandas.DataFrame Data for nuts3 regions within boundaries """ engine = db.engine() df = df.reset_index() # Change nuts3 region names to 2016 version nuts_names = {"DEB16": "DEB1C", "DEB19": "DEB1D"} df.loc[df.nuts3.isin(nuts_names), "nuts3"] = df.loc[ df.nuts3.isin(nuts_names), "nuts3" ].map(nuts_names) df = df.set_index("nuts3") return df[ df.index.isin( pd.read_sql( f"SELECT DISTINCT ON (nuts) nuts FROM {DemandRegio.sources.tables['vg250_krs']}", engine, ).nuts ) ]
[docs] def insert_cts_ind_wz_definitions(): """Insert demandregio's definitions of CTS and industrial branches""" engine = db.engine() # This dictionary replaces the logic from the old config file wz_files = {"CTS": "wz_cts", "industry": "wz_industry"} for sector, file_key in wz_files.items(): file_path = ( Path("data_bundle_egon_data") / DemandRegio.sources.files[file_key] ) delimiter = ";" if sector == "CTS" else "," df = ( pd.read_csv(file_path, delimiter=delimiter, header=None) .rename({0: "wz", 1: "definition"}, axis="columns") .set_index("wz") ) df["sector"] = sector df.to_sql( DemandRegio.targets.get_table_name("wz_definitions"), engine, schema=DemandRegio.targets.get_table_schema("wz_definitions"), if_exists="append", )
[docs] def match_nuts3_bl(): """Function that maps the federal state to each nuts3 region Returns ------- df : pandas.DataFrame List of nuts3 regions and the federal state of Germany. """ engine = db.engine() df = pd.read_sql( "SELECT DISTINCT ON (boundaries.vg250_krs.nuts) " "boundaries.vg250_krs.nuts, boundaries.vg250_lan.gen " "FROM boundaries.vg250_lan, boundaries.vg250_krs " " WHERE ST_CONTAINS(" "boundaries.vg250_lan.geometry, " "boundaries.vg250_krs.geometry)", con=engine, ) df.gen[df.gen == "Baden-Württemberg (Bodensee)"] = "Baden-Württemberg" df.gen[df.gen == "Bayern (Bodensee)"] = "Bayern" return df.set_index("nuts")
[docs] def adjust_ind_pes(ec_cts_ind): """ Adjust electricity demand of industrial consumers due to electrification of process heat based on assumptions of pypsa-eur-sec. Parameters ---------- ec_cts_ind : pandas.DataFrame Industrial demand without additional electrification Returns ------- ec_cts_ind : pandas.DataFrame Industrial demand with additional electrification """ pes_path = Path("data_bundle_egon_data") # All file paths now use the new class attributes demand_today = pd.read_csv( pes_path / DemandRegio.sources.files["pes_demand_today"], header=None, ).transpose() # Filter data demand_today[1].fillna("carrier", inplace=True) demand_today = demand_today[ (demand_today[0] == "DE") | (demand_today[1] == "carrier") ].drop([0, 2], axis="columns") demand_today = ( demand_today.transpose() .set_index(0) .transpose() .set_index("carrier") .transpose() .loc["electricity"] .astype(float) ) # Calculate future industrial demand from pypsa-eur-sec # based on production and energy demands per carrier ('sector ratios') prod_tomorrow = pd.read_csv( pes_path / DemandRegio.sources.files["pes_production_tomorrow"] ) prod_tomorrow = prod_tomorrow[prod_tomorrow["kton/a"] == "DE"].set_index( "kton/a" ) sector_ratio = ( pd.read_csv(pes_path / DemandRegio.sources.files["pes_sector_ratios"]) .set_index("MWh/tMaterial") .loc["elec"] ) demand_tomorrow = prod_tomorrow.multiply( sector_ratio.div(1000) ).transpose()["DE"] # Calculate changes of electrical demand per sector in pypsa-eur-sec change = pd.DataFrame( (demand_tomorrow / demand_today) / (demand_tomorrow / demand_today).sum() ) # Drop rows without changes change = change[~change[0].isnull()] # Map industrial branches of pypsa-eur-sec to WZ2008 used in demandregio change["wz"] = change.index.map( { "Alumina production": 24, "Aluminium - primary production": 24, "Aluminium - secondary production": 24, "Ammonia": 20, "Basic chemicals (without ammonia)": 20, "Cement": 23, "Ceramics & other NMM": 23, "Electric arc": 24, "Food, beverages and tobacco": 10, "Glass production": 23, "Integrated steelworks": 24, "Machinery Equipment": 28, "Other Industrial Sectors": 32, "Other chemicals": 20, "Other non-ferrous metals": 24, "Paper production": 17, "Pharmaceutical products etc.": 21, "Printing and media reproduction": 18, "Pulp production": 17, "Textiles and leather": 13, "Transport Equipment": 29, "Wood and wood products": 16, } ) # Group by WZ2008 shares_per_wz = change.groupby("wz")[0].sum() # Calculate addtional demands needed to meet future demand of pypsa-eur-sec addtional_mwh = shares_per_wz.multiply( demand_tomorrow.sum() * 1000000 - ec_cts_ind.sum().sum() ) # Calulate overall industrial demand for eGon100RE final_mwh = addtional_mwh + ec_cts_ind[addtional_mwh.index].sum() # Linear scale the industrial demands per nuts3 and wz to meet final demand ec_cts_ind[addtional_mwh.index] *= ( final_mwh / ec_cts_ind[addtional_mwh.index].sum() ) return ec_cts_ind
[docs] def adjust_cts_ind_nep(ec_cts_ind, sector): """Add electrical demand of new largescale CTS und industrial consumers according to NEP 2021, scneario C 2035. Values per federal state are linear distributed over all CTS branches and nuts3 regions. Parameters ---------- ec_cts_ind : pandas.DataFrame CTS or industry demand without new largescale consumers. Returns ------- ec_cts_ind : pandas.DataFrame CTS or industry demand including new largescale consumers. """ file_path = ( Path("data_bundle_egon_data") / DemandRegio.sources.files["new_consumers_2035"] ) # get data from NEP per federal state new_con = pd.read_csv(file_path, delimiter=";", decimal=",", index_col=0) # match nuts3 regions to federal states groups = ec_cts_ind.groupby(match_nuts3_bl().gen) # update demands per federal state for group in groups.indices.keys(): g = groups.get_group(group) data_new = g.mul(1 + new_con[sector][group] * 1e6 / g.sum().sum()) ec_cts_ind[ec_cts_ind.index.isin(g.index)] = data_new return ec_cts_ind
[docs] def disagg_households_power(scenario, year, original=False, **kwargs): """ Perform spatial disaggregation of electric power in [GWh/a] by key Similar to disaggregator.spatial.disagg_households_power Parameters ---------- by : str must be one of ['households', 'population'] orignal : bool, optional Throughput to function households_per_size, A flag if the results should be left untouched and returned in original form for the year 2011 (True) or if they should be scaled to the given `year` by the population in that year (False). Returns ------- pd.DataFrame or pd.Series """ # source: survey of energieAgenturNRW # with/without direct water heating (DHW), and weighted average # https://1-stromvergleich.com/wp-content/uploads/erhebung_wo_bleibt_der_strom.pdf demand_per_hh_size = pd.DataFrame( index=range(1, 7), data={ # "weighted DWH": [2290, 3202, 4193, 4955, 5928, 5928], # "without DHW": [1714, 2812, 3704, 4432, 5317, 5317], "with_DHW": [2181, 3843, 5151, 6189, 7494, 8465], "without_DHW": [1798, 2850, 3733, 4480, 5311, 5816], "weighted": [2256, 3248, 4246, 5009, 5969, 6579], }, ) if scenario == "eGon100RE": # chose demand per household size from survey without DHW power_per_HH = demand_per_hh_size["without_DHW"] / 1e3 # calculate demand per nuts3 in 2011 df_2011 = data.households_per_size(year=2011) * power_per_HH # scale demand per hh-size to meet demand without heat # according to JRC in 2011 (136.6-(20.14+9.41) TWh) power_per_HH *= (136.6 - (20.14 + 9.41)) * 1e6 / df_2011.sum().sum() # calculate demand per nuts3 in 2050 df = data.households_per_size(year=year) * power_per_HH # Bottom-Up: Power demand by household sizes in [MWh/a] for each scenario else: # chose demand per household size from survey including weighted DHW power_per_HH = demand_per_hh_size["weighted"] / 1e3 # calculate demand per nuts3 df = ( data.households_per_size(original=original, year=year) * power_per_HH ) return df
[docs] def write_demandregio_hh_profiles_to_db(hh_profiles): """Write HH demand profiles from demand regio into db. One row per year and nuts3. The annual load profile timeseries is an array. schema: demand tablename: demandregio_household_load_profiles Parameters ---------- hh_profiles: pd.DataFrame Returns ------- """ years = hh_profiles.index.year.unique().values df_to_db = pd.DataFrame( columns=["id", "year", "nuts3", "load_in_mwh"] ).set_index("id") dataset = egon.data.config.settings()["egon-data"]["--dataset-boundary"] if dataset == "Schleswig-Holstein": hh_profiles = hh_profiles.loc[ :, hh_profiles.columns.str.contains("DEF0") ] idx = pd.read_sql_query( f""" SELECT MAX(id) FROM {DemandRegioLoadProfiles.__table__.schema}. {DemandRegioLoadProfiles.__table__.name} """, con=db.engine(), ).iat[0, 0] idx = 0 if idx is None else idx + 1 for year in years: df = hh_profiles[hh_profiles.index.year == year] for nuts3 in hh_profiles.columns: idx += 1 df_to_db.at[idx, "year"] = year df_to_db.at[idx, "nuts3"] = nuts3 df_to_db.at[idx, "load_in_mwh"] = df[nuts3].to_list() df_to_db["year"] = df_to_db["year"].apply(int) df_to_db["nuts3"] = df_to_db["nuts3"].astype(str) df_to_db["load_in_mwh"] = df_to_db["load_in_mwh"].apply(list) df_to_db = df_to_db.reset_index() df_to_db.to_sql( name=DemandRegioLoadProfiles.__table__.name, schema=DemandRegioLoadProfiles.__table__.schema, con=db.engine(), if_exists="append", index=-False, )
[docs] def insert_hh_demand(scenario, year, engine): """Calculates electrical demands of private households using demandregio's disaggregator and insert results into the database. Parameters ---------- scenario : str Name of the corresponding scenario. year : int The number of households per region is taken from this year. Returns ------- None. """ # get spatial distribution of demands of private households per nuts # and size from demandregio ec_hh = disagg_households_power(scenario, year) # Scale to meet target value # For status2019 and eGon2021 the final demand from demandregio is kept if scenario not in ["status2019", "eGon2021"]: ec_hh *= ( get_sector_parameters("electricity", scenario=scenario)[ "annual_demand" ]["households"] * 1e6 / ec_hh.sum().sum() ) # Select demands for nuts3-regions in boundaries (needed for testmode) ec_hh = data_in_boundaries(ec_hh) # insert into database for hh_size in ec_hh.columns: df = pd.DataFrame(ec_hh[hh_size]) df["year"] = ( 2023 if scenario == "status2023" else year ) # TODO status2023 # adhoc fix until ffeopendata servers are up and population_year # can be set df["scenario"] = scenario df["hh_size"] = hh_size df = df.rename({hh_size: "demand"}, axis="columns") df.to_sql( DemandRegio.targets.get_table_name("hh_demand"), engine, schema=DemandRegio.targets.get_table_schema("hh_demand"), if_exists="append", ) # insert housholds demand timeseries try: hh_load_timeseries = ( temporal.disagg_temporal_power_housholds_slp( use_nuts3code=True, by="households", weight_by_income=False, year=year, ) .resample("h") .sum() ) hh_load_timeseries.rename( columns={"DEB16": "DEB1C", "DEB19": "DEB1D"}, inplace=True ) except Exception as e: logger.warning( f"Couldnt get profiles from FFE, will use pickeld fallback! \n {e}" ) hh_load_timeseries = pd.read_csv( Path("data_bundle_egon_data") / "demand_regio_backup" / "df_load_profiles.csv", index_col="time", ) hh_load_timeseries.index = pd.to_datetime( hh_load_timeseries.index, format="%Y-%m-%d %H:%M:%S" ) def change_year(dt, year): return dt.replace(year=year) year = 2023 if scenario == "status2023" else year # TODO status2023 hh_load_timeseries.index = hh_load_timeseries.index.map( lambda dt: change_year(dt, year) ) if scenario == "status2023": hh_load_timeseries = hh_load_timeseries.shift(24 * 2) hh_load_timeseries.iloc[: 24 * 7] = hh_load_timeseries.iloc[ 24 * 7 : 24 * 7 * 2 ].values write_demandregio_hh_profiles_to_db(hh_load_timeseries)
[docs] def insert_cts_ind(scenario, year, engine, target_values): """Calculates electrical demands of CTS and industry using demandregio's disaggregator, adjusts them according to resulting values of NEP 2021 or JRC IDEES and insert results into the database. Parameters ---------- scenario : str Name of the corresponing scenario. year : int The number of households per region is taken from this year. target_values : dict List of target values for each scenario and sector. Returns ------- None. """ # targets = egon.data.config.datasets()["demandregio_cts_ind_demand"]["targets"] wz_table = pd.read_sql( f"SELECT wz, sector FROM {DemandRegio.targets.tables['wz_definitions']}", con=engine, index_col="wz", ) # Workaround: Since the disaggregator does not work anymore, data from # previous runs is used for eGon2035 and eGon100RE if scenario == "eGon2035": file2035_path = ( Path("data_bundle_egon_data") / "demand_regio_backup" / "egon_demandregio_cts_ind_egon2035.csv" ) ec_cts_ind2 = pd.read_csv(file2035_path) ec_cts_ind2.to_sql( DemandRegio.targets.get_table_name("cts_ind_demand"), engine, schema=DemandRegio.targets.get_table_schema("cts_ind_demand"), if_exists="append", index=False, ) return if scenario == "eGon100RE": ec_cts_ind2 = pd.read_csv( Path("data_bundle_egon_data") / "demand_regio_backup" / "egon_demandregio_cts_ind.csv" ) ec_cts_ind2["sector"] = ec_cts_ind2["wz"].map(wz_table["sector"]) factor_ind = target_values["industry"] / ( ec_cts_ind2[ec_cts_ind2["sector"] == "industry"]["demand"].sum() / 1000 ) factor_cts = target_values["CTS"] / ( ec_cts_ind2[ec_cts_ind2["sector"] == "CTS"]["demand"].sum() / 1000 ) ec_cts_ind2["demand"] = ec_cts_ind2.apply( lambda x: ( x["demand"] * factor_ind if x["sector"] == "industry" else x["demand"] * factor_cts ), axis=1, ) ec_cts_ind2.drop(columns=["sector"], inplace=True) ec_cts_ind2.to_sql( DemandRegio.targets.get_table_name("cts_ind_demand"), engine, schema=DemandRegio.targets.get_table_schema("cts_ind_demand"), if_exists="append", index=False, ) return for sector in ["CTS", "industry"]: # get demands per nuts3 and wz of demandregio ec_cts_ind = spatial.disagg_CTS_industry( use_nuts3code=True, source="power", sector=sector, year=year ).transpose() ec_cts_ind.index = ec_cts_ind.index.rename("nuts3") # exclude mobility sector from GHD ec_cts_ind = ec_cts_ind.drop(columns=49, errors="ignore") # scale values according to target_values if target_values: if sector in target_values.keys(): ec_cts_ind *= ( target_values[sector] * 1e3 / ec_cts_ind.sum().sum() ) else: print( f"No scaling factors for scenario {scenario}." "Data from demandregio is used without scaling." ) # include new largescale consumers according to NEP 2021 if scenario == "eGon2035": ec_cts_ind = adjust_cts_ind_nep(ec_cts_ind, sector) # include new industrial demands due to sector coupling if (scenario == "eGon100RE") & (sector == "industry"): ec_cts_ind = adjust_ind_pes(ec_cts_ind) # Select demands for nuts3-regions in boundaries (needed for testmode) ec_cts_ind = data_in_boundaries(ec_cts_ind) # insert into database for wz in ec_cts_ind.columns: df = pd.DataFrame(ec_cts_ind[wz]) df["year"] = year df["wz"] = wz df["scenario"] = scenario df = df.rename({wz: "demand"}, axis="columns") df.index = df.index.rename("nuts3") df.to_sql( DemandRegio.targets.get_table_name("cts_ind_demand"), engine, schema=DemandRegio.targets.get_table_schema("cts_ind_demand"), if_exists="append", )
[docs] def insert_household_demand(): """Insert electrical demands for households according to demandregio using its disaggregator-tool in MWh Returns ------- None. """ engine = db.engine() scenarios = egon.data.config.settings()["egon-data"]["--scenarios"] scenarios.append("eGon2021") for table_key in ["hh_demand"]: # Assuming this is the only target here db.execute_sql(f"DELETE FROM {DemandRegio.targets.tables[table_key]};") for scn in scenarios: year = ( 2023 if scn == "status2023" else scenario_parameters.global_settings(scn)["population_year"] ) # Insert demands of private households insert_hh_demand(scn, year, engine)
[docs] def insert_cts_ind_demands(): """Insert electricity demands per nuts3-region in Germany according to demandregio using its disaggregator-tool in MWh Returns ------- None. """ engine = db.engine() for table_key in [ "cts_ind_demand", "wz_definitions", "timeseries_cts_ind", ]: db.execute_sql(f"DELETE FROM {DemandRegio.targets.tables[table_key]};") insert_cts_ind_wz_definitions() scenarios = egon.data.config.settings()["egon-data"]["--scenarios"] scenarios.append("eGon2021") for scn in scenarios: year = scenario_parameters.global_settings(scn)["population_year"] if year > 2035: year = 2035 # target values per scenario in MWh # for eGon2021 and status2019 demandregio-data is used without scaling if scn not in ["eGon2021", "status2019"]: target_values = { "CTS": get_sector_parameters("electricity", scenario=scn)[ "annual_demand" ]["CTS"], "industry": get_sector_parameters("electricity", scenario=scn)[ "annual_demand" ]["industry"], } else: target_values = None insert_cts_ind(scn, year, engine, target_values) # Insert load curves per wz timeseries_per_wz()
[docs] def insert_society_data(): """Insert population and number of households per nuts3-region in Germany according to demandregio using its disaggregator-tool Returns ------- None. """ engine = db.engine() for table_key in ["population", "households"]: db.execute_sql(f"DELETE FROM {DemandRegio.targets.tables[table_key]};") target_years = np.append( get_sector_parameters("global").population_year.values, 2018 ) for year in target_years: df_pop = pd.DataFrame(data.population(year=year)) df_pop["year"] = year df_pop = df_pop.rename({"value": "population"}, axis="columns") # Select data for nuts3-regions in boundaries (needed for testmode) df_pop = data_in_boundaries(df_pop) df_pop.to_sql( DemandRegio.targets.get_table_name("population"), engine, schema=DemandRegio.targets.get_table_schema("population"), if_exists="append", ) for year in target_years: df_hh = pd.DataFrame(data.households_per_size(year=year)) # Select data for nuts3-regions in boundaries (needed for testmode) df_hh = data_in_boundaries(df_hh) for hh_size in df_hh.columns: df = pd.DataFrame(df_hh[hh_size]) df["year"] = year df["hh_size"] = hh_size df = df.rename({hh_size: "households"}, axis="columns") df.to_sql( DemandRegio.targets.get_table_name("households"), engine, schema=DemandRegio.targets.get_table_schema("households"), if_exists="append", )
[docs] def insert_timeseries_per_wz(sector, year): """Insert normalized electrical load time series for the selected sector Parameters ---------- sector : str Name of the sector. ['CTS', 'industry'] year : int Selected weather year Returns ------- None. """ if sector == "CTS": profiles = ( data.CTS_power_slp_generator("SH", year=year) .drop( [ "Day", "Hour", "DayOfYear", "WD", "SA", "SU", "WIZ", "SOZ", "UEZ", ], axis="columns", ) .resample("H") .sum() ) wz_slp = config.slp_branch_cts_power() elif sector == "industry": profiles = ( data.shift_load_profile_generator(state="SH", year=year) .resample("H") .sum() ) wz_slp = config.shift_profile_industry() else: print(f"Sector {sector} is not valid.") df = pd.DataFrame( index=wz_slp.keys(), columns=["slp", "load_curve", "year"] ) df.index.rename("wz", inplace=True) df.slp = wz_slp.values() df.year = year df.load_curve = profiles[df.slp].transpose().values.tolist() db.execute_sql(f""" DELETE FROM {DemandRegio.targets.tables['timeseries_cts_ind']} WHERE wz IN ( SELECT wz FROM {DemandRegio.targets.tables['wz_definitions']} WHERE sector = '{sector}') """) df.to_sql( DemandRegio.targets.get_table_name("timeseries_cts_ind"), schema=DemandRegio.targets.get_table_schema("timeseries_cts_ind"), con=db.engine(), if_exists="append", )
[docs] def timeseries_per_wz(): """Calcultae and insert normalized timeseries per wz for cts and industry Returns ------- None. """ scenarios = egon.data.config.settings()["egon-data"]["--scenarios"] year_already_in_database = [] for scn in scenarios: year = int(scenario_parameters.global_settings(scn)["weather_year"]) for sector in ["CTS", "industry"]: if year not in year_already_in_database: insert_timeseries_per_wz(sector, int(year)) year_already_in_database.append(year)
[docs] def get_cached_tables(): """Get cached demandregio tables and db-dump from former runs""" data_config = egon.data.config.datasets() for s in ["cache", "dbdump"]: source_path = data_config["demandregio_workaround"]["source"][s][ "path" ] target_path = Path( ".", data_config["demandregio_workaround"]["targets"][s]["path"] ) os.makedirs(target_path, exist_ok=True) with zipfile.ZipFile(source_path, "r") as zip_ref: zip_ref.extractall(path=target_path)