"""The central module containing all code dealing with importing and
adjusting data from demandRegio
"""
from pathlib import Path
from sqlalchemy import ARRAY, Column, Float, ForeignKey, Integer, String
from sqlalchemy.ext.declarative import declarative_base
import numpy as np
import pandas as pd
from egon.data import db
from egon.data.datasets import Dataset
from egon.data.datasets.demandregio.install_disaggregator import (
clone_and_install,
)
from egon.data.datasets.scenario_parameters import (
EgonScenario,
get_sector_parameters,
)
import egon.data.config
import egon.data.datasets.scenario_parameters.parameters as scenario_parameters
try:
from disaggregator import config, data, spatial
except ImportError as e:
pass
# will be later imported from another file ###
Base = declarative_base()
[docs]class DemandRegio(Dataset):
def __init__(self, dependencies):
super().__init__(
name="DemandRegio",
version="0.0.5",
dependencies=dependencies,
tasks=(
clone_and_install,
create_tables,
{
insert_household_demand,
insert_society_data,
insert_cts_ind_demands,
},
),
)
[docs]class EgonDemandRegioHH(Base):
__tablename__ = "egon_demandregio_hh"
__table_args__ = {"schema": "demand"}
nuts3 = Column(String(5), primary_key=True)
hh_size = Column(Integer, primary_key=True)
scenario = Column(String, ForeignKey(EgonScenario.name), primary_key=True)
year = Column(Integer)
demand = Column(Float)
[docs]class EgonDemandRegioCtsInd(Base):
__tablename__ = "egon_demandregio_cts_ind"
__table_args__ = {"schema": "demand"}
nuts3 = Column(String(5), primary_key=True)
wz = Column(Integer, primary_key=True)
scenario = Column(String, ForeignKey(EgonScenario.name), primary_key=True)
year = Column(Integer)
demand = Column(Float)
[docs]class EgonDemandRegioPopulation(Base):
__tablename__ = "egon_demandregio_population"
__table_args__ = {"schema": "society"}
nuts3 = Column(String(5), primary_key=True)
year = Column(Integer, primary_key=True)
population = Column(Float)
[docs]class EgonDemandRegioHouseholds(Base):
__tablename__ = "egon_demandregio_household"
__table_args__ = {"schema": "society"}
nuts3 = Column(String(5), primary_key=True)
hh_size = Column(Integer, primary_key=True)
year = Column(Integer, primary_key=True)
households = Column(Integer)
[docs]class EgonDemandRegioWz(Base):
__tablename__ = "egon_demandregio_wz"
__table_args__ = {"schema": "demand"}
wz = Column(Integer, primary_key=True)
sector = Column(String(50))
definition = Column(String(150))
[docs]class EgonDemandRegioTimeseriesCtsInd(Base):
__tablename__ = "egon_demandregio_timeseries_cts_ind"
__table_args__ = {"schema": "demand"}
wz = Column(Integer, primary_key=True)
year = Column(Integer, primary_key=True)
slp = Column(String(50))
load_curve = Column(ARRAY(Float()))
[docs]def create_tables():
"""Create tables for demandregio data
Returns
-------
None.
"""
db.execute_sql("CREATE SCHEMA IF NOT EXISTS demand;")
db.execute_sql("CREATE SCHEMA IF NOT EXISTS society;")
engine = db.engine()
EgonDemandRegioHH.__table__.create(bind=engine, checkfirst=True)
EgonDemandRegioCtsInd.__table__.create(bind=engine, checkfirst=True)
EgonDemandRegioPopulation.__table__.create(bind=engine, checkfirst=True)
EgonDemandRegioHouseholds.__table__.create(bind=engine, checkfirst=True)
EgonDemandRegioWz.__table__.create(bind=engine, checkfirst=True)
EgonDemandRegioTimeseriesCtsInd.__table__.drop(
bind=engine, checkfirst=True
)
EgonDemandRegioTimeseriesCtsInd.__table__.create(
bind=engine, checkfirst=True
)
[docs]def data_in_boundaries(df):
"""Select rows with nuts3 code within boundaries, used for testmode
Parameters
----------
df : pandas.DataFrame
Data for all nuts3 regions
Returns
-------
pandas.DataFrame
Data for nuts3 regions within boundaries
"""
engine = db.engine()
df = df.reset_index()
# Change nuts3 region names to 2016 version
nuts_names = {"DEB16": "DEB1C", "DEB19": "DEB1D"}
df.loc[df.nuts3.isin(nuts_names), "nuts3"] = df.loc[
df.nuts3.isin(nuts_names), "nuts3"
].map(nuts_names)
df = df.set_index("nuts3")
return df[
df.index.isin(
pd.read_sql(
"SELECT DISTINCT ON (nuts) nuts FROM boundaries.vg250_krs",
engine,
).nuts
)
]
[docs]def insert_cts_ind_wz_definitions():
"""Insert demandregio's definitions of CTS and industrial branches
Returns
-------
None.
"""
source = egon.data.config.datasets()["demandregio_cts_ind_demand"][
"sources"
]
target = egon.data.config.datasets()["demandregio_cts_ind_demand"][
"targets"
]["wz_definitions"]
engine = db.engine()
for sector in source["wz_definitions"]:
file_path = (
Path(".")
/ "data_bundle_egon_data"
/ "WZ_definition"
/ source["wz_definitions"][sector]
)
if sector == "CTS":
delimiter = ";"
else:
delimiter = ","
df = (
pd.read_csv(file_path, delimiter=delimiter, header=None)
.rename({0: "wz", 1: "definition"}, axis="columns")
.set_index("wz")
)
df["sector"] = sector
df.to_sql(
target["table"],
engine,
schema=target["schema"],
if_exists="append",
)
[docs]def match_nuts3_bl():
"""Function that maps the federal state to each nuts3 region
Returns
-------
df : pandas.DataFrame
List of nuts3 regions and the federal state of Germany.
"""
engine = db.engine()
df = pd.read_sql(
"SELECT DISTINCT ON (boundaries.vg250_krs.nuts) "
"boundaries.vg250_krs.nuts, boundaries.vg250_lan.gen "
"FROM boundaries.vg250_lan, boundaries.vg250_krs "
" WHERE ST_CONTAINS("
"boundaries.vg250_lan.geometry, "
"boundaries.vg250_krs.geometry)",
con=engine,
)
df.gen[df.gen == "Baden-Württemberg (Bodensee)"] = "Baden-Württemberg"
df.gen[df.gen == "Bayern (Bodensee)"] = "Bayern"
return df.set_index("nuts")
[docs]def adjust_ind_pes(ec_cts_ind):
"""
Adjust electricity demand of industrial consumers due to electrification
of process heat based on assumptions of pypsa-eur-sec.
Parameters
----------
ec_cts_ind : pandas.DataFrame
Industrial demand without additional electrification
Returns
-------
ec_cts_ind : pandas.DataFrame
Industrial demand with additional electrification
"""
pes_path = (
Path(".") / "data_bundle_egon_data" / "pypsa_eur_sec" / "resources"
)
sources = egon.data.config.datasets()["demandregio_cts_ind_demand"][
"sources"
]["new_consumers_2050"]
# Extract today's industrial demand from pypsa-eur-sec
demand_today = pd.read_csv(
pes_path / sources["pes-demand-today"],
header=None,
).transpose()
# Filter data
demand_today[1].fillna("carrier", inplace=True)
demand_today = demand_today[
(demand_today[0] == "DE") | (demand_today[1] == "carrier")
].drop([0, 2], axis="columns")
demand_today = (
demand_today.transpose()
.set_index(0)
.transpose()
.set_index("carrier")
.transpose()
.loc["electricity"]
.astype(float)
)
# Calculate future industrial demand from pypsa-eur-sec
# based on production and energy demands per carrier ('sector ratios')
prod_tomorrow = pd.read_csv(pes_path / sources["pes-production-tomorrow"])
prod_tomorrow = prod_tomorrow[prod_tomorrow["kton/a"] == "DE"].set_index(
"kton/a"
)
sector_ratio = (
pd.read_csv(pes_path / sources["pes-sector-ratios"])
.set_index("MWh/tMaterial")
.loc["elec"]
)
demand_tomorrow = prod_tomorrow.multiply(
sector_ratio.div(1000)
).transpose()["DE"]
# Calculate changes of electrical demand per sector in pypsa-eur-sec
change = pd.DataFrame(
(demand_tomorrow / demand_today)
/ (demand_tomorrow / demand_today).sum()
)
# Drop rows without changes
change = change[~change[0].isnull()]
# Map industrial branches of pypsa-eur-sec to WZ2008 used in demandregio
change["wz"] = change.index.map(
{
"Alumina production": 24,
"Aluminium - primary production": 24,
"Aluminium - secondary production": 24,
"Ammonia": 20,
"Basic chemicals (without ammonia)": 20,
"Cement": 23,
"Ceramics & other NMM": 23,
"Electric arc": 24,
"Food, beverages and tobacco": 10,
"Glass production": 23,
"Integrated steelworks": 24,
"Machinery Equipment": 28,
"Other Industrial Sectors": 32,
"Other chemicals": 20,
"Other non-ferrous metals": 24,
"Paper production": 17,
"Pharmaceutical products etc.": 21,
"Printing and media reproduction": 18,
"Pulp production": 17,
"Textiles and leather": 13,
"Transport Equipment": 29,
"Wood and wood products": 16,
}
)
# Group by WZ2008
shares_per_wz = change.groupby("wz")[0].sum()
# Calculate addtional demands needed to meet future demand of pypsa-eur-sec
addtional_mwh = shares_per_wz.multiply(
demand_tomorrow.sum() * 1000000 - ec_cts_ind.sum().sum()
)
# Calulate overall industrial demand for eGon100RE
final_mwh = addtional_mwh + ec_cts_ind[addtional_mwh.index].sum()
# Linear scale the industrial demands per nuts3 and wz to meet final demand
ec_cts_ind[addtional_mwh.index] *= (
final_mwh / ec_cts_ind[addtional_mwh.index].sum()
)
return ec_cts_ind
[docs]def adjust_cts_ind_nep(ec_cts_ind, sector):
"""Add electrical demand of new largescale CTS und industrial consumers
according to NEP 2021, scneario C 2035. Values per federal state are
linear distributed over all CTS branches and nuts3 regions.
Parameters
----------
ec_cts_ind : pandas.DataFrame
CTS or industry demand without new largescale consumers.
Returns
-------
ec_cts_ind : pandas.DataFrame
CTS or industry demand including new largescale consumers.
"""
sources = egon.data.config.datasets()["demandregio_cts_ind_demand"][
"sources"
]
file_path = (
Path(".")
/ "data_bundle_egon_data"
/ "nep2035_version2021"
/ sources["new_consumers_2035"]
)
# get data from NEP per federal state
new_con = pd.read_csv(file_path, delimiter=";", decimal=",", index_col=0)
# match nuts3 regions to federal states
groups = ec_cts_ind.groupby(match_nuts3_bl().gen)
# update demands per federal state
for group in groups.indices.keys():
g = groups.get_group(group)
data_new = g.mul(1 + new_con[sector][group] * 1e6 / g.sum().sum())
ec_cts_ind[ec_cts_ind.index.isin(g.index)] = data_new
return ec_cts_ind
[docs]def disagg_households_power(
scenario, year, weight_by_income=False, original=False, **kwargs
):
"""
Perform spatial disaggregation of electric power in [GWh/a] by key and
possibly weight by income.
Similar to disaggregator.spatial.disagg_households_power
Parameters
----------
by : str
must be one of ['households', 'population']
weight_by_income : bool, optional
Flag if to weight the results by the regional income (default False)
orignal : bool, optional
Throughput to function households_per_size,
A flag if the results should be left untouched and returned in
original form for the year 2011 (True) or if they should be scaled to
the given `year` by the population in that year (False).
Returns
-------
pd.DataFrame or pd.Series
"""
# source: survey of energieAgenturNRW
demand_per_hh_size = pd.DataFrame(
index=range(1, 7),
data={
"weighted DWH": [2290, 3202, 4193, 4955, 5928, 5928],
"without DHW": [1714, 2812, 3704, 4432, 5317, 5317],
},
)
# Bottom-Up: Power demand by household sizes in [MWh/a] for each scenario
if scenario in ["eGon2021", "eGon2035"]:
# chose demand per household size from survey including weighted DHW
power_per_HH = demand_per_hh_size["weighted DWH"] / 1e3
# calculate demand per nuts3
df = (
data.households_per_size(original=original, year=year)
* power_per_HH
)
if scenario == "eGon2035":
# scale to fit demand of NEP 2021 scebario C 2035 (119TWh)
df *= 119000000 / df.sum().sum()
elif scenario == "eGon100RE":
# chose demand per household size from survey without DHW
power_per_HH = demand_per_hh_size["without DHW"] / 1e3
# calculate demand per nuts3 in 2011
df_2011 = data.households_per_size(year=2011) * power_per_HH
# scale demand per hh-size to meet demand without heat
# according to JRC in 2011 (136.6-(20.14+9.41) TWh)
power_per_HH *= (136.6 - (20.14 + 9.41)) * 1e6 / df_2011.sum().sum()
# calculate demand per nuts3 in 2050
df = data.households_per_size(year=year) * power_per_HH
else:
print(
f"Electric demand per household size for scenario {scenario} "
"is not specified."
)
if weight_by_income:
df = spatial.adjust_by_income(df=df)
return df
[docs]def insert_hh_demand(scenario, year, engine):
"""Calculates electrical demands of private households using demandregio's
disaggregator and insert results into the database.
Parameters
----------
scenario : str
Name of the corresponing scenario.
year : int
The number of households per region is taken from this year.
Returns
-------
None.
"""
targets = egon.data.config.datasets()["demandregio_household_demand"][
"targets"
]["household_demand"]
# get demands of private households per nuts and size from demandregio
ec_hh = disagg_households_power(scenario, year)
# Select demands for nuts3-regions in boundaries (needed for testmode)
ec_hh = data_in_boundaries(ec_hh)
# insert into database
for hh_size in ec_hh.columns:
df = pd.DataFrame(ec_hh[hh_size])
df["year"] = year
df["scenario"] = scenario
df["hh_size"] = hh_size
df = df.rename({hh_size: "demand"}, axis="columns")
df.to_sql(
targets["table"],
engine,
schema=targets["schema"],
if_exists="append",
)
[docs]def insert_cts_ind(scenario, year, engine, target_values):
"""Calculates electrical demands of CTS and industry using demandregio's
disaggregator, adjusts them according to resulting values of NEP 2021 or
JRC IDEES and insert results into the database.
Parameters
----------
scenario : str
Name of the corresponing scenario.
year : int
The number of households per region is taken from this year.
target_values : dict
List of target values for each scenario and sector.
Returns
-------
None.
"""
targets = egon.data.config.datasets()["demandregio_cts_ind_demand"][
"targets"
]
for sector in ["CTS", "industry"]:
# get demands per nuts3 and wz of demandregio
ec_cts_ind = spatial.disagg_CTS_industry(
use_nuts3code=True, source="power", sector=sector, year=year
).transpose()
ec_cts_ind.index = ec_cts_ind.index.rename("nuts3")
# exclude mobility sector from GHD
ec_cts_ind = ec_cts_ind.drop(columns=49, errors="ignore")
# scale values according to target_values
if sector in target_values[scenario].keys():
ec_cts_ind *= (
target_values[scenario][sector] * 1e3 / ec_cts_ind.sum().sum()
)
# include new largescale consumers according to NEP 2021
if scenario == "eGon2035":
ec_cts_ind = adjust_cts_ind_nep(ec_cts_ind, sector)
# include new industrial demands due to sector coupling
if (scenario == "eGon100RE") & (sector == "industry"):
ec_cts_ind = adjust_ind_pes(ec_cts_ind)
# Select demands for nuts3-regions in boundaries (needed for testmode)
ec_cts_ind = data_in_boundaries(ec_cts_ind)
# insert into database
for wz in ec_cts_ind.columns:
df = pd.DataFrame(ec_cts_ind[wz])
df["year"] = year
df["wz"] = wz
df["scenario"] = scenario
df = df.rename({wz: "demand"}, axis="columns")
df.index = df.index.rename("nuts3")
df.to_sql(
targets["cts_ind_demand"]["table"],
engine,
targets["cts_ind_demand"]["schema"],
if_exists="append",
)
[docs]def insert_household_demand():
"""Insert electrical demands for households according to
demandregio using its disaggregator-tool in MWh
Returns
-------
None.
"""
targets = egon.data.config.datasets()["demandregio_household_demand"][
"targets"
]
engine = db.engine()
for t in targets:
db.execute_sql(
f"DELETE FROM {targets[t]['schema']}.{targets[t]['table']};"
)
for scn in ["eGon2021", "eGon2035", "eGon100RE"]:
year = scenario_parameters.global_settings(scn)["population_year"]
# Insert demands of private households
insert_hh_demand(scn, year, engine)
[docs]def insert_cts_ind_demands():
"""Insert electricity demands per nuts3-region in Germany according to
demandregio using its disaggregator-tool in MWh
Returns
-------
None.
"""
targets = egon.data.config.datasets()["demandregio_cts_ind_demand"][
"targets"
]
engine = db.engine()
for t in targets:
db.execute_sql(
f"DELETE FROM {targets[t]['schema']}.{targets[t]['table']};"
)
insert_cts_ind_wz_definitions()
for scn in ["eGon2021", "eGon2035", "eGon100RE"]:
year = scenario_parameters.global_settings(scn)["population_year"]
if year > 2035:
year = 2035
# target values per scenario in MWh
target_values = {
# according to NEP 2021
# new consumers will be added seperatly
"eGon2035": {"CTS": 135300, "industry": 225400},
# CTS: reduce overall demand from demandregio (without traffic)
# by share of heat according to JRC IDEES, data from 2011
# industry: no specific heat demand, use data from demandregio
"eGon100RE": {"CTS": (1 - (5.96 + 6.13) / 154.64) * 125183.403},
# no adjustments for status quo
"eGon2021": {},
}
insert_cts_ind(scn, year, engine, target_values)
# Insert load curves per wz
timeseries_per_wz()
[docs]def insert_society_data():
"""Insert population and number of households per nuts3-region in Germany
according to demandregio using its disaggregator-tool
Returns
-------
None.
"""
targets = egon.data.config.datasets()["demandregio_society"]["targets"]
engine = db.engine()
for t in targets:
db.execute_sql(
f"DELETE FROM {targets[t]['schema']}.{targets[t]['table']};"
)
target_years = np.append(
get_sector_parameters("global").population_year.values, 2018
)
for year in target_years:
df_pop = pd.DataFrame(data.population(year=year))
df_pop["year"] = year
df_pop = df_pop.rename({"value": "population"}, axis="columns")
# Select data for nuts3-regions in boundaries (needed for testmode)
df_pop = data_in_boundaries(df_pop)
df_pop.to_sql(
targets["population"]["table"],
engine,
schema=targets["population"]["schema"],
if_exists="append",
)
for year in target_years:
df_hh = pd.DataFrame(data.households_per_size(year=year))
# Select data for nuts3-regions in boundaries (needed for testmode)
df_hh = data_in_boundaries(df_hh)
for hh_size in df_hh.columns:
df = pd.DataFrame(df_hh[hh_size])
df["year"] = year
df["hh_size"] = hh_size
df = df.rename({hh_size: "households"}, axis="columns")
df.to_sql(
targets["household"]["table"],
engine,
schema=targets["household"]["schema"],
if_exists="append",
)
[docs]def insert_timeseries_per_wz(sector, year):
"""Insert normalized electrical load time series for the selected sector
Parameters
----------
sector : str
Name of the sector. ['CTS', 'industry']
year : int
Selected weather year
Returns
-------
None.
"""
targets = egon.data.config.datasets()["demandregio_cts_ind_demand"][
"targets"
]
if sector == "CTS":
profiles = (
data.CTS_power_slp_generator("SH", year=year).resample("H").sum()
)
wz_slp = config.slp_branch_cts_power()
elif sector == "industry":
profiles = (
data.shift_load_profile_generator(state="SH", year=year)
.resample("H")
.sum()
)
wz_slp = config.shift_profile_industry()
else:
print(f"Sector {sector} is not valid.")
df = pd.DataFrame(
index=wz_slp.keys(), columns=["slp", "load_curve", "year"]
)
df.index.rename("wz", inplace=True)
df.slp = wz_slp.values()
df.year = year
df.load_curve = profiles[df.slp].transpose().values.tolist()
db.execute_sql(
f"""
DELETE FROM {targets['timeseries_cts_ind']['schema']}.
{targets['timeseries_cts_ind']['table']}
WHERE wz IN (
SELECT wz FROM {targets['wz_definitions']['schema']}.
{targets['wz_definitions']['table']}
WHERE sector = '{sector}')
"""
)
df.to_sql(
targets["timeseries_cts_ind"]["table"],
schema=targets["timeseries_cts_ind"]["schema"],
con=db.engine(),
if_exists="append",
)
[docs]def timeseries_per_wz():
"""Calcultae and insert normalized timeseries per wz for cts and industry
Returns
-------
None.
"""
years = get_sector_parameters("global").weather_year.unique()
for year in years:
for sector in ["CTS", "industry"]:
insert_timeseries_per_wz(sector, int(year))