"""
This module does sanity checks for both the eGon2035 and the eGon100RE scenario
separately where a percentage error is given to showcase difference in output
and input values. Please note that there are missing input technologies in the
supply tables.
Authors: @ALonso, @dana, @nailend, @nesnoj, @khelfen
"""
from math import isclose
from pathlib import Path
import ast
from sqlalchemy import Numeric
from sqlalchemy.sql import and_, cast, func, or_
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from egon.data import config, db, logger
from egon.data.datasets import Dataset
from egon.data.datasets.electricity_demand_timeseries.cts_buildings import (
EgonCtsElectricityDemandBuildingShare,
EgonCtsHeatDemandBuildingShare,
)
from egon.data.datasets.emobility.motorized_individual_travel.db_classes import ( # noqa: E501
EgonEvCountMunicipality,
EgonEvCountMvGridDistrict,
EgonEvCountRegistrationDistrict,
EgonEvMvGridDistrict,
EgonEvPool,
EgonEvTrip,
)
from egon.data.datasets.emobility.motorized_individual_travel.helpers import (
DATASET_CFG,
read_simbev_metadata_file,
)
from egon.data.datasets.etrago_setup import (
EgonPfHvLink,
EgonPfHvLinkTimeseries,
EgonPfHvLoad,
EgonPfHvLoadTimeseries,
EgonPfHvStore,
EgonPfHvStoreTimeseries,
)
from egon.data.datasets.gas_grid import (
define_gas_buses_abroad,
define_gas_nodes_list,
define_gas_pipeline_list,
)
from egon.data.datasets.gas_neighbours.eGon2035 import (
calc_capacities,
calc_ch4_storage_capacities,
calc_global_ch4_demand,
calc_global_power_to_h2_demand,
calculate_ch4_grid_capacities,
import_ch4_demandTS,
)
from egon.data.datasets.hydrogen_etrago.storage import (
calculate_and_map_saltcavern_storage_potential,
)
from egon.data.datasets.power_plants.pv_rooftop_buildings import (
PV_CAP_PER_SQ_M,
ROOF_FACTOR,
SCENARIOS,
load_building_data,
scenario_data,
)
from egon.data.datasets.pypsaeursec import read_network
from egon.data.datasets.scenario_parameters import get_sector_parameters
from egon.data.datasets.storages.home_batteries import get_cbat_pbat_ratio
import egon.data
# True when egon-data runs on the full German dataset ("Everything").
# Several checks compare against national scenario targets and are skipped
# when the run is restricted to a test boundary (e.g. Schleswig-Holstein).
TESTMODE_OFF = (
    config.settings()["egon-data"]["--dataset-boundary"] == "Everything"
)
class SanityChecks(Dataset):
    """Pipeline dataset bundling all sanity-check tasks.

    Registers every sanity-check function of this module as a task so the
    checks run as part of the egon-data workflow.
    """

    #: Name of this dataset in the pipeline
    name: str = "SanityChecks"
    #: Version of this dataset; bump to force re-execution
    version: str = "0.0.8"

    def __init__(self, dependencies):
        check_tasks = {
            etrago_eGon2035_electricity,
            etrago_eGon2035_heat,
            residential_electricity_annual_sum,
            residential_electricity_hh_refinement,
            cts_electricity_demand_share,
            cts_heat_demand_share,
            sanitycheck_emobility_mit,
            sanitycheck_pv_rooftop_buildings,
            sanitycheck_home_batteries,
            etrago_eGon2035_gas_DE,
            etrago_eGon2035_gas_abroad,
            sanitycheck_dsm,
        }
        super().__init__(
            name=self.name,
            version=self.version,
            dependencies=dependencies,
            tasks=check_tasks,
        )
def etrago_eGon2035_electricity():
    """Execute basic sanity checks for the eGon2035 electricity sector.

    Compares installed generator and storage capacities as well as the
    annual electricity demand between the scenario input tables
    (``supply.egon_scenario_capacities``, ``demand.*``) and the resulting
    eTraGo tables (``grid.egon_etrago_*``) for Germany and logs the
    relative deviation in percent.

    Returns
    -------
    None
    """
    scn = "eGon2035"

    def _input_capacity_mw(carrier):
        # Target capacity for the carrier from the scenario input table.
        return db.select_dataframe(
            f"""SELECT carrier, SUM(capacity::numeric) as input_capacity_mw
            FROM supply.egon_scenario_capacities
            WHERE carrier= '{carrier}'
            AND scenario_name ='{scn}'
            GROUP BY (carrier);
            """,
            warning=False,
        ).input_capacity_mw.sum()

    def _log_capacity_deviation(carrier, input_mw, output_mw):
        # Four possible outcomes of the capacity distribution: nothing to
        # distribute, input dropped entirely, output without any input, or
        # a regular deviation reported in percent (relative to the input).
        if output_mw == 0 and input_mw == 0:
            logger.info(
                f"No capacity for carrier '{carrier}' needed to be"
                f" distributed. Everything is fine"
            )
        elif input_mw > 0 and output_mw == 0:
            logger.info(
                f"Error: Capacity for carrier '{carrier}' was not "
                f"distributed at all!"
            )
        elif output_mw > 0 and input_mw == 0:
            logger.info(
                f"Error: Eventhough no input capacity was provided for "
                f"carrier '{carrier}' a capacity got distributed!"
            )
        else:
            deviation = (output_mw - input_mw) / input_mw * 100
            logger.info(f"{carrier}: {round(deviation, 2)} %")

    # Section to check generator capacities
    logger.info(f"Sanity checks for scenario {scn}")
    logger.info(
        "For German electricity generators the following deviations between "
        "the inputs and outputs can be observed:"
    )

    carriers_electricity = [
        "others",
        "reservoir",
        "run_of_river",
        "oil",
        "wind_onshore",
        "wind_offshore",
        "solar",
        "solar_rooftop",
        "biomass",
    ]

    for carrier in carriers_electricity:
        if carrier == "biomass":
            # Biomass output aggregates three eTraGo carriers (plain biomass
            # plus industrial and central biomass CHP).
            sum_output = db.select_dataframe(
                """SELECT scn_name, SUM(p_nom::numeric) as output_capacity_mw
                FROM grid.egon_etrago_generator
                WHERE bus IN (
                    SELECT bus_id FROM grid.egon_etrago_bus
                    WHERE scn_name = 'eGon2035'
                    AND country = 'DE')
                AND carrier IN ('biomass', 'industrial_biomass_CHP',
                'central_biomass_CHP')
                GROUP BY (scn_name);
                """,
                warning=False,
            )
        else:
            sum_output = db.select_dataframe(
                f"""SELECT scn_name,
                SUM(p_nom::numeric) as output_capacity_mw
                FROM grid.egon_etrago_generator
                WHERE scn_name = '{scn}'
                AND carrier IN ('{carrier}')
                AND bus IN
                    (SELECT bus_id
                    FROM grid.egon_etrago_bus
                    WHERE scn_name = 'eGon2035'
                    AND country = 'DE')
                GROUP BY (scn_name);
                """,
                warning=False,
            )

        _log_capacity_deviation(
            carrier,
            _input_capacity_mw(carrier),
            sum_output.output_capacity_mw.sum(),
        )

    # Section to check storage units (same comparison, different table).
    logger.info(f"Sanity checks for scenario {scn}")
    logger.info(
        "For German electrical storage units the following deviations between"
        "the inputs and outputs can be observed:"
    )

    for carrier in ["pumped_hydro"]:
        sum_output = db.select_dataframe(
            f"""SELECT scn_name, SUM(p_nom::numeric) as output_capacity_mw
            FROM grid.egon_etrago_storage
            WHERE scn_name = '{scn}'
            AND carrier IN ('{carrier}')
            AND bus IN
                (SELECT bus_id
                FROM grid.egon_etrago_bus
                WHERE scn_name = 'eGon2035'
                AND country = 'DE')
            GROUP BY (scn_name);
            """,
            warning=False,
        )

        _log_capacity_deviation(
            carrier,
            _input_capacity_mw(carrier),
            sum_output.output_capacity_mw.sum(),
        )

    # Section to check loads
    logger.info(
        "For German electricity loads the following deviations between the"
        " input and output can be observed:"
    )

    # Annual AC demand in TWh from the eTraGo load timeseries.
    output_demand = db.select_dataframe(
        """SELECT a.scn_name, a.carrier, SUM((SELECT SUM(p)
        FROM UNNEST(b.p_set) p))/1000000::numeric as load_twh
        FROM grid.egon_etrago_load a
        JOIN grid.egon_etrago_load_timeseries b
        ON (a.load_id = b.load_id)
        JOIN grid.egon_etrago_bus c
        ON (a.bus=c.bus_id)
        AND b.scn_name = 'eGon2035'
        AND a.scn_name = 'eGon2035'
        AND a.carrier = 'AC'
        AND c.scn_name= 'eGon2035'
        AND c.country='DE'
        GROUP BY (a.scn_name, a.carrier);
        """,
        warning=False,
    )["load_twh"].values[0]

    # Input demands from DemandRegio: CTS/industry plus households.
    input_cts_ind = db.select_dataframe(
        """SELECT scenario,
        SUM(demand::numeric/1000000) as demand_mw_regio_cts_ind
        FROM demand.egon_demandregio_cts_ind
        WHERE scenario= 'eGon2035'
        AND year IN ('2035')
        GROUP BY (scenario);
        """,
        warning=False,
    )["demand_mw_regio_cts_ind"].values[0]

    input_hh = db.select_dataframe(
        """SELECT scenario, SUM(demand::numeric/1000000) as demand_mw_regio_hh
        FROM demand.egon_demandregio_hh
        WHERE scenario= 'eGon2035'
        AND year IN ('2035')
        GROUP BY (scenario);
        """,
        warning=False,
    )["demand_mw_regio_hh"].values[0]

    input_demand = input_hh + input_cts_ind

    e = round((output_demand - input_demand) / input_demand, 2) * 100
    logger.info(f"electricity demand: {e} %")
def etrago_eGon2035_heat():
    """Execute basic sanity checks for the eGon2035 heat sector.

    Compares the annual heat demand and the installed capacities of heat
    supply technologies (heat pumps, resistive heaters, solar thermal and
    geothermal) between the scenario input tables and the resulting eTraGo
    tables and logs the relative deviation in percent.

    Returns
    -------
    None
    """

    def _deviation_pct(output_value, input_value):
        # Relative deviation of the output from the input in percent.
        return round((output_value - input_value) / input_value, 2) * 100

    def _scenario_capacity_mw(carrier):
        # Input capacity (MW) from the scenario input table.
        return db.select_dataframe(
            f"""SELECT carrier, SUM(capacity::numeric) as capacity_mw
            FROM supply.egon_scenario_capacities
            WHERE carrier= '{carrier}'
            AND scenario_name IN ('eGon2035')
            GROUP BY (carrier);
            """,
            warning=False,
        )["capacity_mw"].values[0]

    def _etrago_capacity_mw(table, carrier):
        # Output capacity (MW) from the given eTraGo table.
        return db.select_dataframe(
            f"""SELECT carrier, SUM(p_nom::numeric) as capacity_mw
            FROM grid.{table}
            WHERE carrier= '{carrier}'
            AND scn_name IN ('eGon2035')
            GROUP BY (carrier);
            """,
            warning=False,
        )["capacity_mw"].values[0]

    scn = "eGon2035"

    logger.info(f"Sanity checks for scenario {scn}")
    logger.info(
        "For German heat demands the following deviations between the inputs"
        " and outputs can be observed:"
    )

    # Annual heat demand (TWh) from the eTraGo load timeseries.
    output_heat_demand = db.select_dataframe(
        """SELECT a.scn_name,
        (SUM(
        (SELECT SUM(p) FROM UNNEST(b.p_set) p))/1000000)::numeric as load_twh
        FROM grid.egon_etrago_load a
        JOIN grid.egon_etrago_load_timeseries b
        ON (a.load_id = b.load_id)
        JOIN grid.egon_etrago_bus c
        ON (a.bus=c.bus_id)
        AND b.scn_name = 'eGon2035'
        AND a.scn_name = 'eGon2035'
        AND c.scn_name= 'eGon2035'
        AND c.country='DE'
        AND a.carrier IN ('rural_heat', 'central_heat')
        GROUP BY (a.scn_name);
        """,
        warning=False,
    )["load_twh"].values[0]

    # Annual heat demand (TWh) from the Peta input data.
    input_heat_demand = db.select_dataframe(
        """SELECT scenario, SUM(demand::numeric/1000000) as demand_mw_peta_heat
        FROM demand.egon_peta_heat
        WHERE scenario= 'eGon2035'
        GROUP BY (scenario);
        """,
        warning=False,
    )["demand_mw_peta_heat"].values[0]

    logger.info(
        f"heat demand: {_deviation_pct(output_heat_demand, input_heat_demand)} %"
    )

    # Sanity checks for heat supply
    logger.info(
        "For German heat supplies the following deviations between the inputs "
        "and outputs can be observed:"
    )

    # Comparison for central heat pumps.
    # FIX: the deviation is now computed relative to the input capacity
    # (it was previously divided by the output), consistent with every
    # other carrier in this check.
    heat_pump_input = _scenario_capacity_mw("urban_central_heat_pump")
    heat_pump_output = _etrago_capacity_mw(
        "egon_etrago_link", "central_heat_pump"
    )
    e_heat_pump = _deviation_pct(heat_pump_output, heat_pump_input)
    logger.info(f"'central_heat_pump': {e_heat_pump} % ")

    # Comparison for residential (rural) heat pumps
    input_residential_heat_pump = _scenario_capacity_mw(
        "residential_rural_heat_pump"
    )
    output_residential_heat_pump = _etrago_capacity_mw(
        "egon_etrago_link", "rural_heat_pump"
    )
    e_residential_heat_pump = _deviation_pct(
        output_residential_heat_pump, input_residential_heat_pump
    )
    logger.info(f"'residential heat pumps': {e_residential_heat_pump} %")

    # Comparison for resistive heaters
    resistive_heater_input = _scenario_capacity_mw(
        "urban_central_resistive_heater"
    )
    resistive_heater_output = _etrago_capacity_mw(
        "egon_etrago_link", "central_resistive_heater"
    )
    e_resistive_heater = _deviation_pct(
        resistive_heater_output, resistive_heater_input
    )
    logger.info(f"'resistive heater': {e_resistive_heater} %")

    # Comparison for solar thermal collectors
    input_solar_thermal = _scenario_capacity_mw(
        "urban_central_solar_thermal_collector"
    )
    output_solar_thermal = _etrago_capacity_mw(
        "egon_etrago_generator", "solar_thermal_collector"
    )
    e_solar_thermal = _deviation_pct(
        output_solar_thermal, input_solar_thermal
    )
    logger.info(f"'solar thermal collector': {e_solar_thermal} %")

    # Comparison for geothermal
    input_geo_thermal = _scenario_capacity_mw("urban_central_geo_thermal")
    output_geo_thermal = _etrago_capacity_mw(
        "egon_etrago_generator", "geo_thermal"
    )
    e_geo_thermal = _deviation_pct(output_geo_thermal, input_geo_thermal)
    logger.info(f"'geothermal': {e_geo_thermal} %")
def residential_electricity_annual_sum(rtol=1e-5):
    """Sanity check for dataset electricity_demand_timeseries :
    Demand_Building_Assignment

    Aggregates the annual residential demand of all census cells to NUTS-3
    level and compares it with the initial DemandRegio scaling parameters.
    """
    query = """
        SELECT dr.nuts3, dr.scenario, dr.demand_regio_sum, profiles.profile_sum
        FROM (
            SELECT scenario, SUM(demand) AS profile_sum, vg250_nuts3
            FROM demand.egon_demandregio_zensus_electricity AS egon,
            boundaries.egon_map_zensus_vg250 AS boundaries
            Where egon.zensus_population_id = boundaries.zensus_population_id
            AND sector = 'residential'
            GROUP BY vg250_nuts3, scenario
            ) AS profiles
        JOIN (
            SELECT nuts3, scenario, sum(demand) AS demand_regio_sum
            FROM demand.egon_demandregio_hh
            GROUP BY year, scenario, nuts3
            ) AS dr
        ON profiles.vg250_nuts3 = dr.nuts3 and profiles.scenario = dr.scenario
        """
    annual_sums = db.select_dataframe(sql=query)

    # Profile aggregates must reproduce the DemandRegio totals per NUTS-3.
    np.testing.assert_allclose(
        actual=annual_sums["profile_sum"],
        desired=annual_sums["demand_regio_sum"],
        rtol=rtol,
        verbose=False,
    )

    logger.info(
        "Aggregated annual residential electricity demand"
        " matches with DemandRegio at NUTS-3."
    )
def residential_electricity_hh_refinement(rtol=1e-5):
    """Sanity check for dataset electricity_demand_timeseries :
    Household Demands

    Compares the sums of aggregated household types after the refinement
    method was applied with the original census values.
    """
    query = """
        SELECT refined.nuts3, refined.characteristics_code,
        refined.sum_refined::int, census.sum_census::int
        FROM(
            SELECT nuts3, characteristics_code, SUM(hh_10types) as sum_refined
            FROM society.egon_destatis_zensus_household_per_ha_refined
            GROUP BY nuts3, characteristics_code)
            AS refined
        JOIN(
            SELECT t.nuts3, t.characteristics_code, sum(orig) as sum_census
            FROM(
                SELECT nuts3, cell_id, characteristics_code,
                sum(DISTINCT(hh_5types))as orig
                FROM society.egon_destatis_zensus_household_per_ha_refined
                GROUP BY cell_id, characteristics_code, nuts3) AS t
            GROUP BY t.nuts3, t.characteristics_code ) AS census
        ON refined.nuts3 = census.nuts3
        AND refined.characteristics_code = census.characteristics_code
        """
    refinement = db.select_dataframe(sql=query)

    # Refinement must not change totals per NUTS-3 and household type.
    np.testing.assert_allclose(
        actual=refinement["sum_refined"],
        desired=refinement["sum_census"],
        rtol=rtol,
        verbose=False,
    )

    logger.info("All Aggregated household types match at NUTS-3.")
def cts_electricity_demand_share(rtol=1e-5):
    """Sanity check for dataset electricity_demand_timeseries :
    CtsBuildings

    Verifies that the aggregated CTS electricity demand shares sum to one
    per substation, since each substation profile is linearly disaggregated
    to all of its buildings.
    """
    with db.session_scope() as session:
        query = session.query(EgonCtsElectricityDemandBuildingShare)
        shares = pd.read_sql(
            query.statement, query.session.bind, index_col=None
        )

    # Per (bus, scenario), the building shares must add up to exactly one.
    share_sums = shares.groupby(["bus_id", "scenario"])["profile_share"].sum()

    np.testing.assert_allclose(
        actual=share_sums, desired=1, rtol=rtol, verbose=False
    )

    logger.info("The aggregated demand shares equal to one!.")
def cts_heat_demand_share(rtol=1e-5):
    """Sanity check for dataset electricity_demand_timeseries
    : CtsBuildings

    Verifies that the aggregated CTS heat demand shares sum to one per
    substation, since each substation profile is linearly disaggregated
    to all of its buildings.
    """
    with db.session_scope() as session:
        query = session.query(EgonCtsHeatDemandBuildingShare)
        shares = pd.read_sql(
            query.statement, query.session.bind, index_col=None
        )

    # Per (bus, scenario), the building shares must add up to exactly one.
    share_sums = shares.groupby(["bus_id", "scenario"])["profile_share"].sum()

    np.testing.assert_allclose(
        actual=share_sums, desired=1, rtol=rtol, verbose=False
    )

    logger.info("The aggregated demand shares equal to one!.")
def sanitycheck_pv_rooftop_buildings():
    """Sanity check for the allocation of rooftop PV units to buildings.

    Asserts that every allocated PV unit maps to a known building, writes
    distribution plots per scenario, and checks that the total allocated
    capacity matches the scenario target within 1 % relative tolerance.
    """

    def egon_power_plants_pv_roof_building():
        # All rooftop PV units allocated to buildings.
        sql = """
        SELECT *
        FROM supply.egon_power_plants_pv_roof_building
        """
        return db.select_dataframe(sql, index_col="index")

    pv_roof_df = egon_power_plants_pv_roof_building()

    valid_buildings_gdf = load_building_data()

    # max_cap: per-building capacity ceiling derived from building area.
    valid_buildings_gdf = valid_buildings_gdf.assign(
        bus_id=valid_buildings_gdf.bus_id.astype(int),
        overlay_id=valid_buildings_gdf.overlay_id.astype(int),
        max_cap=valid_buildings_gdf.building_area.multiply(
            ROOF_FACTOR * PV_CAP_PER_SQ_M
        ),
    )

    # Left-join on building_id: a NaN building_area after the merge means a
    # PV unit references a building that does not exist in the valid set.
    merge_df = pv_roof_df.merge(
        valid_buildings_gdf[["building_area"]],
        how="left",
        left_on="building_id",
        right_index=True,
    )

    assert (
        len(merge_df.loc[merge_df.building_area.isna()]) == 0
    ), f"{len(merge_df.loc[merge_df.building_area.isna()])} != 0"

    scenarios = ["status_quo", "eGon2035"]

    # Plots are written to <egon.data package dir>/sanity_checks/.
    base_path = Path(egon.data.__path__[0]).resolve()

    res_dir = base_path / "sanity_checks"

    res_dir.mkdir(parents=True, exist_ok=True)

    for scenario in scenarios:
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 8))

        scenario_df = merge_df.loc[merge_df.scenario == scenario]

        logger.info(
            scenario + " Capacity:\n" + str(scenario_df.capacity.describe())
        )

        # Only plot small units (capacity < 100 — presumably kW, TODO confirm
        # unit against the supply table) to keep the histogram readable.
        small_gens_df = scenario_df.loc[scenario_df.capacity < 100]

        sns.histplot(data=small_gens_df, x="capacity", ax=ax1).set_title(
            scenario
        )

        sns.scatterplot(
            data=small_gens_df, x="capacity", y="building_area", ax=ax2
        ).set_title(scenario)

        plt.tight_layout()

        plt.savefig(
            res_dir / f"{scenario}_pv_rooftop_distribution.png",
            bbox_inches="tight",
        )

    for scenario in SCENARIOS:
        if scenario == "eGon2035":
            # Allocated total must match the eGon2035 scenario data.
            assert isclose(
                scenario_data(scenario=scenario).capacity.sum(),
                merge_df.loc[merge_df.scenario == scenario].capacity.sum(),
                rel_tol=1e-02,
            ), (
                f"{scenario_data(scenario=scenario).capacity.sum()} != "
                f"{merge_df.loc[merge_df.scenario == scenario].capacity.sum()}"
            )
        elif scenario == "eGon100RE":
            sources = config.datasets()["solar_rooftop"]["sources"]

            # Target capacity from the scenario capacities table.
            target = db.select_dataframe(
                f"""
                SELECT capacity
                FROM {sources['scenario_capacities']['schema']}.
                {sources['scenario_capacities']['table']} a
                WHERE carrier = 'solar_rooftop'
                AND scenario_name = '{scenario}'
                """
            ).capacity[0]

            dataset = config.settings()["egon-data"]["--dataset-boundary"]
            if dataset == "Schleswig-Holstein":
                # In testmode, scale the national target down by the share
                # Schleswig-Holstein holds of the national 2035 rooftop PV
                # capacity taken from the NEP 2035 data bundle.
                sources = config.datasets()["scenario_input"]["sources"]

                path = Path(
                    f"./data_bundle_egon_data/nep2035_version2021/"
                    f"{sources['eGon2035']['capacities']}"
                ).resolve()

                total_2035 = (
                    pd.read_excel(
                        path,
                        sheet_name="1.Entwurf_NEP2035_V2021",
                        index_col="Unnamed: 0",
                    ).at["PV (Aufdach)", "Summe"]
                    * 1000
                )
                sh_2035 = scenario_data(scenario="eGon2035").capacity.sum()

                share = sh_2035 / total_2035

                target *= share

            assert isclose(
                target,
                merge_df.loc[merge_df.scenario == scenario].capacity.sum(),
                rel_tol=1e-02,
            ), (
                f"{target} != "
                f"{merge_df.loc[merge_df.scenario == scenario].capacity.sum()}"
            )
        else:
            raise ValueError(f"Scenario {scenario} is not valid.")
[docs]def sanitycheck_emobility_mit():
"""Execute sanity checks for eMobility: motorized individual travel
Checks data integrity for eGon2035, eGon2035_lowflex and eGon100RE scenario
using assertions:
1. Allocated EV numbers and EVs allocated to grid districts
2. Trip data (original inout data from simBEV)
3. Model data in eTraGo PF tables (grid.egon_etrago_*)
Parameters
----------
None
Returns
-------
None
"""
def check_ev_allocation():
# Get target number for scenario
ev_count_target = scenario_variation_parameters["ev_count"]
print(f" Target count: {str(ev_count_target)}")
# Get allocated numbers
ev_counts_dict = {}
with db.session_scope() as session:
for table, level in zip(
[
EgonEvCountMvGridDistrict,
EgonEvCountMunicipality,
EgonEvCountRegistrationDistrict,
],
["Grid District", "Municipality", "Registration District"],
):
query = session.query(
func.sum(
table.bev_mini
+ table.bev_medium
+ table.bev_luxury
+ table.phev_mini
+ table.phev_medium
+ table.phev_luxury
).label("ev_count")
).filter(
table.scenario == scenario_name,
table.scenario_variation == scenario_var_name,
)
ev_counts = pd.read_sql(
query.statement, query.session.bind, index_col=None
)
ev_counts_dict[level] = ev_counts.iloc[0].ev_count
print(
f" Count table: Total count for level {level} "
f"(table: {table.__table__}): "
f"{str(ev_counts_dict[level])}"
)
# Compare with scenario target (only if not in testmode)
if TESTMODE_OFF:
for level, count in ev_counts_dict.items():
np.testing.assert_allclose(
count,
ev_count_target,
rtol=0.0001,
err_msg=f"EV numbers in {level} seems to be flawed.",
)
else:
print(" Testmode is on, skipping sanity check...")
# Get allocated EVs in grid districts
with db.session_scope() as session:
query = session.query(
func.count(EgonEvMvGridDistrict.egon_ev_pool_ev_id).label(
"ev_count"
),
).filter(
EgonEvMvGridDistrict.scenario == scenario_name,
EgonEvMvGridDistrict.scenario_variation == scenario_var_name,
)
ev_count_alloc = (
pd.read_sql(query.statement, query.session.bind, index_col=None)
.iloc[0]
.ev_count
)
print(
f" EVs allocated to Grid Districts "
f"(table: {EgonEvMvGridDistrict.__table__}) total count: "
f"{str(ev_count_alloc)}"
)
# Compare with scenario target (only if not in testmode)
if TESTMODE_OFF:
np.testing.assert_allclose(
ev_count_alloc,
ev_count_target,
rtol=0.0001,
err_msg=(
"EV numbers allocated to Grid Districts seems to be "
"flawed."
),
)
else:
print(" Testmode is on, skipping sanity check...")
return ev_count_alloc
def check_trip_data():
# Check if trips start at timestep 0 and have a max. of 35040 steps
# (8760h in 15min steps)
print(" Checking timeranges...")
with db.session_scope() as session:
query = session.query(
func.count(EgonEvTrip.event_id).label("cnt")
).filter(
or_(
and_(
EgonEvTrip.park_start > 0,
EgonEvTrip.simbev_event_id == 0,
),
EgonEvTrip.park_end
> (60 / int(meta_run_config.stepsize)) * 8760,
),
EgonEvTrip.scenario == scenario_name,
)
invalid_trips = pd.read_sql(
query.statement, query.session.bind, index_col=None
)
np.testing.assert_equal(
invalid_trips.iloc[0].cnt,
0,
err_msg=(
f"{str(invalid_trips.iloc[0].cnt)} trips in table "
f"{EgonEvTrip.__table__} have invalid timesteps."
),
)
# Check if charging demand can be covered by available charging energy
# while parking
print(" Compare charging demand with available power...")
with db.session_scope() as session:
query = session.query(
func.count(EgonEvTrip.event_id).label("cnt")
).filter(
func.round(
cast(
(EgonEvTrip.park_end - EgonEvTrip.park_start + 1)
* EgonEvTrip.charging_capacity_nominal
* (int(meta_run_config.stepsize) / 60),
Numeric,
),
3,
)
< cast(EgonEvTrip.charging_demand, Numeric),
EgonEvTrip.scenario == scenario_name,
)
invalid_trips = pd.read_sql(
query.statement, query.session.bind, index_col=None
)
np.testing.assert_equal(
invalid_trips.iloc[0].cnt,
0,
err_msg=(
f"In {str(invalid_trips.iloc[0].cnt)} trips (table: "
f"{EgonEvTrip.__table__}) the charging demand cannot be "
f"covered by available charging power."
),
)
def check_model_data():
# Check if model components were fully created
print(" Check if all model components were created...")
# Get MVGDs which got EV allocated
with db.session_scope() as session:
query = (
session.query(
EgonEvMvGridDistrict.bus_id,
)
.filter(
EgonEvMvGridDistrict.scenario == scenario_name,
EgonEvMvGridDistrict.scenario_variation
== scenario_var_name,
)
.group_by(EgonEvMvGridDistrict.bus_id)
)
mvgds_with_ev = (
pd.read_sql(query.statement, query.session.bind, index_col=None)
.bus_id.sort_values()
.to_list()
)
# Load model components
with db.session_scope() as session:
query = (
session.query(
EgonPfHvLink.bus0.label("mvgd_bus_id"),
EgonPfHvLoad.bus.label("emob_bus_id"),
EgonPfHvLoad.load_id.label("load_id"),
EgonPfHvStore.store_id.label("store_id"),
)
.select_from(EgonPfHvLoad, EgonPfHvStore)
.join(
EgonPfHvLoadTimeseries,
EgonPfHvLoadTimeseries.load_id == EgonPfHvLoad.load_id,
)
.join(
EgonPfHvStoreTimeseries,
EgonPfHvStoreTimeseries.store_id == EgonPfHvStore.store_id,
)
.filter(
EgonPfHvLoad.carrier == "land_transport_EV",
EgonPfHvLoad.scn_name == scenario_name,
EgonPfHvLoadTimeseries.scn_name == scenario_name,
EgonPfHvStore.carrier == "battery_storage",
EgonPfHvStore.scn_name == scenario_name,
EgonPfHvStoreTimeseries.scn_name == scenario_name,
EgonPfHvLink.scn_name == scenario_name,
EgonPfHvLink.bus1 == EgonPfHvLoad.bus,
EgonPfHvLink.bus1 == EgonPfHvStore.bus,
)
)
model_components = pd.read_sql(
query.statement, query.session.bind, index_col=None
)
# Check number of buses with model components connected
mvgd_buses_with_ev = model_components.loc[
model_components.mvgd_bus_id.isin(mvgds_with_ev)
]
np.testing.assert_equal(
len(mvgds_with_ev),
len(mvgd_buses_with_ev),
err_msg=(
f"Number of Grid Districts with connected model components "
f"({str(len(mvgd_buses_with_ev))} in tables egon_etrago_*) "
f"differ from number of Grid Districts that got EVs "
f"allocated ({len(mvgds_with_ev)} in table "
f"{EgonEvMvGridDistrict.__table__})."
),
)
# Check if all required components exist (if no id is NaN)
np.testing.assert_equal(
model_components.drop_duplicates().isna().any().any(),
False,
err_msg=(
f"Some components are missing (see True values): "
f"{model_components.drop_duplicates().isna().any()}"
),
)
# Get all model timeseries
print(" Loading model timeseries...")
# Get all model timeseries
model_ts_dict = {
"Load": {
"carrier": "land_transport_EV",
"table": EgonPfHvLoad,
"table_ts": EgonPfHvLoadTimeseries,
"column_id": "load_id",
"columns_ts": ["p_set"],
"ts": None,
},
"Link": {
"carrier": "BEV_charger",
"table": EgonPfHvLink,
"table_ts": EgonPfHvLinkTimeseries,
"column_id": "link_id",
"columns_ts": ["p_max_pu"],
"ts": None,
},
"Store": {
"carrier": "battery_storage",
"table": EgonPfHvStore,
"table_ts": EgonPfHvStoreTimeseries,
"column_id": "store_id",
"columns_ts": ["e_min_pu", "e_max_pu"],
"ts": None,
},
}
with db.session_scope() as session:
for node, attrs in model_ts_dict.items():
print(f" Loading {node} timeseries...")
subquery = (
session.query(getattr(attrs["table"], attrs["column_id"]))
.filter(attrs["table"].carrier == attrs["carrier"])
.filter(attrs["table"].scn_name == scenario_name)
.subquery()
)
cols = [
getattr(attrs["table_ts"], c) for c in attrs["columns_ts"]
]
query = session.query(
getattr(attrs["table_ts"], attrs["column_id"]), *cols
).filter(
getattr(attrs["table_ts"], attrs["column_id"]).in_(
subquery
),
attrs["table_ts"].scn_name == scenario_name,
)
attrs["ts"] = pd.read_sql(
query.statement,
query.session.bind,
index_col=attrs["column_id"],
)
# Check if all timeseries have 8760 steps
print(" Checking timeranges...")
for node, attrs in model_ts_dict.items():
for col in attrs["columns_ts"]:
ts = attrs["ts"]
invalid_ts = ts.loc[ts[col].apply(lambda _: len(_)) != 8760][
col
].apply(len)
np.testing.assert_equal(
len(invalid_ts),
0,
err_msg=(
f"{str(len(invalid_ts))} rows in timeseries do not "
f"have 8760 timesteps. Table: "
f"{attrs['table_ts'].__table__}, Column: {col}, IDs: "
f"{str(list(invalid_ts.index))}"
),
)
# Compare total energy demand in model with some approximate values
# (per EV: 14,000 km/a, 0.17 kWh/km)
print(" Checking energy demand in model...")
total_energy_model = (
model_ts_dict["Load"]["ts"].p_set.apply(lambda _: sum(_)).sum()
/ 1e6
)
print(f" Total energy amount in model: {total_energy_model} TWh")
total_energy_scenario_approx = ev_count_alloc * 14000 * 0.17 / 1e9
print(
f" Total approximated energy amount in scenario: "
f"{total_energy_scenario_approx} TWh"
)
np.testing.assert_allclose(
total_energy_model,
total_energy_scenario_approx,
rtol=0.1,
err_msg=(
"The total energy amount in the model deviates heavily "
"from the approximated value for current scenario."
),
)
# Compare total storage capacity
print(" Checking storage capacity...")
# Load storage capacities from model
with db.session_scope() as session:
query = session.query(
func.sum(EgonPfHvStore.e_nom).label("e_nom")
).filter(
EgonPfHvStore.scn_name == scenario_name,
EgonPfHvStore.carrier == "battery_storage",
)
storage_capacity_model = (
pd.read_sql(
query.statement, query.session.bind, index_col=None
).e_nom.sum()
/ 1e3
)
print(
f" Total storage capacity ({EgonPfHvStore.__table__}): "
f"{round(storage_capacity_model, 1)} GWh"
)
# Load occurences of each EV
with db.session_scope() as session:
query = (
session.query(
EgonEvMvGridDistrict.bus_id,
EgonEvPool.type,
func.count(EgonEvMvGridDistrict.egon_ev_pool_ev_id).label(
"count"
),
)
.join(
EgonEvPool,
EgonEvPool.ev_id
== EgonEvMvGridDistrict.egon_ev_pool_ev_id,
)
.filter(
EgonEvMvGridDistrict.scenario == scenario_name,
EgonEvMvGridDistrict.scenario_variation
== scenario_var_name,
EgonEvPool.scenario == scenario_name,
)
.group_by(EgonEvMvGridDistrict.bus_id, EgonEvPool.type)
)
count_per_ev_all = pd.read_sql(
query.statement, query.session.bind, index_col="bus_id"
)
count_per_ev_all["bat_cap"] = count_per_ev_all.type.map(
meta_tech_data.battery_capacity
)
count_per_ev_all["bat_cap_total_MWh"] = (
count_per_ev_all["count"] * count_per_ev_all.bat_cap / 1e3
)
storage_capacity_simbev = count_per_ev_all.bat_cap_total_MWh.div(
1e3
).sum()
print(
f" Total storage capacity (simBEV): "
f"{round(storage_capacity_simbev, 1)} GWh"
)
np.testing.assert_allclose(
storage_capacity_model,
storage_capacity_simbev,
rtol=0.01,
err_msg=(
"The total storage capacity in the model deviates heavily "
"from the input data provided by simBEV for current scenario."
),
)
# Check SoC storage constraint: e_min_pu < e_max_pu for all timesteps
print(" Validating SoC constraints...")
stores_with_invalid_soc = []
for idx, row in model_ts_dict["Store"]["ts"].iterrows():
ts = row[["e_min_pu", "e_max_pu"]]
x = np.array(ts.e_min_pu) > np.array(ts.e_max_pu)
if x.any():
stores_with_invalid_soc.append(idx)
np.testing.assert_equal(
len(stores_with_invalid_soc),
0,
err_msg=(
f"The store constraint e_min_pu < e_max_pu does not apply "
f"for some storages in {EgonPfHvStoreTimeseries.__table__}. "
f"Invalid store_ids: {stores_with_invalid_soc}"
),
)
def check_model_data_lowflex_eGon2035():
    """Compare the driving load (eGon2035) against the dumb charging
    load (eGon2035_lowflex): their cumulative sums must relate by the
    charging-point efficiency eta_cp.
    """
    # TODO: Add eGon100RE_lowflex
    print("")
    print("SCENARIO: eGon2035_lowflex")

    def _ev_load_profile(scn):
        # Fetch all land_transport_EV load timeseries for *scn* and
        # aggregate them into a single profile (sum over loads).
        with db.session_scope() as session:
            load_query = (
                session.query(
                    EgonPfHvLoad.load_id,
                    EgonPfHvLoadTimeseries.p_set,
                )
                .join(
                    EgonPfHvLoadTimeseries,
                    EgonPfHvLoadTimeseries.load_id == EgonPfHvLoad.load_id,
                )
                .filter(
                    EgonPfHvLoad.carrier == "land_transport_EV",
                    EgonPfHvLoad.scn_name == scn,
                    EgonPfHvLoadTimeseries.scn_name == scn,
                )
            )
            records = pd.read_sql(
                load_query.statement, load_query.session.bind, index_col=None
            )
        return np.array(records.p_set.to_list()).sum(axis=0)

    # Compare driving load and charging load
    print(" Loading eGon2035 model timeseries: driving load...")
    driving_load = _ev_load_profile("eGon2035")

    print(
        " Loading eGon2035_lowflex model timeseries: dumb charging "
        "load..."
    )
    charging_load = _ev_load_profile("eGon2035_lowflex")

    # Ratio of driving and charging load should be 0.9 due to charging
    # efficiency
    print(" Compare cumulative loads...")
    print(f" Driving load (eGon2035): {driving_load.sum() / 1e6} TWh")
    print(
        f" Dumb charging load (eGon2035_lowflex): "
        f"{charging_load.sum() / 1e6} TWh"
    )
    # Theoretical driving energy = charged energy * charging efficiency
    driving_load_theoretical = (
        float(meta_run_config.eta_cp) * charging_load.sum()
    )
    np.testing.assert_allclose(
        driving_load.sum(),
        driving_load_theoretical,
        rtol=0.01,
        err_msg=(
            f"The driving load (eGon2035) deviates by more than 1% "
            f"from the theoretical driving load calculated from charging "
            f"load (eGon2035_lowflex) with an efficiency of "
            f"{float(meta_run_config.eta_cp)}."
        ),
    )
print("=====================================================")
print("=== SANITY CHECKS FOR MOTORIZED INDIVIDUAL TRAVEL ===")
print("=====================================================")
for scenario_name in ["eGon2035", "eGon100RE"]:
scenario_var_name = DATASET_CFG["scenario"]["variation"][scenario_name]
print("")
print(f"SCENARIO: {scenario_name}, VARIATION: {scenario_var_name}")
# Load scenario params for scenario and scenario variation
scenario_variation_parameters = get_sector_parameters(
"mobility", scenario=scenario_name
)["motorized_individual_travel"][scenario_var_name]
# Load simBEV run config and tech data
meta_run_config = read_simbev_metadata_file(
scenario_name, "config"
).loc["basic"]
meta_tech_data = read_simbev_metadata_file(scenario_name, "tech_data")
print("")
print("Checking EV counts...")
ev_count_alloc = check_ev_allocation()
print("")
print("Checking trip data...")
check_trip_data()
print("")
print("Checking model data...")
check_model_data()
print("")
check_model_data_lowflex_eGon2035()
print("=====================================================")
def sanitycheck_home_batteries():
    """Verify that per-building home-battery data aggregated to MV grid
    districts matches the home-battery capacities allocated per district.
    """
    # Dataset configuration: constants, source and target tables
    dataset_cfg = config.datasets()["home_batteries"]
    constants = dataset_cfg["constants"]
    sources = dataset_cfg["sources"]
    targets = dataset_cfg["targets"]
    cbat_pbat_ratio = get_cbat_pbat_ratio()

    for scenario in constants["scenarios"]:
        # Home battery capacity per MV grid id; storage capacity is
        # derived from nominal power via the fixed c-rate ratio.
        grid_sql = f"""
        SELECT el_capacity as p_nom, bus_id FROM
        {sources["storage"]["schema"]}
        .{sources["storage"]["table"]}
        WHERE carrier = 'home_battery'
        AND scenario = '{scenario}'
        """
        per_grid_df = db.select_dataframe(grid_sql, index_col="bus_id")
        per_grid_df = per_grid_df.assign(
            capacity=per_grid_df.p_nom * cbat_pbat_ratio
        )

        # Per-building allocation for the same scenario
        buildings_sql = f"""
        SELECT * FROM
        {targets["home_batteries"]["schema"]}
        .{targets["home_batteries"]["table"]}
        WHERE scenario = '{scenario}'
        """
        per_building_df = db.select_dataframe(
            buildings_sql, index_col="index"
        )

        aggregated_df = (
            per_building_df[["bus_id", "p_nom", "capacity"]]
            .groupby("bus_id")
            .sum()
        )

        # Rounding to 6 decimals tolerates float noise from aggregation
        assert (per_grid_df.round(6) == aggregated_df.round(6)).all().all()
def sanity_check_gas_buses(scn):
    """Execute sanity checks for the gas buses in Germany

    Returns print statements as sanity checks for the CH4, H2_grid and
    H2_saltcavern buses.

    * For all of them, it is checked if they are not isolated.
    * For the grid buses, the deviation is calculated between the
      number of gas grid buses in the database and the original
      Scigrid_gas number of gas buses in Germany.

    Parameters
    ----------
    scn : str
        Name of the scenario
    """
    logger.info("BUSES")

    # Are gas buses isolated?
    # Maps each gas bus carrier to the link carrier that should attach
    # to it; a bus with no such link on either side is isolated.
    corresponding_carriers = {
        "eGon2035": {
            "CH4": "CH4",
            "H2_grid": "H2_feedin",
            "H2_saltcavern": "power_to_H2",
        },
        # "eGon100RE": {
        #     "CH4": "CH4",
        #     "H2_grid": "H2_retrofit",
        #     "H2_saltcavern": "H2_extension",
        # }
    }
    for key in corresponding_carriers[scn]:
        isolated_gas_buses = db.select_dataframe(
            f"""
            SELECT bus_id, carrier, country
            FROM grid.egon_etrago_bus
            WHERE scn_name = '{scn}'
            AND carrier = '{key}'
            AND country = 'DE'
            AND bus_id NOT IN
                (SELECT bus0
                FROM grid.egon_etrago_link
                WHERE scn_name = '{scn}'
                AND carrier = '{corresponding_carriers[scn][key]}')
            AND bus_id NOT IN
                (SELECT bus1
                FROM grid.egon_etrago_link
                WHERE scn_name = '{scn}'
                AND carrier = '{corresponding_carriers[scn][key]}')
            ;
            """,
            warning=False,
        )
        if not isolated_gas_buses.empty:
            logger.info(f"Isolated {key} buses:")
            logger.info(isolated_gas_buses)

    # Deviation of the gas grid buses number
    target_file = (
        Path(".") / "datasets" / "gas_data" / "data" / "IGGIELGN_Nodes.csv"
    )

    # Original SciGRID_gas nodes restricted to Germany (input side)
    grid_buses_list = pd.read_csv(
        target_file,
        delimiter=";",
        decimal=".",
        usecols=["country_code"],
    )
    grid_buses_list = grid_buses_list[
        grid_buses_list["country_code"].str.match("DE")
    ]
    input_grid_buses = len(grid_buses_list.index)

    for carrier in ["CH4", "H2_grid"]:
        output_grid_buses_df = db.select_dataframe(
            f"""
            SELECT bus_id
            FROM grid.egon_etrago_bus
            WHERE scn_name = '{scn}'
            AND country = 'DE'
            AND carrier = '{carrier}';
            """,
            warning=False,
        )
        output_grid_buses = len(output_grid_buses_df.index)

        # Relative deviation in whole percents (ratio rounded first)
        e_grid_buses = (
            round(
                (output_grid_buses - input_grid_buses) / input_grid_buses,
                2,
            )
            * 100
        )
        logger.info(f"Deviation {carrier} buses: {e_grid_buses} %")
def sanity_check_CH4_stores(scn):
    """Execute sanity checks for the CH4 stores in Germany

    Returns print statements as sanity checks for the CH4 stores
    capacity in Germany. The deviation is calculated between:

    * the sum of the capacities of the stores with carrier 'CH4'
      in the database (for one scenario) and
    * the sum of:
        * the capacity the gas grid allocated to CH4 (total capacity
          in eGon2035 and capacity reduced by the share of the grid
          allocated to H2 in eGon100RE)
        * the total capacity of the CH4 stores in Germany (source: GIE)

    Parameters
    ----------
    scn : str
        Name of the scenario
    """
    output_CH4_stores = db.select_dataframe(
        f"""SELECT SUM(e_nom::numeric) as e_nom_germany
                FROM grid.egon_etrago_store
                WHERE scn_name = '{scn}'
                AND carrier = 'CH4'
                AND bus IN
                    (SELECT bus_id
                    FROM grid.egon_etrago_bus
                    WHERE scn_name = '{scn}'
                    AND country = 'DE'
                    AND carrier = 'CH4');
            """,
        warning=False,
    )["e_nom_germany"].values[0]

    # Grid (line-pack) capacity allocated to CH4 in MWh
    if scn == "eGon2035":
        grid_cap = 130000
    elif scn == "eGon100RE":
        # Same base capacity as eGon2035, reduced by the pipeline share
        # retrofitted to H2. NOTE(review): the previous constant was
        # 13000 — a dropped zero that understated the grid capacity by
        # a factor of 10 compared to the eGon2035 branch above.
        grid_cap = 130000 * (
            1
            - get_sector_parameters("gas", "eGon100RE")[
                "retrofitted_CH4pipeline-to-H2pipeline_share"
            ]
        )

    stores_cap_D = 266424202  # MWh GIE https://www.gie.eu/transparency/databases/storage-database/

    input_CH4_stores = stores_cap_D + grid_cap

    e_CH4_stores = (
        round(
            (output_CH4_stores - input_CH4_stores) / input_CH4_stores,
            2,
        )
        * 100
    )
    logger.info(f"Deviation CH4 stores: {e_CH4_stores} %")
def sanity_check_H2_saltcavern_stores(scn):
    """Execute sanity checks for the H2 saltcavern stores in Germany

    Prints the deviation between the H2 saltcavern potential storage
    capacity (e_nom_max) summed over the database and the potential
    storage capacity recomputed from the raw inputs: the ratio of the
    areas of 500 m radius around substations in each german federal
    state times the estimated total hydrogen storage potential of that
    federal state (data from InSpEE-DS report).

    This test works also in test mode.

    Parameters
    ----------
    scn : str
        Name of the scenario
    """
    # Capacity written to the database (output side)
    output_H2_stores = db.select_dataframe(
        f"""SELECT SUM(e_nom_max::numeric) as e_nom_max_germany
                FROM grid.egon_etrago_store
                WHERE scn_name = '{scn}'
                AND carrier = 'H2_underground'
                AND bus IN
                    (SELECT bus_id
                    FROM grid.egon_etrago_bus
                    WHERE scn_name = '{scn}'
                    AND country = 'DE'
                    AND carrier = 'H2_saltcavern');
            """,
        warning=False,
    )["e_nom_max_germany"].values[0]

    # Capacity recomputed from the saltcavern potentials (input side)
    potentials = calculate_and_map_saltcavern_storage_potential()
    potentials["storage_potential"] = (
        potentials["area_fraction"] * potentials["potential"]
    )
    input_H2_stores = sum(potentials["storage_potential"].to_list())

    deviation = (
        round((output_H2_stores - input_H2_stores) / input_H2_stores, 2) * 100
    )
    logger.info(f"Deviation H2 saltcavern stores: {deviation} %")
def sanity_check_gas_one_port(scn):
    """Check connections of gas one-port components

    Verify that gas one-port component (loads, generators, stores) are
    all connected to a bus (of the right carrier) present in the data
    base. Return print statements if this is not the case.
    These sanity checks are not specific to Germany, they also include
    the neighbouring countries.

    Parameters
    ----------
    scn : str
        Name of the scenario
    """
    if scn == "eGon2035":
        # Loads
        ## CH4_for_industry Germany
        isolated_one_port_c = db.select_dataframe(
            f"""
            SELECT load_id, bus, carrier, scn_name
            FROM grid.egon_etrago_load
            WHERE scn_name = '{scn}'
            AND carrier = 'CH4_for_industry'
            AND bus NOT IN
                (SELECT bus_id
                FROM grid.egon_etrago_bus
                WHERE scn_name = '{scn}'
                AND country = 'DE'
                AND carrier = 'CH4')
            ;
            """,
            warning=False,
        )
        if not isolated_one_port_c.empty:
            logger.info("Isolated loads:")
            logger.info(isolated_one_port_c)

        ## CH4_for_industry abroad
        isolated_one_port_c = db.select_dataframe(
            f"""
            SELECT load_id, bus, carrier, scn_name
            FROM grid.egon_etrago_load
            WHERE scn_name = '{scn}'
            AND carrier = 'CH4'
            AND bus NOT IN
                (SELECT bus_id
                FROM grid.egon_etrago_bus
                WHERE scn_name = '{scn}'
                AND country != 'DE'
                AND carrier = 'CH4')
            ;
            """,
            warning=False,
        )
        if not isolated_one_port_c.empty:
            logger.info("Isolated loads:")
            logger.info(isolated_one_port_c)

        ## H2_for_industry
        # Valid buses are German H2_grid buses or AC buses abroad.
        isolated_one_port_c = db.select_dataframe(
            f"""
            SELECT load_id, bus, carrier, scn_name
            FROM grid.egon_etrago_load
            WHERE scn_name = '{scn}'
            AND carrier = 'H2_for_industry'
            AND (bus NOT IN
                (SELECT bus_id
                FROM grid.egon_etrago_bus
                WHERE scn_name = '{scn}'
                AND country = 'DE'
                AND carrier = 'H2_grid')
            AND bus NOT IN
                (SELECT bus_id
                FROM grid.egon_etrago_bus
                WHERE scn_name = '{scn}'
                AND country != 'DE'
                AND carrier = 'AC'))
            ;
            """,
            warning=False,
        )
        if not isolated_one_port_c.empty:
            logger.info("Isolated loads:")
            logger.info(isolated_one_port_c)

        # Generators
        # NOTE: a stray empty ';' statement was removed from this query.
        isolated_one_port_c = db.select_dataframe(
            f"""
            SELECT generator_id, bus, carrier, scn_name
            FROM grid.egon_etrago_generator
            WHERE scn_name = '{scn}'
            AND carrier = 'CH4'
            AND bus NOT IN
                (SELECT bus_id
                FROM grid.egon_etrago_bus
                WHERE scn_name = '{scn}'
                AND carrier = 'CH4');
            """,
            warning=False,
        )
        if not isolated_one_port_c.empty:
            logger.info("Isolated generators:")
            logger.info(isolated_one_port_c)

        # Stores
        ## CH4 and H2_underground
        # Maps bus carrier -> store carrier expected to sit on that bus
        corresponding_carriers = {
            "CH4": "CH4",
            "H2_saltcavern": "H2_underground",
        }
        for key in corresponding_carriers:
            isolated_one_port_c = db.select_dataframe(
                f"""
                SELECT store_id, bus, carrier, scn_name
                FROM grid.egon_etrago_store
                WHERE scn_name = '{scn}'
                AND carrier = '{corresponding_carriers[key]}'
                AND bus NOT IN
                    (SELECT bus_id
                    FROM grid.egon_etrago_bus
                    WHERE scn_name = '{scn}'
                    AND carrier = '{key}')
                ;
                """,
                warning=False,
            )
            if not isolated_one_port_c.empty:
                logger.info("Isolated stores:")
                logger.info(isolated_one_port_c)

        ## H2_overground
        # Valid buses are German H2_saltcavern or H2_grid buses.
        isolated_one_port_c = db.select_dataframe(
            f"""
            SELECT store_id, bus, carrier, scn_name
            FROM grid.egon_etrago_store
            WHERE scn_name = '{scn}'
            AND carrier = 'H2_overground'
            AND bus NOT IN
                (SELECT bus_id
                FROM grid.egon_etrago_bus
                WHERE scn_name = '{scn}'
                AND country = 'DE'
                AND carrier = 'H2_saltcavern')
            AND bus NOT IN
                (SELECT bus_id
                FROM grid.egon_etrago_bus
                WHERE scn_name = '{scn}'
                AND country = 'DE'
                AND carrier = 'H2_grid')
            ;
            """,
            warning=False,
        )
        if not isolated_one_port_c.empty:
            logger.info("Isolated stores:")
            logger.info(isolated_one_port_c)

    # elif scn == "eGon100RE":  # placeholder for eGon100RE checks
def sanity_check_CH4_grid(scn):
    """Execute sanity checks for the gas grid capacity in Germany

    Returns print statements as sanity checks for the CH4 links
    (pipelines) in Germany. The deviation is calculated between
    the sum of the power (p_nom) of all the CH4 pipelines in Germany
    for one scenario in the database and the sum of the powers of the
    imported pipelines.
    In eGon100RE, the sum is reduced by the share of the grid that is
    allocated to hydrogen (share calculated by PyPSA-eur-sec).
    This test works also in test mode.

    Parameters
    ----------
    scn : str
        Name of the scenario

    Returns
    -------
    p_nom_total : float
        Sum of the power (p_nom) of all the pipelines in Germany
    """
    grid_carrier = "CH4"
    # Total CH4 pipeline capacity in the database, pipelines fully
    # within Germany only (both endpoints on German CH4 buses)
    output_gas_grid = db.select_dataframe(
        f"""SELECT SUM(p_nom::numeric) as p_nom_germany
                FROM grid.egon_etrago_link
                WHERE scn_name = '{scn}'
                AND carrier = '{grid_carrier}'
                AND bus0 IN
                    (SELECT bus_id
                    FROM grid.egon_etrago_bus
                    WHERE scn_name = '{scn}'
                    AND country = 'DE'
                    AND carrier = '{grid_carrier}')
                AND bus1 IN
                    (SELECT bus_id
                    FROM grid.egon_etrago_bus
                    WHERE scn_name = '{scn}'
                    AND country = 'DE'
                    AND carrier = '{grid_carrier}')
            ;
            """,
        warning=False,
    )["p_nom_germany"].values[0]

    # Recompute the imported pipeline list as the gas_grid dataset does
    gas_nodes_list = define_gas_nodes_list()
    abroad_gas_nodes_list = define_gas_buses_abroad()
    gas_grid = define_gas_pipeline_list(gas_nodes_list, abroad_gas_nodes_list)
    gas_grid_germany = gas_grid[
        (gas_grid["country_0"] == "DE") & (gas_grid["country_1"] == "DE")
    ]
    p_nom_total = sum(gas_grid_germany["p_nom"].to_list())

    if scn == "eGon2035":
        input_gas_grid = p_nom_total
    elif scn == "eGon100RE":
        # Reduce by the pipeline share retrofitted to H2
        input_gas_grid = p_nom_total * (
            1
            - get_sector_parameters("gas", "eGon100RE")[
                "retrofitted_CH4pipeline-to-H2pipeline_share"
            ]
        )

    e_gas_grid = (
        round(
            (output_gas_grid - input_gas_grid) / input_gas_grid,
            2,
        )
        * 100
    )
    logger.info(f"Deviation of the capacity of the CH4 grid: {e_gas_grid} %")

    return p_nom_total
def sanity_check_gas_links(scn):
    """Check connections of gas links

    Verify that gas links are all connected to buses present in the
    database and print any offending links. This sanity check is not
    specific to Germany; the neighbouring countries are included too.

    Parameters
    ----------
    scn : str
        Name of the scenario
    """
    gas_link_carriers = (
        "CH4",
        "H2_feedin",
        "H2_to_CH4",
        "CH4_to_H2",
        "H2_to_power",
        "power_to_H2",
        "OCGT",
        "central_gas_boiler",
        "central_gas_CHP",
        "central_gas_CHP_heat",
        "industrial_gas_CHP",
    )
    for carrier in gas_link_carriers:
        # Links whose bus0 or bus1 is missing from the bus table
        link_with_missing_bus = db.select_dataframe(
            f"""
            SELECT link_id, bus0, bus1, carrier, scn_name
            FROM grid.egon_etrago_link
            WHERE scn_name = '{scn}'
            AND carrier = '{carrier}'
            AND (bus0 NOT IN
                (SELECT bus_id
                FROM grid.egon_etrago_bus
                WHERE scn_name = '{scn}')
            OR bus1 NOT IN
                (SELECT bus_id
                FROM grid.egon_etrago_bus
                WHERE scn_name = '{scn}'))
            ;
            """,
            warning=False,
        )

        if not link_with_missing_bus.empty:
            logger.info("Links with missing bus:")
            logger.info(link_with_missing_bus)
def etrago_eGon2035_gas_DE():
    """Execute basic sanity checks for the gas sector in eGon2035

    Returns print statements as sanity checks for the gas sector in
    the eGon2035 scenario for the following components in Germany:

    * Buses: with the function :py:func:`sanity_check_gas_buses`
    * Loads: for the carriers 'CH4_for_industry' and 'H2_for_industry'
      the deviation is calculated between the sum of the loads in the
      database and the sum the loads in the sources document
      (opendata.ffe database)
    * Generators: the deviation is calculated between the sums of the
      nominal powers of the gas generators in the database and of
      the ones in the sources document (Biogaspartner Einspeiseatlas
      Deutschland from the dena and Productions from the SciGRID_gas
      data)
    * Stores: deviations for stores with following carriers are
      calculated:

      * 'CH4': with the function :py:func:`sanity_check_CH4_stores`
      * 'H2_underground': with the function
        :py:func:`sanity_check_H2_saltcavern_stores`
    * One-port components (loads, generators, stores): verification
      that they are all connected to a bus present in the data base
      with the function :py:func:`sanity_check_gas_one_port`
    * Links: verification:

      * that the gas links are all connected to buses present in
        the data base with the function :py:func:`sanity_check_gas_links`
      * of the capacity of the gas grid with the function
        :py:func:`sanity_check_CH4_grid`
    """
    scn = "eGon2035"

    # In test mode only a spatial subset is processed, so the
    # nationwide comparisons below would be meaningless.
    if TESTMODE_OFF:
        logger.info(f"Gas sanity checks for scenario {scn}")

        # Buses
        sanity_check_gas_buses(scn)

        # Loads
        logger.info("LOADS")

        path = Path(".") / "datasets" / "gas_data" / "demand"
        corr_file = path / "region_corr.json"
        # Mapping of FFE region ids to short region names, used below to
        # restrict the source demand data to Germany via the NUTS0 prefix
        df_corr = pd.read_json(corr_file)
        df_corr = df_corr.loc[:, ["id_region", "name_short"]]
        df_corr.set_index("id_region", inplace=True)

        for carrier in ["CH4_for_industry", "H2_for_industry"]:
            # Annual demand in the database in TWh: the p_set arrays are
            # unnested and summed over all German loads of this carrier
            output_gas_demand = db.select_dataframe(
                f"""SELECT (SUM(
                    (SELECT SUM(p)
                    FROM UNNEST(b.p_set) p))/1000000)::numeric as load_twh
                        FROM grid.egon_etrago_load a
                        JOIN grid.egon_etrago_load_timeseries b
                        ON (a.load_id = b.load_id)
                        JOIN grid.egon_etrago_bus c
                        ON (a.bus=c.bus_id)
                        AND b.scn_name = '{scn}'
                        AND a.scn_name = '{scn}'
                        AND c.scn_name = '{scn}'
                        AND c.country = 'DE'
                        AND a.carrier = '{carrier}';
                """,
                warning=False,
            )["load_twh"].values[0]

            # Demand from the source document (opendata.ffe), joined with
            # the region mapping, filtered to Germany, converted to TWh
            input_gas_demand = pd.read_json(
                path / (carrier + "_eGon2035.json")
            )
            input_gas_demand = input_gas_demand.loc[:, ["id_region", "value"]]
            input_gas_demand.set_index("id_region", inplace=True)
            input_gas_demand = pd.concat(
                [input_gas_demand, df_corr], axis=1, join="inner"
            )
            input_gas_demand["NUTS0"] = (input_gas_demand["name_short"].str)[
                0:2
            ]
            input_gas_demand = input_gas_demand[
                input_gas_demand["NUTS0"].str.match("DE")
            ]
            input_gas_demand = sum(input_gas_demand.value.to_list()) / 1000000

            # Deviation in percent (ratio rounded to 2 decimals first)
            e_demand = (
                round(
                    (output_gas_demand - input_gas_demand) / input_gas_demand,
                    2,
                )
                * 100
            )
            logger.info(f"Deviation {carrier}: {e_demand} %")

        # Generators
        logger.info("GENERATORS")
        carrier_generator = "CH4"

        # Installed CH4 generation capacity in the database, Germany only
        output_gas_generation = db.select_dataframe(
            f"""SELECT SUM(p_nom::numeric) as p_nom_germany
                    FROM grid.egon_etrago_generator
                    WHERE scn_name = '{scn}'
                    AND carrier = '{carrier_generator}'
                    AND bus IN
                        (SELECT bus_id
                        FROM grid.egon_etrago_bus
                        WHERE scn_name = '{scn}'
                        AND country = 'DE'
                        AND carrier = '{carrier_generator}');
                """,
            warning=False,
        )["p_nom_germany"].values[0]

        # Natural gas production capacity from SciGRID_gas (input side)
        target_file = (
            Path(".")
            / "datasets"
            / "gas_data"
            / "data"
            / "IGGIELGN_Productions.csv"
        )

        NG_generators_list = pd.read_csv(
            target_file,
            delimiter=";",
            decimal=".",
            usecols=["country_code", "param"],
        )

        NG_generators_list = NG_generators_list[
            NG_generators_list["country_code"].str.match("DE")
        ]

        p_NG = 0
        # The 'param' column holds a dict literal; accumulate the daily
        # maximum supply of every German production site
        for index, row in NG_generators_list.iterrows():
            param = ast.literal_eval(row["param"])
            p_NG = p_NG + param["max_supply_M_m3_per_d"]
        conversion_factor = 437.5  # MCM/day to MWh/h
        p_NG = p_NG * conversion_factor

        # Biogas feed-in capacity from the Biogaspartner Einspeiseatlas
        basename = "Biogaspartner_Einspeiseatlas_Deutschland_2021.xlsx"
        target_file = Path(".") / "datasets" / "gas_data" / basename

        conversion_factor_b = 0.01083  # m^3/h to MWh/h
        p_biogas = (
            pd.read_excel(
                target_file,
                usecols=["Einspeisung Biomethan [(N*m^3)/h)]"],
            )["Einspeisung Biomethan [(N*m^3)/h)]"].sum()
            * conversion_factor_b
        )

        input_gas_generation = p_NG + p_biogas
        e_generation = (
            round(
                (output_gas_generation - input_gas_generation)
                / input_gas_generation,
                2,
            )
            * 100
        )

        logger.info(
            f"Deviation {carrier_generator} generation: {e_generation} %"
        )

        # Stores
        logger.info("STORES")
        sanity_check_CH4_stores(scn)
        sanity_check_H2_saltcavern_stores(scn)

        # One-port components
        sanity_check_gas_one_port(scn)

        # Links
        logger.info("LINKS")
        sanity_check_CH4_grid(scn)
        sanity_check_gas_links(scn)

    else:
        print("Testmode is on, skipping sanity check.")
def etrago_eGon2035_gas_abroad():
    """Execute basic sanity checks for the gas sector in eGon2035 abroad

    Returns print statements as sanity checks for the gas sector in
    the eGon2035 scenario for the following components abroad:

    * Buses
    * Loads: for the carriers 'CH4' and 'H2_for_industry'
      the deviation is calculated between the sum of the loads in the
      database and the sum in the sources document (TYNDP)
    * Generators: the deviation is calculated between the sums of the
      nominal powers of the methane generators abroad in the database
      and of the ones in the sources document (TYNDP)
    * Stores: the deviation for methane stores abroad is calculated
      between the sum of the capacities in the data base and the one
      of the source document (SciGRID_gas data)
    * Links: verification of the capacity of the crossbordering gas
      grid pipelines.
    """
    scn = "eGon2035"

    # In test mode only a subset of the data is built, so the
    # comparisons below would be meaningless.
    if TESTMODE_OFF:
        logger.info(f"Gas sanity checks abroad for scenario {scn}")

        # Buses
        logger.info("BUSES")

        # Are gas buses isolated?
        # Maps bus carrier -> link carrier that must attach to it
        corresponding_carriers = {
            "eGon2035": {
                "CH4": "CH4",
            },
            # "eGon100RE": {
            #     "CH4": "CH4",
            #     "H2_grid": "H2_retrofit",
            # }
        }
        for key in corresponding_carriers[scn]:
            isolated_gas_buses_abroad = db.select_dataframe(
                f"""
                SELECT bus_id, carrier, country
                FROM grid.egon_etrago_bus
                WHERE scn_name = '{scn}'
                AND carrier = '{key}'
                AND country != 'DE'
                AND bus_id NOT IN
                    (SELECT bus0
                    FROM grid.egon_etrago_link
                    WHERE scn_name = '{scn}'
                    AND carrier = '{corresponding_carriers[scn][key]}')
                AND bus_id NOT IN
                    (SELECT bus1
                    FROM grid.egon_etrago_link
                    WHERE scn_name = '{scn}'
                    AND carrier = '{corresponding_carriers[scn][key]}')
                ;
                """,
                warning=False,
            )
            if not isolated_gas_buses_abroad.empty:
                logger.info(f"Isolated {key} buses abroad:")
                logger.info(isolated_gas_buses_abroad)

        # Loads
        logger.info("LOADS")
        # Global CH4 demand abroad from the TYNDP-based input data
        (
            Norway_global_demand_1y,
            normalized_ch4_demandTS,
        ) = import_ch4_demandTS()
        input_CH4_demand_abroad = calc_global_ch4_demand(
            Norway_global_demand_1y
        )
        input_CH4_demand = input_CH4_demand_abroad["GlobD_2035"].sum()

        ## CH4
        # Annual CH4 demand abroad in the database in MWh (p_set arrays
        # unnested and summed over all non-German loads)
        output_CH4_demand = db.select_dataframe(
            f"""SELECT (SUM(
                (SELECT SUM(p)
                FROM UNNEST(b.p_set) p)))::numeric as load_mwh
                    FROM grid.egon_etrago_load a
                    JOIN grid.egon_etrago_load_timeseries b
                    ON (a.load_id = b.load_id)
                    JOIN grid.egon_etrago_bus c
                    ON (a.bus=c.bus_id)
                    AND b.scn_name = '{scn}'
                    AND a.scn_name = '{scn}'
                    AND c.scn_name = '{scn}'
                    AND c.country != 'DE'
                    AND a.carrier = 'CH4';
            """,
            warning=False,
        )["load_mwh"].values[0]

        e_demand_CH4 = (
            round(
                (output_CH4_demand - input_CH4_demand) / input_CH4_demand,
                2,
            )
            * 100
        )
        logger.info(f"Deviation CH4 load: {e_demand_CH4} %")

        ## H2_for_industry
        input_power_to_h2_demand_abroad = calc_global_power_to_h2_demand()
        input_H2_demand = input_power_to_h2_demand_abroad["GlobD_2035"].sum()

        # H2_for_industry loads abroad sit on AC buses (constant p_set)
        output_H2_demand = db.select_dataframe(
            f"""SELECT SUM(p_set::numeric) as p_set_abroad
                    FROM grid.egon_etrago_load
                    WHERE scn_name = '{scn}'
                    AND carrier = 'H2_for_industry'
                    AND bus IN
                        (SELECT bus_id
                        FROM grid.egon_etrago_bus
                        WHERE scn_name = '{scn}'
                        AND country != 'DE'
                        AND carrier = 'AC');
            """,
            warning=False,
        )["p_set_abroad"].values[0]

        e_demand_H2 = (
            round(
                (output_H2_demand - input_H2_demand) / input_H2_demand,
                2,
            )
            * 100
        )
        logger.info(f"Deviation H2_for_industry load: {e_demand_H2} %")

        # Generators
        logger.info("GENERATORS ")
        CH4_gen = calc_capacities()
        input_CH4_gen = CH4_gen["cap_2035"].sum()

        output_CH4_gen = db.select_dataframe(
            f"""SELECT SUM(p_nom::numeric) as p_nom_abroad
                    FROM grid.egon_etrago_generator
                    WHERE scn_name = '{scn}'
                    AND carrier = 'CH4'
                    AND bus IN
                        (SELECT bus_id
                        FROM grid.egon_etrago_bus
                        WHERE scn_name = '{scn}'
                        AND country != 'DE'
                        AND carrier = 'CH4');
            """,
            warning=False,
        )["p_nom_abroad"].values[0]

        e_gen = (
            round(
                (output_CH4_gen - input_CH4_gen) / input_CH4_gen,
                2,
            )
            * 100
        )
        logger.info(f"Deviation CH4 generators: {e_gen} %")

        # Stores
        logger.info("STORES")
        ch4_input_capacities = calc_ch4_storage_capacities()
        input_CH4_stores = ch4_input_capacities["e_nom"].sum()

        output_CH4_stores = db.select_dataframe(
            f"""SELECT SUM(e_nom::numeric) as e_nom_abroad
                    FROM grid.egon_etrago_store
                    WHERE scn_name = '{scn}'
                    AND carrier = 'CH4'
                    AND bus IN
                        (SELECT bus_id
                        FROM grid.egon_etrago_bus
                        WHERE scn_name = '{scn}'
                        AND country != 'DE'
                        AND carrier = 'CH4');
            """,
            warning=False,
        )["e_nom_abroad"].values[0]

        e_stores = (
            round(
                (output_CH4_stores - input_CH4_stores) / input_CH4_stores,
                2,
            )
            * 100
        )
        logger.info(f"Deviation CH4 stores: {e_stores} %")

        # Links
        logger.info("LINKS")
        ch4_grid_input_capacities = calculate_ch4_grid_capacities()
        input_CH4_grid = ch4_grid_input_capacities["p_nom"].sum()

        # Crossbordering pipelines: at least one endpoint abroad
        grid_carrier = "CH4"
        output_gas_grid = db.select_dataframe(
            f"""SELECT SUM(p_nom::numeric) as p_nom
                    FROM grid.egon_etrago_link
                    WHERE scn_name = '{scn}'
                    AND carrier = '{grid_carrier}'
                    AND (bus0 IN
                        (SELECT bus_id
                        FROM grid.egon_etrago_bus
                        WHERE scn_name = '{scn}'
                        AND country != 'DE'
                        AND carrier = '{grid_carrier}')
                    OR bus1 IN
                        (SELECT bus_id
                        FROM grid.egon_etrago_bus
                        WHERE scn_name = '{scn}'
                        AND country != 'DE'
                        AND carrier = '{grid_carrier}'))
                ;
            """,
            warning=False,
        )["p_nom"].values[0]

        e_gas_grid = (
            round(
                (output_gas_grid - input_CH4_grid) / input_CH4_grid,
                2,
            )
            * 100
        )
        logger.info(
            f"Deviation of the capacity of the crossbordering CH4 grid: {e_gas_grid} %"
        )
    else:
        print("Testmode is on, skipping sanity check.")
def sanitycheck_dsm():
    """Check that aggregated DSM link and store timeseries in the grid
    tables equal the sum of the individual DSM component timeseries they
    were built from, for both scenarios.
    """

    def df_from_series(s: pd.Series) -> pd.DataFrame:
        # Expand a series of list-valued entries into a DataFrame with
        # one column per series index (timesteps as rows).
        return pd.DataFrame.from_dict(dict(zip(s.index, s.values)))

    for scenario in ["eGon2035", "eGon100RE"]:
        # p_min and p_max
        sql = f"""
        SELECT link_id, bus0 as bus, p_nom FROM grid.egon_etrago_link
        WHERE carrier = 'dsm'
        AND scn_name = '{scenario}'
        ORDER BY link_id
        """
        meta_df = db.select_dataframe(sql, index_col="link_id")
        # Comma-separated link ids for the IN (...) clause below
        link_ids = str(meta_df.index.tolist())[1:-1]

        sql = f"""
        SELECT link_id, p_min_pu, p_max_pu
        FROM grid.egon_etrago_link_timeseries
        WHERE scn_name = '{scenario}'
        AND link_id IN ({link_ids})
        ORDER BY link_id
        """
        ts_df = db.select_dataframe(sql, index_col="link_id")

        # Absolute power limits: per-unit profiles scaled by p_nom,
        # columns relabelled to the bus each link is attached to
        # (both frames are ORDER BY link_id, so positions align)
        p_max_df = df_from_series(ts_df.p_max_pu).mul(meta_df.p_nom)
        p_min_df = df_from_series(ts_df.p_min_pu).mul(meta_df.p_nom)

        p_max_df.columns = meta_df.bus.tolist()
        p_min_df.columns = meta_df.bus.tolist()

        targets = config.datasets()["DSM_CTS_industry"]["targets"]

        tables = [
            "cts_loadcurves_dsm",
            "ind_osm_loadcurves_individual_dsm",
            "demandregio_ind_sites_dsm",
            "ind_sites_loadcurves_individual",
        ]

        df_list = []

        # Collect the individual (pre-aggregation) DSM timeseries from
        # all contributing target tables
        for table in tables:
            target = targets[table]
            sql = f"""
            SELECT bus, p_min, p_max, e_max, e_min
            FROM {target["schema"]}.{target["table"]}
            WHERE scn_name = '{scenario}'
            ORDER BY bus
            """
            df_list.append(db.select_dataframe(sql))

        individual_ts_df = pd.concat(df_list, ignore_index=True)

        # Row positions of the individual timeseries grouped per bus
        groups = individual_ts_df[["bus"]].reset_index().groupby("bus").groups

        individual_p_max_df = df_from_series(individual_ts_df.p_max)

        # Sum the individual profiles per bus (one column per bus)
        individual_p_max_df = pd.DataFrame(
            [
                individual_p_max_df[idxs].sum(axis=1)
                for idxs in groups.values()
            ],
            index=groups.keys(),
        ).T

        individual_p_min_df = df_from_series(individual_ts_df.p_min)

        individual_p_min_df = pd.DataFrame(
            [
                individual_p_min_df[idxs].sum(axis=1)
                for idxs in groups.values()
            ],
            index=groups.keys(),
        ).T

        # due to the fact that time series are clipped at zero (either
        # direction) there is a little difference between the sum of the
        # individual time series and the aggregated time series as the
        # second is generated independent of the others. This makes
        # atol=1e-01 necessary.
        atol = 1e-01

        assert np.allclose(p_max_df, individual_p_max_df, atol=atol)
        assert np.allclose(p_min_df, individual_p_min_df, atol=atol)

        # e_min and e_max
        sql = f"""
        SELECT store_id, bus, e_nom FROM grid.egon_etrago_store
        WHERE carrier = 'dsm'
        AND scn_name = '{scenario}'
        ORDER BY store_id
        """
        meta_df = db.select_dataframe(sql, index_col="store_id")

        store_ids = str(meta_df.index.tolist())[1:-1]

        sql = f"""
        SELECT store_id, e_min_pu, e_max_pu
        FROM grid.egon_etrago_store_timeseries
        WHERE scn_name = '{scenario}'
        AND store_id IN ({store_ids})
        ORDER BY store_id
        """
        ts_df = db.select_dataframe(sql, index_col="store_id")

        # Absolute energy limits scaled by e_nom, labelled by bus
        e_max_df = df_from_series(ts_df.e_max_pu).mul(meta_df.e_nom)
        e_min_df = df_from_series(ts_df.e_min_pu).mul(meta_df.e_nom)

        e_max_df.columns = meta_df.bus.tolist()
        e_min_df.columns = meta_df.bus.tolist()

        individual_e_max_df = df_from_series(individual_ts_df.e_max)

        individual_e_max_df = pd.DataFrame(
            [
                individual_e_max_df[idxs].sum(axis=1)
                for idxs in groups.values()
            ],
            index=groups.keys(),
        ).T
        individual_e_min_df = df_from_series(individual_ts_df.e_min)

        individual_e_min_df = pd.DataFrame(
            [
                individual_e_min_df[idxs].sum(axis=1)
                for idxs in groups.values()
            ],
            index=groups.keys(),
        ).T

        assert np.allclose(e_max_df, individual_e_max_df)
        assert np.allclose(e_min_df, individual_e_min_df)