Source code for egon.data.datasets.sanity_checks

"""
This module does sanity checks for both the eGon2035 and the eGon100RE scenario
separately where a percentage error is given to showcase difference in output
and input values. Please note that there are missing input technologies in the
supply tables.
Authors: @ALonso, @dana, @nailend, @nesnoj, @khelfen
"""
from math import isclose
from pathlib import Path

from sqlalchemy import Numeric
from sqlalchemy.sql import and_, cast, func, or_
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

from egon.data import config, db, logger
from egon.data.datasets import Dataset
from egon.data.datasets.electricity_demand_timeseries.cts_buildings import (
    EgonCtsElectricityDemandBuildingShare,
    EgonCtsHeatDemandBuildingShare,
)
from egon.data.datasets.emobility.motorized_individual_travel.db_classes import (  # noqa: E501
    EgonEvCountMunicipality,
    EgonEvCountMvGridDistrict,
    EgonEvCountRegistrationDistrict,
    EgonEvMvGridDistrict,
    EgonEvPool,
    EgonEvTrip,
)
from egon.data.datasets.emobility.motorized_individual_travel.helpers import (
    DATASET_CFG,
    read_simbev_metadata_file,
)
from egon.data.datasets.etrago_setup import (
    EgonPfHvLink,
    EgonPfHvLinkTimeseries,
    EgonPfHvLoad,
    EgonPfHvLoadTimeseries,
    EgonPfHvStore,
    EgonPfHvStoreTimeseries,
)
from egon.data.datasets.power_plants.pv_rooftop_buildings import (
    PV_CAP_PER_SQ_M,
    ROOF_FACTOR,
    SCENARIOS,
    load_building_data,
    scenario_data,
)
from egon.data.datasets.scenario_parameters import get_sector_parameters
from egon.data.datasets.storages.home_batteries import get_cbat_pbat_ratio
import egon.data

TESTMODE_OFF = (
    config.settings()["egon-data"]["--dataset-boundary"] == "Everything"
)


[docs]class SanityChecks(Dataset): def __init__(self, dependencies): super().__init__( name="SanityChecks", version="0.0.7", dependencies=dependencies, tasks={ etrago_eGon2035_electricity, etrago_eGon2035_heat, residential_electricity_annual_sum, residential_electricity_hh_refinement, cts_electricity_demand_share, cts_heat_demand_share, sanitycheck_emobility_mit, sanitycheck_pv_rooftop_buildings, sanitycheck_home_batteries, sanitycheck_dsm, }, )
[docs]def etrago_eGon2035_electricity(): """Execute basic sanity checks. Returns print statements as sanity checks for the electricity sector in the eGon2035 scenario. Parameters ---------- None Returns ------- None """ scn = "eGon2035" # Section to check generator capacities logger.info(f"Sanity checks for scenario {scn}") logger.info( "For German electricity generators the following deviations between " "the inputs and outputs can be observed:" ) carriers_electricity = [ "others", "reservoir", "run_of_river", "oil", "wind_onshore", "wind_offshore", "solar", "solar_rooftop", "biomass", ] for carrier in carriers_electricity: if carrier == "biomass": sum_output = db.select_dataframe( """SELECT scn_name, SUM(p_nom::numeric) as output_capacity_mw FROM grid.egon_etrago_generator WHERE bus IN ( SELECT bus_id FROM grid.egon_etrago_bus WHERE scn_name = 'eGon2035' AND country = 'DE') AND carrier IN ('biomass', 'industrial_biomass_CHP', 'central_biomass_CHP') GROUP BY (scn_name); """, warning=False, ) else: sum_output = db.select_dataframe( f"""SELECT scn_name, SUM(p_nom::numeric) as output_capacity_mw FROM grid.egon_etrago_generator WHERE scn_name = '{scn}' AND carrier IN ('{carrier}') AND bus IN (SELECT bus_id FROM grid.egon_etrago_bus WHERE scn_name = 'eGon2035' AND country = 'DE') GROUP BY (scn_name); """, warning=False, ) sum_input = db.select_dataframe( f"""SELECT carrier, SUM(capacity::numeric) as input_capacity_mw FROM supply.egon_scenario_capacities WHERE carrier= '{carrier}' AND scenario_name ='{scn}' GROUP BY (carrier); """, warning=False, ) if ( sum_output.output_capacity_mw.sum() == 0 and sum_input.input_capacity_mw.sum() == 0 ): logger.info( f"No capacity for carrier '{carrier}' needed to be" f" distributed. Everything is fine" ) elif ( sum_input.input_capacity_mw.sum() > 0 and sum_output.output_capacity_mw.sum() == 0 ): logger.info( f"Error: Capacity for carrier '{carrier}' was not distributed " f"at all!" ) elif ( sum_output.output_capacity_mw.sum() > 0 and sum_input.input_capacity_mw.sum() == 0 ): logger.info( f"Error: Eventhough no input capacity was provided for carrier" f"'{carrier}' a capacity got distributed!" ) else: sum_input["error"] = ( (sum_output.output_capacity_mw - sum_input.input_capacity_mw) / sum_input.input_capacity_mw ) * 100 g = sum_input["error"].values[0] logger.info(f"{carrier}: " + str(round(g, 2)) + " %") # Section to check storage units logger.info(f"Sanity checks for scenario {scn}") logger.info( "For German electrical storage units the following deviations between" "the inputs and outputs can be observed:" ) carriers_electricity = ["pumped_hydro"] for carrier in carriers_electricity: sum_output = db.select_dataframe( f"""SELECT scn_name, SUM(p_nom::numeric) as output_capacity_mw FROM grid.egon_etrago_storage WHERE scn_name = '{scn}' AND carrier IN ('{carrier}') AND bus IN (SELECT bus_id FROM grid.egon_etrago_bus WHERE scn_name = 'eGon2035' AND country = 'DE') GROUP BY (scn_name); """, warning=False, ) sum_input = db.select_dataframe( f"""SELECT carrier, SUM(capacity::numeric) as input_capacity_mw FROM supply.egon_scenario_capacities WHERE carrier= '{carrier}' AND scenario_name ='{scn}' GROUP BY (carrier); """, warning=False, ) if ( sum_output.output_capacity_mw.sum() == 0 and sum_input.input_capacity_mw.sum() == 0 ): print( f"No capacity for carrier '{carrier}' needed to be " f"distributed. Everything is fine" ) elif ( sum_input.input_capacity_mw.sum() > 0 and sum_output.output_capacity_mw.sum() == 0 ): print( f"Error: Capacity for carrier '{carrier}' was not distributed" f" at all!" ) elif ( sum_output.output_capacity_mw.sum() > 0 and sum_input.input_capacity_mw.sum() == 0 ): print( f"Error: Eventhough no input capacity was provided for carrier" f" '{carrier}' a capacity got distributed!" ) else: sum_input["error"] = ( (sum_output.output_capacity_mw - sum_input.input_capacity_mw) / sum_input.input_capacity_mw ) * 100 g = sum_input["error"].values[0] print(f"{carrier}: " + str(round(g, 2)) + " %") # Section to check loads print( "For German electricity loads the following deviations between the" " input and output can be observed:" ) output_demand = db.select_dataframe( """SELECT a.scn_name, a.carrier, SUM((SELECT SUM(p) FROM UNNEST(b.p_set) p))/1000000::numeric as load_twh FROM grid.egon_etrago_load a JOIN grid.egon_etrago_load_timeseries b ON (a.load_id = b.load_id) JOIN grid.egon_etrago_bus c ON (a.bus=c.bus_id) AND b.scn_name = 'eGon2035' AND a.scn_name = 'eGon2035' AND a.carrier = 'AC' AND c.scn_name= 'eGon2035' AND c.country='DE' GROUP BY (a.scn_name, a.carrier); """, warning=False, )["load_twh"].values[0] input_cts_ind = db.select_dataframe( """SELECT scenario, SUM(demand::numeric/1000000) as demand_mw_regio_cts_ind FROM demand.egon_demandregio_cts_ind WHERE scenario= 'eGon2035' AND year IN ('2035') GROUP BY (scenario); """, warning=False, )["demand_mw_regio_cts_ind"].values[0] input_hh = db.select_dataframe( """SELECT scenario, SUM(demand::numeric/1000000) as demand_mw_regio_hh FROM demand.egon_demandregio_hh WHERE scenario= 'eGon2035' AND year IN ('2035') GROUP BY (scenario); """, warning=False, )["demand_mw_regio_hh"].values[0] input_demand = input_hh + input_cts_ind e = round((output_demand - input_demand) / input_demand, 2) * 100 print(f"electricity demand: {e} %")
[docs]def etrago_eGon2035_heat(): """Execute basic sanity checks. Returns print statements as sanity checks for the heat sector in the eGon2035 scenario. Parameters ---------- None Returns ------- None """ # Check input and output values for the carriers "others", # "reservoir", "run_of_river" and "oil" scn = "eGon2035" # Section to check generator capacities print(f"Sanity checks for scenario {scn}") print( "For German heat demands the following deviations between the inputs" " and outputs can be observed:" ) # Sanity checks for heat demand output_heat_demand = db.select_dataframe( """SELECT a.scn_name, (SUM( (SELECT SUM(p) FROM UNNEST(b.p_set) p))/1000000)::numeric as load_twh FROM grid.egon_etrago_load a JOIN grid.egon_etrago_load_timeseries b ON (a.load_id = b.load_id) JOIN grid.egon_etrago_bus c ON (a.bus=c.bus_id) AND b.scn_name = 'eGon2035' AND a.scn_name = 'eGon2035' AND c.scn_name= 'eGon2035' AND c.country='DE' AND a.carrier IN ('rural_heat', 'central_heat') GROUP BY (a.scn_name); """, warning=False, )["load_twh"].values[0] input_heat_demand = db.select_dataframe( """SELECT scenario, SUM(demand::numeric/1000000) as demand_mw_peta_heat FROM demand.egon_peta_heat WHERE scenario= 'eGon2035' GROUP BY (scenario); """, warning=False, )["demand_mw_peta_heat"].values[0] e_demand = ( round((output_heat_demand - input_heat_demand) / input_heat_demand, 2) * 100 ) logger.info(f"heat demand: {e_demand} %") # Sanity checks for heat supply logger.info( "For German heat supplies the following deviations between the inputs " "and outputs can be observed:" ) # Comparison for central heat pumps heat_pump_input = db.select_dataframe( """SELECT carrier, SUM(capacity::numeric) as Urban_central_heat_pump_mw FROM supply.egon_scenario_capacities WHERE carrier= 'urban_central_heat_pump' AND scenario_name IN ('eGon2035') GROUP BY (carrier); """, warning=False, )["urban_central_heat_pump_mw"].values[0] heat_pump_output = db.select_dataframe( """SELECT carrier, SUM(p_nom::numeric) as Central_heat_pump_mw FROM grid.egon_etrago_link WHERE carrier= 'central_heat_pump' AND scn_name IN ('eGon2035') GROUP BY (carrier); """, warning=False, )["central_heat_pump_mw"].values[0] e_heat_pump = ( round((heat_pump_output - heat_pump_input) / heat_pump_output, 2) * 100 ) logger.info(f"'central_heat_pump': {e_heat_pump} % ") # Comparison for residential heat pumps input_residential_heat_pump = db.select_dataframe( """SELECT carrier, SUM(capacity::numeric) as residential_heat_pump_mw FROM supply.egon_scenario_capacities WHERE carrier= 'residential_rural_heat_pump' AND scenario_name IN ('eGon2035') GROUP BY (carrier); """, warning=False, )["residential_heat_pump_mw"].values[0] output_residential_heat_pump = db.select_dataframe( """SELECT carrier, SUM(p_nom::numeric) as rural_heat_pump_mw FROM grid.egon_etrago_link WHERE carrier= 'rural_heat_pump' AND scn_name IN ('eGon2035') GROUP BY (carrier); """, warning=False, )["rural_heat_pump_mw"].values[0] e_residential_heat_pump = ( round( (output_residential_heat_pump - input_residential_heat_pump) / input_residential_heat_pump, 2, ) * 100 ) logger.info(f"'residential heat pumps': {e_residential_heat_pump} %") # Comparison for resistive heater resistive_heater_input = db.select_dataframe( """SELECT carrier, SUM(capacity::numeric) as Urban_central_resistive_heater_MW FROM supply.egon_scenario_capacities WHERE carrier= 'urban_central_resistive_heater' AND scenario_name IN ('eGon2035') GROUP BY (carrier); """, warning=False, )["urban_central_resistive_heater_mw"].values[0] resistive_heater_output = db.select_dataframe( """SELECT carrier, SUM(p_nom::numeric) as central_resistive_heater_MW FROM grid.egon_etrago_link WHERE carrier= 'central_resistive_heater' AND scn_name IN ('eGon2035') GROUP BY (carrier); """, warning=False, )["central_resistive_heater_mw"].values[0] e_resistive_heater = ( round( (resistive_heater_output - resistive_heater_input) / resistive_heater_input, 2, ) * 100 ) logger.info(f"'resistive heater': {e_resistive_heater} %") # Comparison for solar thermal collectors input_solar_thermal = db.select_dataframe( """SELECT carrier, SUM(capacity::numeric) as solar_thermal_collector_mw FROM supply.egon_scenario_capacities WHERE carrier= 'urban_central_solar_thermal_collector' AND scenario_name IN ('eGon2035') GROUP BY (carrier); """, warning=False, )["solar_thermal_collector_mw"].values[0] output_solar_thermal = db.select_dataframe( """SELECT carrier, SUM(p_nom::numeric) as solar_thermal_collector_mw FROM grid.egon_etrago_generator WHERE carrier= 'solar_thermal_collector' AND scn_name IN ('eGon2035') GROUP BY (carrier); """, warning=False, )["solar_thermal_collector_mw"].values[0] e_solar_thermal = ( round( (output_solar_thermal - input_solar_thermal) / input_solar_thermal, 2, ) * 100 ) logger.info(f"'solar thermal collector': {e_solar_thermal} %") # Comparison for geothermal input_geo_thermal = db.select_dataframe( """SELECT carrier, SUM(capacity::numeric) as Urban_central_geo_thermal_MW FROM supply.egon_scenario_capacities WHERE carrier= 'urban_central_geo_thermal' AND scenario_name IN ('eGon2035') GROUP BY (carrier); """, warning=False, )["urban_central_geo_thermal_mw"].values[0] output_geo_thermal = db.select_dataframe( """SELECT carrier, SUM(p_nom::numeric) as geo_thermal_MW FROM grid.egon_etrago_generator WHERE carrier= 'geo_thermal' AND scn_name IN ('eGon2035') GROUP BY (carrier); """, warning=False, )["geo_thermal_mw"].values[0] e_geo_thermal = ( round((output_geo_thermal - input_geo_thermal) / input_geo_thermal, 2) * 100 ) logger.info(f"'geothermal': {e_geo_thermal} %")
[docs]def residential_electricity_annual_sum(rtol=1e-5): """Sanity check for dataset electricity_demand_timeseries : Demand_Building_Assignment Aggregate the annual demand of all census cells at NUTS3 to compare with initial scaling parameters from DemandRegio. """ df_nuts3_annual_sum = db.select_dataframe( sql=""" SELECT dr.nuts3, dr.scenario, dr.demand_regio_sum, profiles.profile_sum FROM ( SELECT scenario, SUM(demand) AS profile_sum, vg250_nuts3 FROM demand.egon_demandregio_zensus_electricity AS egon, boundaries.egon_map_zensus_vg250 AS boundaries Where egon.zensus_population_id = boundaries.zensus_population_id AND sector = 'residential' GROUP BY vg250_nuts3, scenario ) AS profiles JOIN ( SELECT nuts3, scenario, sum(demand) AS demand_regio_sum FROM demand.egon_demandregio_hh GROUP BY year, scenario, nuts3 ) AS dr ON profiles.vg250_nuts3 = dr.nuts3 and profiles.scenario = dr.scenario """ ) np.testing.assert_allclose( actual=df_nuts3_annual_sum["profile_sum"], desired=df_nuts3_annual_sum["demand_regio_sum"], rtol=rtol, verbose=False, ) logger.info( "Aggregated annual residential electricity demand" " matches with DemandRegio at NUTS-3." )
[docs]def residential_electricity_hh_refinement(rtol=1e-5): """Sanity check for dataset electricity_demand_timeseries : Household Demands Check sum of aggregated household types after refinement method was applied and compare it to the original census values.""" df_refinement = db.select_dataframe( sql=""" SELECT refined.nuts3, refined.characteristics_code, refined.sum_refined::int, census.sum_census::int FROM( SELECT nuts3, characteristics_code, SUM(hh_10types) as sum_refined FROM society.egon_destatis_zensus_household_per_ha_refined GROUP BY nuts3, characteristics_code) AS refined JOIN( SELECT t.nuts3, t.characteristics_code, sum(orig) as sum_census FROM( SELECT nuts3, cell_id, characteristics_code, sum(DISTINCT(hh_5types))as orig FROM society.egon_destatis_zensus_household_per_ha_refined GROUP BY cell_id, characteristics_code, nuts3) AS t GROUP BY t.nuts3, t.characteristics_code ) AS census ON refined.nuts3 = census.nuts3 AND refined.characteristics_code = census.characteristics_code """ ) np.testing.assert_allclose( actual=df_refinement["sum_refined"], desired=df_refinement["sum_census"], rtol=rtol, verbose=False, ) logger.info("All Aggregated household types match at NUTS-3.")
[docs]def cts_electricity_demand_share(rtol=1e-5): """Sanity check for dataset electricity_demand_timeseries : CtsBuildings Check sum of aggregated cts electricity demand share which equals to one for every substation as the substation profile is linearly disaggregated to all buildings.""" with db.session_scope() as session: cells_query = session.query(EgonCtsElectricityDemandBuildingShare) df_demand_share = pd.read_sql( cells_query.statement, cells_query.session.bind, index_col=None ) np.testing.assert_allclose( actual=df_demand_share.groupby(["bus_id", "scenario"])[ "profile_share" ].sum(), desired=1, rtol=rtol, verbose=False, ) logger.info("The aggregated demand shares equal to one!.")
[docs]def cts_heat_demand_share(rtol=1e-5): """Sanity check for dataset electricity_demand_timeseries : CtsBuildings Check sum of aggregated cts heat demand share which equals to one for every substation as the substation profile is linearly disaggregated to all buildings.""" with db.session_scope() as session: cells_query = session.query(EgonCtsHeatDemandBuildingShare) df_demand_share = pd.read_sql( cells_query.statement, cells_query.session.bind, index_col=None ) np.testing.assert_allclose( actual=df_demand_share.groupby(["bus_id", "scenario"])[ "profile_share" ].sum(), desired=1, rtol=rtol, verbose=False, ) logger.info("The aggregated demand shares equal to one!.")
[docs]def sanitycheck_pv_rooftop_buildings(): def egon_power_plants_pv_roof_building(): sql = """ SELECT * FROM supply.egon_power_plants_pv_roof_building """ return db.select_dataframe(sql, index_col="index") pv_roof_df = egon_power_plants_pv_roof_building() valid_buildings_gdf = load_building_data() valid_buildings_gdf = valid_buildings_gdf.assign( bus_id=valid_buildings_gdf.bus_id.astype(int), overlay_id=valid_buildings_gdf.overlay_id.astype(int), max_cap=valid_buildings_gdf.building_area.multiply( ROOF_FACTOR * PV_CAP_PER_SQ_M ), ) merge_df = pv_roof_df.merge( valid_buildings_gdf[["building_area"]], how="left", left_on="building_id", right_index=True, ) assert ( len(merge_df.loc[merge_df.building_area.isna()]) == 0 ), f"{len(merge_df.loc[merge_df.building_area.isna()])} != 0" scenarios = ["status_quo", "eGon2035"] base_path = Path(egon.data.__path__[0]).resolve() res_dir = base_path / "sanity_checks" res_dir.mkdir(parents=True, exist_ok=True) for scenario in scenarios: fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 8)) scenario_df = merge_df.loc[merge_df.scenario == scenario] logger.info( scenario + " Capacity:\n" + str(scenario_df.capacity.describe()) ) small_gens_df = scenario_df.loc[scenario_df.capacity < 100] sns.histplot(data=small_gens_df, x="capacity", ax=ax1).set_title( scenario ) sns.scatterplot( data=small_gens_df, x="capacity", y="building_area", ax=ax2 ).set_title(scenario) plt.tight_layout() plt.savefig( res_dir / f"{scenario}_pv_rooftop_distribution.png", bbox_inches="tight", ) for scenario in SCENARIOS: if scenario == "eGon2035": assert isclose( scenario_data(scenario=scenario).capacity.sum(), merge_df.loc[merge_df.scenario == scenario].capacity.sum(), rel_tol=1e-02, ), ( f"{scenario_data(scenario=scenario).capacity.sum()} != " f"{merge_df.loc[merge_df.scenario == scenario].capacity.sum()}" ) elif scenario == "eGon100RE": sources = config.datasets()["solar_rooftop"]["sources"] target = db.select_dataframe( f""" SELECT capacity FROM {sources['scenario_capacities']['schema']}. {sources['scenario_capacities']['table']} a WHERE carrier = 'solar_rooftop' AND scenario_name = '{scenario}' """ ).capacity[0] dataset = config.settings()["egon-data"]["--dataset-boundary"] if dataset == "Schleswig-Holstein": sources = config.datasets()["scenario_input"]["sources"] path = Path( f"./data_bundle_egon_data/nep2035_version2021/" f"{sources['eGon2035']['capacities']}" ).resolve() total_2035 = ( pd.read_excel( path, sheet_name="1.Entwurf_NEP2035_V2021", index_col="Unnamed: 0", ).at["PV (Aufdach)", "Summe"] * 1000 ) sh_2035 = scenario_data(scenario="eGon2035").capacity.sum() share = sh_2035 / total_2035 target *= share assert isclose( target, merge_df.loc[merge_df.scenario == scenario].capacity.sum(), rel_tol=1e-02, ), ( f"{target} != " f"{merge_df.loc[merge_df.scenario == scenario].capacity.sum()}" ) else: raise ValueError(f"Scenario {scenario} is not valid.")
[docs]def sanitycheck_emobility_mit(): """Execute sanity checks for eMobility: motorized individual travel Checks data integrity for eGon2035, eGon2035_lowflex and eGon100RE scenario using assertions: 1. Allocated EV numbers and EVs allocated to grid districts 2. Trip data (original inout data from simBEV) 3. Model data in eTraGo PF tables (grid.egon_etrago_*) Parameters ---------- None Returns ------- None """ def check_ev_allocation(): # Get target number for scenario ev_count_target = scenario_variation_parameters["ev_count"] print(f" Target count: {str(ev_count_target)}") # Get allocated numbers ev_counts_dict = {} with db.session_scope() as session: for table, level in zip( [ EgonEvCountMvGridDistrict, EgonEvCountMunicipality, EgonEvCountRegistrationDistrict, ], ["Grid District", "Municipality", "Registration District"], ): query = session.query( func.sum( table.bev_mini + table.bev_medium + table.bev_luxury + table.phev_mini + table.phev_medium + table.phev_luxury ).label("ev_count") ).filter( table.scenario == scenario_name, table.scenario_variation == scenario_var_name, ) ev_counts = pd.read_sql( query.statement, query.session.bind, index_col=None ) ev_counts_dict[level] = ev_counts.iloc[0].ev_count print( f" Count table: Total count for level {level} " f"(table: {table.__table__}): " f"{str(ev_counts_dict[level])}" ) # Compare with scenario target (only if not in testmode) if TESTMODE_OFF: for level, count in ev_counts_dict.items(): np.testing.assert_allclose( count, ev_count_target, rtol=0.0001, err_msg=f"EV numbers in {level} seems to be flawed.", ) else: print(" Testmode is on, skipping sanity check...") # Get allocated EVs in grid districts with db.session_scope() as session: query = session.query( func.count(EgonEvMvGridDistrict.egon_ev_pool_ev_id).label( "ev_count" ), ).filter( EgonEvMvGridDistrict.scenario == scenario_name, EgonEvMvGridDistrict.scenario_variation == scenario_var_name, ) ev_count_alloc = ( pd.read_sql(query.statement, query.session.bind, index_col=None) .iloc[0] .ev_count ) print( f" EVs allocated to Grid Districts " f"(table: {EgonEvMvGridDistrict.__table__}) total count: " f"{str(ev_count_alloc)}" ) # Compare with scenario target (only if not in testmode) if TESTMODE_OFF: np.testing.assert_allclose( ev_count_alloc, ev_count_target, rtol=0.0001, err_msg=( "EV numbers allocated to Grid Districts seems to be " "flawed." ), ) else: print(" Testmode is on, skipping sanity check...") return ev_count_alloc def check_trip_data(): # Check if trips start at timestep 0 and have a max. of 35040 steps # (8760h in 15min steps) print(" Checking timeranges...") with db.session_scope() as session: query = session.query( func.count(EgonEvTrip.event_id).label("cnt") ).filter( or_( and_( EgonEvTrip.park_start > 0, EgonEvTrip.simbev_event_id == 0, ), EgonEvTrip.park_end > (60 / int(meta_run_config.stepsize)) * 8760, ), EgonEvTrip.scenario == scenario_name, ) invalid_trips = pd.read_sql( query.statement, query.session.bind, index_col=None ) np.testing.assert_equal( invalid_trips.iloc[0].cnt, 0, err_msg=( f"{str(invalid_trips.iloc[0].cnt)} trips in table " f"{EgonEvTrip.__table__} have invalid timesteps." ), ) # Check if charging demand can be covered by available charging energy # while parking print(" Compare charging demand with available power...") with db.session_scope() as session: query = session.query( func.count(EgonEvTrip.event_id).label("cnt") ).filter( func.round( cast( (EgonEvTrip.park_end - EgonEvTrip.park_start + 1) * EgonEvTrip.charging_capacity_nominal * (int(meta_run_config.stepsize) / 60), Numeric, ), 3, ) < cast(EgonEvTrip.charging_demand, Numeric), EgonEvTrip.scenario == scenario_name, ) invalid_trips = pd.read_sql( query.statement, query.session.bind, index_col=None ) np.testing.assert_equal( invalid_trips.iloc[0].cnt, 0, err_msg=( f"In {str(invalid_trips.iloc[0].cnt)} trips (table: " f"{EgonEvTrip.__table__}) the charging demand cannot be " f"covered by available charging power." ), ) def check_model_data(): # Check if model components were fully created print(" Check if all model components were created...") # Get MVGDs which got EV allocated with db.session_scope() as session: query = ( session.query( EgonEvMvGridDistrict.bus_id, ) .filter( EgonEvMvGridDistrict.scenario == scenario_name, EgonEvMvGridDistrict.scenario_variation == scenario_var_name, ) .group_by(EgonEvMvGridDistrict.bus_id) ) mvgds_with_ev = ( pd.read_sql(query.statement, query.session.bind, index_col=None) .bus_id.sort_values() .to_list() ) # Load model components with db.session_scope() as session: query = ( session.query( EgonPfHvLink.bus0.label("mvgd_bus_id"), EgonPfHvLoad.bus.label("emob_bus_id"), EgonPfHvLoad.load_id.label("load_id"), EgonPfHvStore.store_id.label("store_id"), ) .select_from(EgonPfHvLoad, EgonPfHvStore) .join( EgonPfHvLoadTimeseries, EgonPfHvLoadTimeseries.load_id == EgonPfHvLoad.load_id, ) .join( EgonPfHvStoreTimeseries, EgonPfHvStoreTimeseries.store_id == EgonPfHvStore.store_id, ) .filter( EgonPfHvLoad.carrier == "land transport EV", EgonPfHvLoad.scn_name == scenario_name, EgonPfHvLoadTimeseries.scn_name == scenario_name, EgonPfHvStore.carrier == "battery storage", EgonPfHvStore.scn_name == scenario_name, EgonPfHvStoreTimeseries.scn_name == scenario_name, EgonPfHvLink.scn_name == scenario_name, EgonPfHvLink.bus1 == EgonPfHvLoad.bus, EgonPfHvLink.bus1 == EgonPfHvStore.bus, ) ) model_components = pd.read_sql( query.statement, query.session.bind, index_col=None ) # Check number of buses with model components connected mvgd_buses_with_ev = model_components.loc[ model_components.mvgd_bus_id.isin(mvgds_with_ev) ] np.testing.assert_equal( len(mvgds_with_ev), len(mvgd_buses_with_ev), err_msg=( f"Number of Grid Districts with connected model components " f"({str(len(mvgd_buses_with_ev))} in tables egon_etrago_*) " f"differ from number of Grid Districts that got EVs " f"allocated ({len(mvgds_with_ev)} in table " f"{EgonEvMvGridDistrict.__table__})." ), ) # Check if all required components exist (if no id is NaN) np.testing.assert_equal( model_components.drop_duplicates().isna().any().any(), False, err_msg=( f"Some components are missing (see True values): " f"{model_components.drop_duplicates().isna().any()}" ), ) # Get all model timeseries print(" Loading model timeseries...") # Get all model timeseries model_ts_dict = { "Load": { "carrier": "land transport EV", "table": EgonPfHvLoad, "table_ts": EgonPfHvLoadTimeseries, "column_id": "load_id", "columns_ts": ["p_set"], "ts": None, }, "Link": { "carrier": "BEV charger", "table": EgonPfHvLink, "table_ts": EgonPfHvLinkTimeseries, "column_id": "link_id", "columns_ts": ["p_max_pu"], "ts": None, }, "Store": { "carrier": "battery storage", "table": EgonPfHvStore, "table_ts": EgonPfHvStoreTimeseries, "column_id": "store_id", "columns_ts": ["e_min_pu", "e_max_pu"], "ts": None, }, } with db.session_scope() as session: for node, attrs in model_ts_dict.items(): print(f" Loading {node} timeseries...") subquery = ( session.query(getattr(attrs["table"], attrs["column_id"])) .filter(attrs["table"].carrier == attrs["carrier"]) .filter(attrs["table"].scn_name == scenario_name) .subquery() ) cols = [ getattr(attrs["table_ts"], c) for c in attrs["columns_ts"] ] query = session.query( getattr(attrs["table_ts"], attrs["column_id"]), *cols ).filter( getattr(attrs["table_ts"], attrs["column_id"]).in_( subquery ), attrs["table_ts"].scn_name == scenario_name, ) attrs["ts"] = pd.read_sql( query.statement, query.session.bind, index_col=attrs["column_id"], ) # Check if all timeseries have 8760 steps print(" Checking timeranges...") for node, attrs in model_ts_dict.items(): for col in attrs["columns_ts"]: ts = attrs["ts"] invalid_ts = ts.loc[ts[col].apply(lambda _: len(_)) != 8760][ col ].apply(len) np.testing.assert_equal( len(invalid_ts), 0, err_msg=( f"{str(len(invalid_ts))} rows in timeseries do not " f"have 8760 timesteps. Table: " f"{attrs['table_ts'].__table__}, Column: {col}, IDs: " f"{str(list(invalid_ts.index))}" ), ) # Compare total energy demand in model with some approximate values # (per EV: 14,000 km/a, 0.17 kWh/km) print(" Checking energy demand in model...") total_energy_model = ( model_ts_dict["Load"]["ts"].p_set.apply(lambda _: sum(_)).sum() / 1e6 ) print(f" Total energy amount in model: {total_energy_model} TWh") total_energy_scenario_approx = ev_count_alloc * 14000 * 0.17 / 1e9 print( f" Total approximated energy amount in scenario: " f"{total_energy_scenario_approx} TWh" ) np.testing.assert_allclose( total_energy_model, total_energy_scenario_approx, rtol=0.1, err_msg=( "The total energy amount in the model deviates heavily " "from the approximated value for current scenario." ), ) # Compare total storage capacity print(" Checking storage capacity...") # Load storage capacities from model with db.session_scope() as session: query = session.query( func.sum(EgonPfHvStore.e_nom).label("e_nom") ).filter( EgonPfHvStore.scn_name == scenario_name, EgonPfHvStore.carrier == "battery storage", ) storage_capacity_model = ( pd.read_sql( query.statement, query.session.bind, index_col=None ).e_nom.sum() / 1e3 ) print( f" Total storage capacity ({EgonPfHvStore.__table__}): " f"{round(storage_capacity_model, 1)} GWh" ) # Load occurences of each EV with db.session_scope() as session: query = ( session.query( EgonEvMvGridDistrict.bus_id, EgonEvPool.type, func.count(EgonEvMvGridDistrict.egon_ev_pool_ev_id).label( "count" ), ) .join( EgonEvPool, EgonEvPool.ev_id == EgonEvMvGridDistrict.egon_ev_pool_ev_id, ) .filter( EgonEvMvGridDistrict.scenario == scenario_name, EgonEvMvGridDistrict.scenario_variation == scenario_var_name, EgonEvPool.scenario == scenario_name, ) .group_by(EgonEvMvGridDistrict.bus_id, EgonEvPool.type) ) count_per_ev_all = pd.read_sql( query.statement, query.session.bind, index_col="bus_id" ) count_per_ev_all["bat_cap"] = count_per_ev_all.type.map( meta_tech_data.battery_capacity ) count_per_ev_all["bat_cap_total_MWh"] = ( count_per_ev_all["count"] * count_per_ev_all.bat_cap / 1e3 ) storage_capacity_simbev = count_per_ev_all.bat_cap_total_MWh.div( 1e3 ).sum() print( f" Total storage capacity (simBEV): " f"{round(storage_capacity_simbev, 1)} GWh" ) np.testing.assert_allclose( storage_capacity_model, storage_capacity_simbev, rtol=0.01, err_msg=( "The total storage capacity in the model deviates heavily " "from the input data provided by simBEV for current scenario." ), ) # Check SoC storage constraint: e_min_pu < e_max_pu for all timesteps print(" Validating SoC constraints...") stores_with_invalid_soc = [] for idx, row in model_ts_dict["Store"]["ts"].iterrows(): ts = row[["e_min_pu", "e_max_pu"]] x = np.array(ts.e_min_pu) > np.array(ts.e_max_pu) if x.any(): stores_with_invalid_soc.append(idx) np.testing.assert_equal( len(stores_with_invalid_soc), 0, err_msg=( f"The store constraint e_min_pu < e_max_pu does not apply " f"for some storages in {EgonPfHvStoreTimeseries.__table__}. " f"Invalid store_ids: {stores_with_invalid_soc}" ), ) def check_model_data_lowflex_eGon2035(): # TODO: Add eGon100RE_lowflex print("") print("SCENARIO: eGon2035_lowflex") # Compare driving load and charging load print(" Loading eGon2035 model timeseries: driving load...") with db.session_scope() as session: query = ( session.query( EgonPfHvLoad.load_id, EgonPfHvLoadTimeseries.p_set, ) .join( EgonPfHvLoadTimeseries, EgonPfHvLoadTimeseries.load_id == EgonPfHvLoad.load_id, ) .filter( EgonPfHvLoad.carrier == "land transport EV", EgonPfHvLoad.scn_name == "eGon2035", EgonPfHvLoadTimeseries.scn_name == "eGon2035", ) ) model_driving_load = pd.read_sql( query.statement, query.session.bind, index_col=None ) driving_load = np.array(model_driving_load.p_set.to_list()).sum(axis=0) print( " Loading eGon2035_lowflex model timeseries: dumb charging " "load..." ) with db.session_scope() as session: query = ( session.query( EgonPfHvLoad.load_id, EgonPfHvLoadTimeseries.p_set, ) .join( EgonPfHvLoadTimeseries, EgonPfHvLoadTimeseries.load_id == EgonPfHvLoad.load_id, ) .filter( EgonPfHvLoad.carrier == "land transport EV", EgonPfHvLoad.scn_name == "eGon2035_lowflex", EgonPfHvLoadTimeseries.scn_name == "eGon2035_lowflex", ) ) model_charging_load_lowflex = pd.read_sql( query.statement, query.session.bind, index_col=None ) charging_load = np.array( model_charging_load_lowflex.p_set.to_list() ).sum(axis=0) # Ratio of driving and charging load should be 0.9 due to charging # efficiency print(" Compare cumulative loads...") print(f" Driving load (eGon2035): {driving_load.sum() / 1e6} TWh") print( f" Dumb charging load (eGon2035_lowflex): " f"{charging_load.sum() / 1e6} TWh" ) driving_load_theoretical = ( float(meta_run_config.eta_cp) * charging_load.sum() ) np.testing.assert_allclose( driving_load.sum(), driving_load_theoretical, rtol=0.01, err_msg=( f"The driving load (eGon2035) deviates by more than 1% " f"from the theoretical driving load calculated from charging " f"load (eGon2035_lowflex) with an efficiency of " f"{float(meta_run_config.eta_cp)}." ), ) print("=====================================================") print("=== SANITY CHECKS FOR MOTORIZED INDIVIDUAL TRAVEL ===") print("=====================================================") for scenario_name in ["eGon2035", "eGon100RE"]: scenario_var_name = DATASET_CFG["scenario"]["variation"][scenario_name] print("") print(f"SCENARIO: {scenario_name}, VARIATION: {scenario_var_name}") # Load scenario params for scenario and scenario variation scenario_variation_parameters = get_sector_parameters( "mobility", scenario=scenario_name )["motorized_individual_travel"][scenario_var_name] # Load simBEV run config and tech data meta_run_config = read_simbev_metadata_file( scenario_name, "config" ).loc["basic"] meta_tech_data = read_simbev_metadata_file(scenario_name, "tech_data") print("") print("Checking EV counts...") ev_count_alloc = check_ev_allocation() print("") print("Checking trip data...") check_trip_data() print("") print("Checking model data...") check_model_data() print("") check_model_data_lowflex_eGon2035() print("=====================================================")
[docs]def sanitycheck_home_batteries(): # get constants constants = config.datasets()["home_batteries"]["constants"] scenarios = constants["scenarios"] cbat_pbat_ratio = get_cbat_pbat_ratio() sources = config.datasets()["home_batteries"]["sources"] targets = config.datasets()["home_batteries"]["targets"] for scenario in scenarios: # get home battery capacity per mv grid id sql = f""" SELECT el_capacity as p_nom, bus_id FROM {sources["storage"]["schema"]} .{sources["storage"]["table"]} WHERE carrier = 'home_battery' AND scenario = '{scenario}' """ home_batteries_df = db.select_dataframe(sql, index_col="bus_id") home_batteries_df = home_batteries_df.assign( capacity=home_batteries_df.p_nom * cbat_pbat_ratio ) sql = f""" SELECT * FROM {targets["home_batteries"]["schema"]} .{targets["home_batteries"]["table"]} WHERE scenario = '{scenario}' """ home_batteries_buildings_df = db.select_dataframe( sql, index_col="index" ) df = ( home_batteries_buildings_df[["bus_id", "p_nom", "capacity"]] .groupby("bus_id") .sum() ) assert (home_batteries_df.round(6) == df.round(6)).all().all()
[docs]def sanitycheck_dsm(): def df_from_series(s: pd.Series): return pd.DataFrame.from_dict(dict(zip(s.index, s.values))) for scenario in ["eGon2035", "eGon100RE"]: # p_min and p_max sql = f""" SELECT link_id, bus0 as bus, p_nom FROM grid.egon_etrago_link WHERE carrier = 'dsm' AND scn_name = '{scenario}' ORDER BY link_id """ meta_df = db.select_dataframe(sql, index_col="link_id") link_ids = str(meta_df.index.tolist())[1:-1] sql = f""" SELECT link_id, p_min_pu, p_max_pu FROM grid.egon_etrago_link_timeseries WHERE scn_name = '{scenario}' AND link_id IN ({link_ids}) ORDER BY link_id """ ts_df = db.select_dataframe(sql, index_col="link_id") p_max_df = df_from_series(ts_df.p_max_pu).mul(meta_df.p_nom) p_min_df = df_from_series(ts_df.p_min_pu).mul(meta_df.p_nom) p_max_df.columns = meta_df.bus.tolist() p_min_df.columns = meta_df.bus.tolist() targets = config.datasets()["DSM_CTS_industry"]["targets"] tables = [ "cts_loadcurves_dsm", "ind_osm_loadcurves_individual_dsm", "demandregio_ind_sites_dsm", "ind_sites_loadcurves_individual", ] df_list = [] for table in tables: target = targets[table] sql = f""" SELECT bus, p_nom, e_nom, p_min_pu, p_max_pu, e_max_pu, e_min_pu FROM {target["schema"]}.{target["table"]} WHERE scn_name = '{scenario}' ORDER BY bus """ df_list.append(db.select_dataframe(sql)) individual_ts_df = pd.concat(df_list, ignore_index=True) groups = individual_ts_df[["bus"]].reset_index().groupby("bus").groups individual_p_max_df = df_from_series(individual_ts_df.p_max_pu).mul( individual_ts_df.p_nom ) individual_p_max_df = pd.DataFrame( [ individual_p_max_df[idxs].sum(axis=1) for idxs in groups.values() ], index=groups.keys(), ).T individual_p_min_df = df_from_series(individual_ts_df.p_min_pu).mul( individual_ts_df.p_nom ) individual_p_min_df = pd.DataFrame( [ individual_p_min_df[idxs].sum(axis=1) for idxs in groups.values() ], index=groups.keys(), ).T assert np.isclose(p_max_df, individual_p_max_df).all() assert np.isclose(p_min_df, individual_p_min_df).all() # e_min and e_max sql = f""" SELECT store_id, bus, e_nom FROM grid.egon_etrago_store WHERE carrier = 'dsm' AND scn_name = '{scenario}' ORDER BY store_id """ meta_df = db.select_dataframe(sql, index_col="store_id") store_ids = str(meta_df.index.tolist())[1:-1] sql = f""" SELECT store_id, e_min_pu, e_max_pu FROM grid.egon_etrago_store_timeseries WHERE scn_name = '{scenario}' AND store_id IN ({store_ids}) ORDER BY store_id """ ts_df = db.select_dataframe(sql, index_col="store_id") e_max_df = df_from_series(ts_df.e_max_pu).mul(meta_df.e_nom) e_min_df = df_from_series(ts_df.e_min_pu).mul(meta_df.e_nom) e_max_df.columns = meta_df.bus.tolist() e_min_df.columns = meta_df.bus.tolist() individual_e_max_df = df_from_series(individual_ts_df.e_max_pu).mul( individual_ts_df.e_nom ) individual_e_max_df = pd.DataFrame( [ individual_e_max_df[idxs].sum(axis=1) for idxs in groups.values() ], index=groups.keys(), ).T individual_e_min_df = df_from_series(individual_ts_df.e_min_pu).mul( individual_ts_df.e_nom ) individual_e_min_df = pd.DataFrame( [ individual_e_min_df[idxs].sum(axis=1) for idxs in groups.values() ], index=groups.keys(), ).T assert np.isclose(e_max_df, individual_e_max_df).all() assert np.isclose(e_min_df, individual_e_min_df).all()