"""The central module containing all code dealing with importing data from
Netzentwicklungsplan 2035, Version 2031, Szenario C
"""
from pathlib import Path
from sqlalchemy import Column, Float, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
import numpy as np
import pandas as pd
import yaml
from egon.data import db
from egon.data.config import settings
from egon.data.datasets import Dataset
import egon.data.config
# Declarative base shared by the ORM classes below.
# NOTE: will be later imported from another file
Base = declarative_base()
class EgonScenarioCapacities(Base):
    """ORM class for table ``supply.egon_scenario_capacities``.

    Stores installed capacities per component, carrier and region
    (NUTS code) for one scenario.
    """

    __tablename__ = "egon_scenario_capacities"
    __table_args__ = {"schema": "supply"}

    # surrogate primary key
    index = Column(Integer, primary_key=True)
    component = Column(String(25))
    carrier = Column(String(50))
    capacity = Column(Float)
    nuts = Column(String(12))
    scenario_name = Column(String(50))
class NEP2021ConvPowerPlants(Base):
    """ORM class for table ``supply.egon_nep_2021_conventional_powerplants``.

    List of conventional power plants from the German grid development
    plan (NEP 2021), including per-scenario capacities and CHP flags.
    """

    __tablename__ = "egon_nep_2021_conventional_powerplants"
    __table_args__ = {"schema": "supply"}

    index = Column(String(50), primary_key=True)
    bnetza_id = Column(String(50))
    name = Column(String(100))
    name_unit = Column(String(50))
    # carrier name as used in the NEP source data
    carrier_nep = Column(String(50))
    # carrier name mapped to eGon convention
    carrier = Column(String(12))
    chp = Column(String(12))
    postcode = Column(String(12))
    city = Column(String(50))
    federal_state = Column(String(12))
    commissioned = Column(String(12))
    status = Column(String(50))
    capacity = Column(Float)
    # per-scenario CHP replacement flags and capacities (scenarios
    # A/B/C 2035 and B 2040 of NEP 2021)
    a2035_chp = Column(String(12))
    a2035_capacity = Column(Float)
    b2035_chp = Column(String(12))
    b2035_capacity = Column(Float)
    c2035_chp = Column(String(12))
    c2035_capacity = Column(Float)
    b2040_chp = Column(String(12))
    b2040_capacity = Column(Float)
class ScenarioCapacities(Dataset):
    """
    Create and fill table with installed generation capacities in Germany

    This dataset creates and fills a table with the installed generation
    capacities in Germany in a lower spatial resolution (either per federal
    state or on national level). This data is coming from external sources
    (e.g. German grid development plan for scenario eGon2035). The table is
    used in downstream datasets to define target values for the installed
    capacities.

    *Dependencies*
      * :py:func:`Setup <egon.data.datasets.database.setup>`
      * :py:class:`PypsaEurSec <egon.data.datasets.pypsaeursec.PypsaEurSec>`
      * :py:class:`Vg250 <egon.data.datasets.vg250.Vg250>`
      * :py:class:`DataBundle <egon.data.datasets.data_bundle.DataBundle>`
      * :py:class:`ZensusPopulation <egon.data.datasets.zensus.ZensusPopulation>`

    *Resulting tables*
      * :py:class:`supply.egon_scenario_capacities <egon.data.datasets.scenario_capacities.EgonScenarioCapacities>` is created and filled
      * :py:class:`supply.egon_nep_2021_conventional_powerplants <egon.data.datasets.scenario_capacities.NEP2021ConvPowerPlants>` is created and filled
    """

    #:
    name: str = "ScenarioCapacities"
    #:
    version: str = "0.0.13"

    def __init__(self, dependencies):
        super().__init__(
            name=self.name,
            version=self.version,
            dependencies=dependencies,
            tasks=(create_table, insert_data_nep, eGon100_capacities),
        )
def create_table():
    """Create input tables for scenario setup

    Ensures schema ``supply`` exists, then drops and re-creates the
    scenario capacity and NEP power plant tables so a re-run starts
    from a clean definition.

    Returns
    -------
    None.
    """
    engine = db.engine()
    db.execute_sql("CREATE SCHEMA IF NOT EXISTS supply;")

    # Drop before create so schema changes in the ORM classes take effect.
    EgonScenarioCapacities.__table__.drop(bind=engine, checkfirst=True)
    NEP2021ConvPowerPlants.__table__.drop(bind=engine, checkfirst=True)
    EgonScenarioCapacities.__table__.create(bind=engine, checkfirst=True)
    NEP2021ConvPowerPlants.__table__.create(bind=engine, checkfirst=True)
def nuts_mapping():
    """Map German federal state abbreviations to NUTS-1 codes.

    Returns
    -------
    dict
        Mapping from federal state abbreviation (e.g. ``"BW"``) to the
        corresponding NUTS-1 code (e.g. ``"DE1"``).
    """
    nuts_mapping = {
        "BW": "DE1",
        "NW": "DEA",
        "HE": "DE7",
        "BB": "DE4",
        "HB": "DE5",
        "RP": "DEB",
        "ST": "DEE",
        "SH": "DEF",
        "MV": "DE8",
        "TH": "DEG",
        "NI": "DE9",
        "SN": "DED",
        "HH": "DE6",
        "SL": "DEC",
        "BE": "DE3",
        "BY": "DE2",
    }

    return nuts_mapping
def insert_capacities_per_federal_state_nep():
    """Insert installed capacities per federal state according to
    NEP 2035 (version 2021), scenario 2035 C

    Returns
    -------
    None.
    """
    sources = egon.data.config.datasets()["scenario_input"]["sources"]
    targets = egon.data.config.datasets()["scenario_input"]["targets"]

    # Connect to local database
    engine = db.engine()

    # Delete rows if already exist
    db.execute_sql(
        f"""
        DELETE FROM
        {targets['scenario_capacities']['schema']}.
        {targets['scenario_capacities']['table']}
        WHERE scenario_name = 'eGon2035'
        AND nuts != 'DE'
        """
    )

    # read-in installed capacities per federal state of germany
    target_file = (
        Path(".")
        / "data_bundle_egon_data"
        / "nep2035_version2021"
        / sources["eGon2035"]["capacities"]
    )

    df = pd.read_excel(
        target_file,
        sheet_name="1.Entwurf_NEP2035_V2021",
        index_col="Unnamed: 0",
    )

    df_draft = pd.read_excel(
        target_file,
        sheet_name="Entwurf_des_Szenariorahmens",
        index_col="Unnamed: 0",
    )

    # Import data on wind offshore capacities
    df_windoff = pd.read_excel(
        target_file,
        sheet_name="WInd_Offshore_NEP",
    ).dropna(subset=["Bundesland", "Netzverknuepfungspunkt"])

    # Remove trailing whitespace from column Bundesland
    df_windoff["Bundesland"] = df_windoff["Bundesland"].str.strip()

    # Group and sum capacities per federal state
    df_windoff_fs = (
        df_windoff[["Bundesland", "C 2035"]].groupby(["Bundesland"]).sum()
    )

    # List federal states with an assigned wind offshore capacity
    index_list = list(df_windoff_fs.index.values)

    # Overwrite capacities in df with more accurate values from
    # df_windoff_fs (converted MW -> GW to match df's unit)
    for state in index_list:
        df.at["Wind offshore", state] = (
            df_windoff_fs.at[state, "C 2035"] / 1000
        )

    # sort NEP-carriers:
    rename_carrier = {
        "Wind onshore": "wind_onshore",
        "Wind offshore": "wind_offshore",
        "sonstige Konventionelle": "others",
        "Speicherwasser": "reservoir",
        "Laufwasser": "run_of_river",
        "Biomasse": "biomass",
        "Erdgas": "gas",
        "Kuppelgas": "gas",
        "PV (Aufdach)": "solar_rooftop",
        "PV (Freiflaeche)": "solar",
        "Pumpspeicher": "pumped_hydro",
        "sonstige EE": "others",
        "Oel": "oil",
        "Haushaltswaermepumpen": "residential_rural_heat_pump",
        "KWK < 10 MW": "small_chp",
    }
    # 'Elektromobilitaet gesamt': 'transport',
    # 'Elektromobilitaet privat': 'transport'}

    # nuts1 to federal state in Germany
    map_nuts = pd.read_sql(
        f"""
        SELECT DISTINCT ON (nuts) gen, nuts
        FROM {sources['boundaries']['schema']}.{sources['boundaries']['table']}
        """,
        engine,
        index_col="gen",
    )

    insert_data = pd.DataFrame()

    scaled_carriers = [
        "Haushaltswaermepumpen",
        "PV (Aufdach)",
        "PV (Freiflaeche)",
    ]

    for bl in map_nuts.index:
        data = pd.DataFrame(df[bl])

        # if distribution to federal states is not provided,
        # use data from draft of scenario report
        for c in scaled_carriers:
            data.loc[c, bl] = (
                df_draft.loc[c, bl]
                / df_draft.loc[c, "Summe"]
                * df.loc[c, "Summe"]
            )

        # split hydro into run of river and reservoir
        # according to draft of scenario report
        if data.loc["Lauf- und Speicherwasser", bl] > 0:
            for c in ["Speicherwasser", "Laufwasser"]:
                data.loc[c, bl] = (
                    data.loc["Lauf- und Speicherwasser", bl]
                    * df_draft.loc[c, bl]
                    / df_draft.loc[["Speicherwasser", "Laufwasser"], bl].sum()
                )

        data["carrier"] = data.index.map(rename_carrier)
        data = data.groupby(data.carrier)[bl].sum().reset_index()
        data["component"] = "generator"
        data["nuts"] = map_nuts.nuts[bl]
        data["scenario_name"] = "eGon2035"

        # According to NEP, each heatpump has 5kW_el installed capacity
        # source: Entwurf des Szenariorahmens NEP 2035, version 2021, page 47
        data.loc[data.carrier == "residential_rural_heat_pump", bl] *= 5e-6
        data.loc[
            data.carrier == "residential_rural_heat_pump", "component"
        ] = "link"
        data = data.rename(columns={bl: "capacity"})

        # convert GW to MW
        data.capacity *= 1e3

        # DataFrame.append was removed in pandas 2.0; pd.concat without
        # ignore_index reproduces the old behaviour exactly.
        insert_data = pd.concat([insert_data, data])

    # Get aggregated capacities from nep's power plant list for certain carrier
    carriers = ["oil", "other_non_renewable", "pumped_hydro"]
    capacities_list = aggr_nep_capacities(carriers)

    # Filter by carrier
    updated = insert_data[insert_data["carrier"].isin(carriers)]

    # Merge to replace capacities for carriers "oil", "other_non_renewable"
    # and "pumped_hydro"
    updated = (
        updated.merge(capacities_list, on=["carrier", "nuts"], how="left")
        .fillna(0)
        .drop(["capacity"], axis=1)
        .rename(columns={"c2035_capacity": "capacity"})
    )

    # Remove updated entries from df
    original = insert_data[~insert_data["carrier"].isin(carriers)]

    # Join dfs
    insert_data = pd.concat([original, updated])

    # Insert data to db
    insert_data.to_sql(
        targets["scenario_capacities"]["table"],
        engine,
        schema=targets["scenario_capacities"]["schema"],
        if_exists="append",
        # NOTE(review): to_sql expects a bool here; passing the Index object
        # is truthy and therefore writes the frame's (non-unique) index as a
        # column — confirm whether index=False was intended.
        index=insert_data.index,
    )

    # Add district heating data according to energy and full load hours
    district_heating_input()
def population_share():
    """Calculate share of population in testmode

    Sums the population of the selected census cells and divides by
    Germany's total population (fixed reference value 80 324 282).

    Returns
    -------
    float
        Share of population in testmode
    """
    sources = egon.data.config.datasets()["scenario_input"]["sources"]

    return (
        pd.read_sql(
            f"""
            SELECT SUM(population)
            FROM {sources['zensus_population']['schema']}.
            {sources['zensus_population']['table']}
            WHERE population>0
            """,
            con=db.engine(),
        )["sum"][0]
        / 80324282
    )
def aggr_nep_capacities(carriers):
    """Aggregate capacities from NEP power plants list by carrier and federal
    state

    Parameters
    ----------
    carriers : list of str
        Carriers (eGon naming) to keep in the result.

    Returns
    -------
    pandas.DataFrame
        Capacities per NUTS-1 code and carrier
        (columns ``carrier``, ``c2035_capacity``, ``nuts``).
    """
    # Get list of power plants from nep
    nep_capacities = insert_nep_list_powerplants(export=False)[
        ["federal_state", "carrier", "c2035_capacity"]
    ]

    # Sum up capacities per federal state and carrier
    capacities_list = (
        nep_capacities.groupby(["federal_state", "carrier"])["c2035_capacity"]
        .sum()
        .to_frame()
        .reset_index()
    )

    # Neglect entries with carriers not in argument
    capacities_list = capacities_list[capacities_list.carrier.isin(carriers)]

    # Include NUTS code
    capacities_list["nuts"] = capacities_list.federal_state.map(nuts_mapping())

    # Drop entries for foreign plants (no NUTS mapping) and the now
    # redundant federal_state column
    capacities_list = capacities_list.dropna(subset=["nuts"]).drop(
        columns=["federal_state"]
    )

    return capacities_list
def map_carrier():
    """Map carriers from NEP and Marktstammdatenregister to carriers from eGon

    Returns
    -------
    pandas.Series
        Series indexed by the source carrier name, holding the
        corresponding eGon carrier name.
    """
    return pd.Series(
        data={
            "Abfall": "others",
            "Erdgas": "gas",
            "Sonstige\nEnergieträger": "others",
            "Steinkohle": "coal",
            "Kuppelgase": "gas",
            "Mineralöl-\nprodukte": "oil",
            "Braunkohle": "lignite",
            "Waerme": "others",
            "Mineraloelprodukte": "oil",
            "NichtBiogenerAbfall": "others",
            "AndereGase": "gas",
            "Sonstige_Energietraeger": "others",
            "Kernenergie": "nuclear",
            "Pumpspeicher": "pumped_hydro",
            "Mineralöl-\nProdukte": "oil",
        }
    )
def insert_nep_list_powerplants(export=True):
    """Insert list of conventional powerplants attached to the approval
    of the scenario report by BNetzA

    Parameters
    ----------
    export : bool
        Choose if nep list should be exported to the data
        base. The default is True.
        If export=False a data frame will be returned

    Returns
    -------
    kw_liste_nep : pandas.DataFrame
        List of conventional power plants from nep if export=False
    """
    sources = egon.data.config.datasets()["scenario_input"]["sources"]
    targets = egon.data.config.datasets()["scenario_input"]["targets"]

    # Connect to local database
    engine = db.engine()

    # Read-in data from csv-file
    target_file = (
        Path(".")
        / "data_bundle_egon_data"
        / "nep2035_version2021"
        / sources["eGon2035"]["list_conv_pp"]
    )

    kw_liste_nep = pd.read_csv(target_file, delimiter=";", decimal=",")

    # Adjust column names
    kw_liste_nep = kw_liste_nep.rename(
        columns={
            "BNetzA-ID": "bnetza_id",
            "Kraftwerksname": "name",
            "Blockname": "name_unit",
            "Energieträger": "carrier_nep",
            "KWK\nJa/Nein": "chp",
            "PLZ": "postcode",
            "Ort": "city",
            "Bundesland/\nLand": "federal_state",
            "Inbetrieb-\nnahmejahr": "commissioned",
            "Status": "status",
            "el. Leistung\n06.02.2020": "capacity",
            "A 2035:\nKWK-Ersatz": "a2035_chp",
            "A 2035:\nLeistung": "a2035_capacity",
            "B 2035\nKWK-Ersatz": "b2035_chp",
            "B 2035:\nLeistung": "b2035_capacity",
            "C 2035:\nKWK-Ersatz": "c2035_chp",
            "C 2035:\nLeistung": "c2035_capacity",
            "B 2040:\nKWK-Ersatz": "b2040_chp",
            "B 2040:\nLeistung": "b2040_capacity",
        }
    )

    # Cut data to federal state if in testmode
    boundary = settings()["egon-data"]["--dataset-boundary"]
    if boundary != "Everything":
        map_states = {
            "Baden-Württemberg": "BW",
            "Nordrhein-Westfalen": "NW",
            "Hessen": "HE",
            "Brandenburg": "BB",
            "Bremen": "HB",
            "Rheinland-Pfalz": "RP",
            "Sachsen-Anhalt": "ST",
            "Schleswig-Holstein": "SH",
            "Mecklenburg-Vorpommern": "MV",
            "Thüringen": "TH",
            "Niedersachsen": "NI",
            "Sachsen": "SN",
            "Hamburg": "HH",
            "Saarland": "SL",
            "Berlin": "BE",
            "Bayern": "BY",
        }

        # Keep plants in the test-mode state; NaN federal_state marks
        # foreign plants, which are kept and scaled below.
        kw_liste_nep = kw_liste_nep[
            kw_liste_nep.federal_state.isin([map_states[boundary], np.nan])
        ]

        # Hoisted out of the loop: population_share() issues a SQL query,
        # so compute the scaling factor and the row selection only once.
        share = population_share()
        foreign_rows = kw_liste_nep[
            kw_liste_nep.federal_state.isnull()
        ].index

        for col in [
            "capacity",
            "a2035_capacity",
            "b2035_capacity",
            "c2035_capacity",
            "b2040_capacity",
        ]:
            kw_liste_nep.loc[foreign_rows, col] *= share

    # Map carrier names from the NEP source to eGon naming
    kw_liste_nep["carrier"] = map_carrier()[kw_liste_nep.carrier_nep].values

    if export is True:
        # Insert data to db
        kw_liste_nep.to_sql(
            targets["nep_conventional_powerplants"]["table"],
            engine,
            schema=targets["nep_conventional_powerplants"]["schema"],
            if_exists="replace",
        )
    else:
        return kw_liste_nep
def insert_data_nep():
    """Overall function for importing scenario input data for eGon2035
    scenario

    Writes the NEP conventional power plant list to the database, then
    inserts the installed capacities per federal state.

    Returns
    -------
    None.
    """
    insert_nep_list_powerplants(export=True)

    insert_capacities_per_federal_state_nep()
def eGon100_capacities():
    """Insert installed capacities for the eGon100 scenario

    Reads national capacities from a pypsa-eur-sec result file (either a
    fresh run or the bundled 2022-07-26 result), aggregates and renames
    carriers to eGon conventions and writes them to the database.

    Returns
    -------
    None.
    """
    sources = egon.data.config.datasets()["scenario_input"]["sources"]
    targets = egon.data.config.datasets()["scenario_input"]["targets"]

    # read-in installed capacities
    execute_pypsa_eur_sec = False
    cwd = Path(".")

    if execute_pypsa_eur_sec:
        filepath = cwd / "run-pypsa-eur-sec"
        pypsa_eur_sec_repos = filepath / "pypsa-eur-sec"
        # Read YAML file
        pes_egonconfig = pypsa_eur_sec_repos / "config_egon.yaml"
        with open(pes_egonconfig, "r") as stream:
            data_config = yaml.safe_load(stream)

        target_file = (
            pypsa_eur_sec_repos
            / "results"
            / data_config["run"]
            / "csvs"
            / sources["eGon100RE"]["capacities"]
        )
    else:
        target_file = (
            cwd
            / "data_bundle_egon_data"
            / "pypsa_eur_sec"
            / "2022-07-26-egondata-integration"
            / "csvs"
            / sources["eGon100RE"]["capacities"]
        )

    df = pd.read_csv(target_file, skiprows=5)
    df.columns = ["component", "country", "carrier", "p_nom"]

    df.set_index("carrier", inplace=True)

    # Keep German entries only
    df = df[df.country.str[:2] == "DE"]

    # Drop country column
    df.drop("country", axis=1, inplace=True)

    # Drop components which will be optimized in eGo
    unused_carrier = [
        "BEV_charger",
        "DAC",
        "H2 Electrolysis",
        "electricity distribution grid",
        "home battery charger",
        "home battery discharger",
        "H2",
        "Li_ion",
        "home battery",
        "residential rural water tanks charger",
        "residential rural water tanks discharger",
        "services rural water tanks charger",
        "services rural water tanks discharger",
        "residential rural water tanks",
        "services rural water tanks",
        "urban central water tanks",
        "urban central water tanks charger",
        "urban central water tanks discharger",
        "H2 Fuel Cell",
    ]

    df = df[~df.index.isin(unused_carrier)]

    df.index = df.index.str.replace(" ", "_")

    # Aggregate offshore wind.
    # DataFrame.append was removed in pandas 2.0; pd.concat reproduces the
    # old behaviour.
    df = pd.concat(
        [
            df,
            pd.DataFrame(
                index=["wind_offshore"],
                data={
                    "p_nom": (
                        df.p_nom["offwind-ac"] + df.p_nom["offwind-dc"]
                    ),
                    "component": df.component["offwind-ac"],
                },
            ),
        ]
    )

    df = df.drop(["offwind-ac", "offwind-dc"])

    # Aggregate technologies with and without carbon_capture (CC)
    for carrier in ["SMR", "urban_central_gas_CHP"]:
        df.p_nom[carrier] += df.p_nom[f"{carrier}_CC"]
        df = df.drop([f"{carrier}_CC"])

    # Aggregate residential and services rural heat supply
    for merge_carrier in [
        "rural_resistive_heater",
        "rural_ground_heat_pump",
        "rural_gas_boiler",
        "rural_solar_thermal",
    ]:
        if f"residential_{merge_carrier}" in df.index:
            df = pd.concat(
                [
                    df,
                    pd.DataFrame(
                        index=[merge_carrier],
                        data={
                            "p_nom": (
                                df.p_nom[f"residential_{merge_carrier}"]
                                + df.p_nom[f"services_{merge_carrier}"]
                            ),
                            "component": df.component[
                                f"residential_{merge_carrier}"
                            ],
                        },
                    ),
                ]
            )
            df = df.drop(
                [f"residential_{merge_carrier}", f"services_{merge_carrier}"]
            )

    # Rename carriers
    df.rename(
        {
            "onwind": "wind_onshore",
            "ror": "run_of_river",
            "PHS": "pumped_hydro",
            "OCGT": "gas",
            "rural_ground_heat_pump": "residential_rural_heat_pump",
            "urban_central_air_heat_pump": "urban_central_heat_pump",
            "urban_central_solar_thermal": (
                "urban_central_solar_thermal_collector"
            ),
        },
        inplace=True,
    )

    # Reset index
    df = df.reset_index()

    # Rename columns
    df.rename(
        {"p_nom": "capacity", "index": "carrier"}, axis="columns", inplace=True
    )

    df["scenario_name"] = "eGon100RE"
    df["nuts"] = "DE"

    # Replace any previously inserted eGon100RE rows
    db.execute_sql(
        f"""
        DELETE FROM
        {targets['scenario_capacities']['schema']}.{targets['scenario_capacities']['table']}
        WHERE scenario_name='eGon100RE'
        """
    )

    df.to_sql(
        targets["scenario_capacities"]["table"],
        schema=targets["scenario_capacities"]["schema"],
        con=db.engine(),
        if_exists="append",
        index=False,
    )