"""
Generate timeseries for eTraGo and pypsa-eur-sec
Call order
* generate_model_data_eGon2035() / generate_model_data_eGon100RE()
* generate_model_data()
* generate_model_data_grid_district()
* load_evs_trips()
* data_preprocessing()
* generate_load_time_series()
* write_model_data_to_db()
Notes
-----
# TODO REWORK
Share of EV with access to private charging infrastructure (`flex_share`) for
use cases work and home are not supported by simBEV v0.1.2 and are applied here
(after simulation). Applying those fixed shares post-simulation introduces
small errors compared to application during simBEV's trip generation.
Values (cf. `flex_share` in scenario parameters
:func:`egon.data.datasets.scenario_parameters.parameters.mobility`) were
linearly extrapolated based upon
https://nationale-leitstelle.de/wp-content/pdf/broschuere-lis-2025-2030-final.pdf
(p.92):
* eGon2035: home=0.8, work=1.0
* eGon100RE: home=1.0, work=1.0
"""
from collections import Counter
from pathlib import Path
import datetime as dt
import json
from sqlalchemy.sql import func
import numpy as np
import pandas as pd
from egon.data import db
from egon.data.datasets.emobility.motorized_individual_travel.db_classes import ( # noqa: E501
EgonEvMvGridDistrict,
EgonEvPool,
EgonEvTrip,
)
from egon.data.datasets.emobility.motorized_individual_travel.helpers import (
DATASET_CFG,
MVGD_MIN_COUNT,
WORKING_DIR,
read_simbev_metadata_file,
reduce_mem_usage,
)
from egon.data.datasets.etrago_setup import (
EgonPfHvBus,
EgonPfHvLink,
EgonPfHvLinkTimeseries,
EgonPfHvLoad,
EgonPfHvLoadTimeseries,
EgonPfHvStore,
EgonPfHvStoreTimeseries,
)
from egon.data.datasets.mv_grid_districts import MvGridDistricts
[docs]def data_preprocessing(
scenario_data: pd.DataFrame, ev_data_df: pd.DataFrame
) -> pd.DataFrame:
"""Filter SimBEV data to match region requirements. Duplicates profiles
if necessary. Pre-calculates necessary parameters for the load time series.
Parameters
----------
scenario_data : pd.Dataframe
EV per grid district
ev_data_df : pd.Dataframe
Trip data
Returns
-------
pd.Dataframe
Trip data
"""
# get ev data for given profiles
ev_data_df = ev_data_df.loc[
ev_data_df.ev_id.isin(scenario_data.ev_id.unique())
]
# drop faulty data
ev_data_df = ev_data_df.loc[ev_data_df.park_start <= ev_data_df.park_end]
# calculate time necessary to fulfill the charging demand and brutto
# charging capacity in MVA
ev_data_df = ev_data_df.assign(
charging_capacity_grid_MW=(
ev_data_df.charging_capacity_grid / 10**3
),
minimum_charging_time=(
ev_data_df.charging_demand
/ ev_data_df.charging_capacity_nominal
* 4
),
location=ev_data_df.location.str.replace("/", "_"),
)
# fix driving events
ev_data_df.minimum_charging_time.fillna(0, inplace=True)
# calculate charging capacity for last timestep
(
full_timesteps,
last_timestep_share,
) = ev_data_df.minimum_charging_time.divmod(1)
full_timesteps = full_timesteps.astype(int)
ev_data_df = ev_data_df.assign(
full_timesteps=full_timesteps,
last_timestep_share=last_timestep_share,
last_timestep_charging_capacity_grid_MW=(
last_timestep_share * ev_data_df.charging_capacity_grid_MW
),
charge_end=ev_data_df.park_start + full_timesteps,
last_timestep=ev_data_df.park_start + full_timesteps,
)
# Calculate flexible charging capacity:
# only for private charging facilities at home and work
mask_work = (ev_data_df.location == "0_work") & (
ev_data_df.use_case == "work"
)
mask_home = (ev_data_df.location == "6_home") & (
ev_data_df.use_case == "home"
)
ev_data_df["flex_charging_capacity_grid_MW"] = 0
ev_data_df.loc[
mask_work | mask_home, "flex_charging_capacity_grid_MW"
] = ev_data_df.loc[mask_work | mask_home, "charging_capacity_grid_MW"]
ev_data_df["flex_last_timestep_charging_capacity_grid_MW"] = 0
ev_data_df.loc[
mask_work | mask_home, "flex_last_timestep_charging_capacity_grid_MW"
] = ev_data_df.loc[
mask_work | mask_home, "last_timestep_charging_capacity_grid_MW"
]
# Check length of timeseries
if len(ev_data_df.loc[ev_data_df.last_timestep > 35040]) > 0:
print(" Warning: Trip data exceeds 1 year and is cropped.")
# Correct last TS
ev_data_df.loc[
ev_data_df.last_timestep > 35040, "last_timestep"
] = 35040
if DATASET_CFG["model_timeseries"]["reduce_memory"]:
return reduce_mem_usage(ev_data_df)
return ev_data_df
[docs]def generate_load_time_series(
ev_data_df: pd.DataFrame,
run_config: pd.DataFrame,
scenario_data: pd.DataFrame,
) -> pd.DataFrame:
"""Calculate the load time series from the given trip data. A dumb
charging strategy is assumed where each EV starts charging immediately
after plugging it in. Simultaneously the flexible charging capacity is
calculated.
Parameters
----------
ev_data_df : pd.DataFrame
Full trip data
run_config : pd.DataFrame
simBEV metadata: run config
scenario_data : pd.Dataframe
EV per grid district
Returns
-------
pd.DataFrame
time series of the load and the flex potential
"""
# Get duplicates dict
profile_counter = Counter(scenario_data.ev_id)
# instantiate timeindex
timeindex = pd.date_range(
start=dt.datetime.fromisoformat(f"{run_config.start_date} 00:00:00"),
end=dt.datetime.fromisoformat(f"{run_config.end_date} 23:45:00")
+ dt.timedelta(minutes=int(run_config.stepsize)),
freq=f"{int(run_config.stepsize)}Min",
)
load_time_series_df = pd.DataFrame(
data=0.0,
index=timeindex,
columns=["load_time_series", "flex_time_series"],
)
load_time_series_array = np.zeros(len(load_time_series_df))
flex_time_series_array = load_time_series_array.copy()
simultaneous_plugged_in_charging_capacity = load_time_series_array.copy()
simultaneous_plugged_in_charging_capacity_flex = (
load_time_series_array.copy()
)
soc_min_absolute = load_time_series_array.copy()
soc_max_absolute = load_time_series_array.copy()
driving_load_time_series_array = load_time_series_array.copy()
columns = [
"ev_id",
"drive_start",
"drive_end",
"park_start",
"park_end",
"charge_end",
"charging_capacity_grid_MW",
"last_timestep",
"last_timestep_charging_capacity_grid_MW",
"flex_charging_capacity_grid_MW",
"flex_last_timestep_charging_capacity_grid_MW",
"soc_start",
"soc_end",
"bat_cap",
"location",
"consumption",
]
# iterate over charging events
for (
_,
ev_id,
drive_start,
drive_end,
start,
park_end,
end,
cap,
last_ts,
last_ts_cap,
flex_cap,
flex_last_ts_cap,
soc_start,
soc_end,
bat_cap,
location,
consumption,
) in ev_data_df[columns].itertuples():
ev_count = profile_counter[ev_id]
load_time_series_array[start:end] += cap * ev_count
load_time_series_array[last_ts] += last_ts_cap * ev_count
flex_time_series_array[start:end] += flex_cap * ev_count
flex_time_series_array[last_ts] += flex_last_ts_cap * ev_count
simultaneous_plugged_in_charging_capacity[start : park_end + 1] += (
cap * ev_count
)
simultaneous_plugged_in_charging_capacity_flex[
start : park_end + 1
] += (flex_cap * ev_count)
# ====================================================
# min and max SoC constraints of aggregated EV battery
# ====================================================
# (I) Preserve SoC while driving
if location == "driving":
# Full band while driving
# soc_min_absolute[drive_start:drive_end+1] +=
# soc_end * bat_cap * ev_count
#
# soc_max_absolute[drive_start:drive_end+1] +=
# soc_start * bat_cap * ev_count
# Real band (decrease SoC while driving)
soc_min_absolute[drive_start : drive_end + 1] += (
np.linspace(soc_start, soc_end, drive_end - drive_start + 2)[
1:
]
* bat_cap
* ev_count
)
soc_max_absolute[drive_start : drive_end + 1] += (
np.linspace(soc_start, soc_end, drive_end - drive_start + 2)[
1:
]
* bat_cap
* ev_count
)
# Equal distribution of driving load
if soc_start > soc_end: # reqd. for PHEV
driving_load_time_series_array[
drive_start : drive_end + 1
] += (consumption * ev_count) / (drive_end - drive_start + 1)
# (II) Fix SoC bounds while parking w/o charging
elif soc_start == soc_end:
soc_min_absolute[start : park_end + 1] += (
soc_start * bat_cap * ev_count
)
soc_max_absolute[start : park_end + 1] += (
soc_end * bat_cap * ev_count
)
# (III) Set SoC bounds at start and end of parking while charging
# for flexible and non-flexible events
elif soc_start < soc_end:
if flex_cap > 0:
# * "flex" (private charging only, band: SoC_min..SoC_max)
soc_min_absolute[start : park_end + 1] += (
soc_start * bat_cap * ev_count
)
soc_max_absolute[start : park_end + 1] += (
soc_end * bat_cap * ev_count
)
# * "flex+" (private charging only, band: 0..1)
# (IF USED: add elif with flex scenario)
# soc_min_absolute[start] += soc_start * bat_cap * ev_count
# soc_max_absolute[start] += soc_start * bat_cap * ev_count
# soc_min_absolute[park_end] += soc_end * bat_cap * ev_count
# soc_max_absolute[park_end] += soc_end * bat_cap * ev_count
# * Set SoC bounds for non-flexible charging (increase SoC while
# charging)
# (SKIP THIS PART for "flex++" (private+public charging))
elif flex_cap == 0:
soc_min_absolute[start : park_end + 1] += (
np.linspace(soc_start, soc_end, park_end - start + 1)
* bat_cap
* ev_count
)
soc_max_absolute[start : park_end + 1] += (
np.linspace(soc_start, soc_end, park_end - start + 1)
* bat_cap
* ev_count
)
# Build timeseries
load_time_series_df = load_time_series_df.assign(
load_time_series=load_time_series_array,
flex_time_series=flex_time_series_array,
simultaneous_plugged_in_charging_capacity=(
simultaneous_plugged_in_charging_capacity
),
simultaneous_plugged_in_charging_capacity_flex=(
simultaneous_plugged_in_charging_capacity_flex
),
soc_min_absolute=(soc_min_absolute / 1e3),
soc_max_absolute=(soc_max_absolute / 1e3),
driving_load_time_series=driving_load_time_series_array / 1e3,
)
# validate load timeseries
np.testing.assert_almost_equal(
load_time_series_df.load_time_series.sum() / 4,
(
ev_data_df.ev_id.apply(lambda _: profile_counter[_])
* ev_data_df.charging_demand
).sum()
/ 1000
/ float(run_config.eta_cp),
decimal=-1,
)
if DATASET_CFG["model_timeseries"]["reduce_memory"]:
return reduce_mem_usage(load_time_series_df)
return load_time_series_df
[docs]def generate_static_params(
ev_data_df: pd.DataFrame,
load_time_series_df: pd.DataFrame,
evs_grid_district_df: pd.DataFrame,
) -> dict:
"""Calculate static parameters from trip data.
* cumulative initial SoC
* cumulative battery capacity
* simultaneous plugged in charging capacity
Parameters
----------
ev_data_df : pd.DataFrame
Fill trip data
Returns
-------
dict
Static parameters
"""
max_df = (
ev_data_df[["ev_id", "bat_cap", "charging_capacity_grid_MW"]]
.groupby("ev_id")
.max()
)
# Get EV duplicates dict and weight battery capacity
max_df["bat_cap"] = max_df.bat_cap.mul(
pd.Series(Counter(evs_grid_district_df.ev_id))
)
static_params_dict = {
"store_ev_battery.e_nom_MWh": float(max_df.bat_cap.sum() / 1e3),
"link_bev_charger.p_nom_MW": float(
load_time_series_df.simultaneous_plugged_in_charging_capacity.max()
),
}
return static_params_dict
[docs]def load_evs_trips(
scenario_name: str,
evs_ids: list,
charging_events_only: bool = False,
flex_only_at_charging_events: bool = True,
) -> pd.DataFrame:
"""Load trips for EVs
Parameters
----------
scenario_name : str
Scenario name
evs_ids : list of int
IDs of EV to load the trips for
charging_events_only : bool
Load only events where charging takes place
flex_only_at_charging_events : bool
Flexibility only at charging events. If False, flexibility is provided
by plugged-in EVs even if no charging takes place.
Returns
-------
pd.DataFrame
Trip data
"""
# Select only charigung events
if charging_events_only is True:
charging_condition = EgonEvTrip.charging_demand > 0
else:
charging_condition = EgonEvTrip.charging_demand >= 0
with db.session_scope() as session:
query = (
session.query(
EgonEvTrip.egon_ev_pool_ev_id.label("ev_id"),
EgonEvTrip.location,
EgonEvTrip.use_case,
EgonEvTrip.charging_capacity_nominal,
EgonEvTrip.charging_capacity_grid,
EgonEvTrip.charging_capacity_battery,
EgonEvTrip.soc_start,
EgonEvTrip.soc_end,
EgonEvTrip.charging_demand,
EgonEvTrip.park_start,
EgonEvTrip.park_end,
EgonEvTrip.drive_start,
EgonEvTrip.drive_end,
EgonEvTrip.consumption,
EgonEvPool.type,
)
.join(
EgonEvPool, EgonEvPool.ev_id == EgonEvTrip.egon_ev_pool_ev_id
)
.filter(EgonEvTrip.egon_ev_pool_ev_id.in_(evs_ids))
.filter(EgonEvTrip.scenario == scenario_name)
.filter(EgonEvPool.scenario == scenario_name)
.filter(charging_condition)
.order_by(
EgonEvTrip.egon_ev_pool_ev_id, EgonEvTrip.simbev_event_id
)
)
trip_data = pd.read_sql(
query.statement, query.session.bind, index_col=None
).astype(
{
"ev_id": "int",
"park_start": "int",
"park_end": "int",
"drive_start": "int",
"drive_end": "int",
}
)
if flex_only_at_charging_events is True:
# ASSUMPTION: set charging cap 0 where there's no demand
# (discard other plugged-in times)
mask = trip_data.charging_demand == 0
trip_data.loc[mask, "charging_capacity_nominal"] = 0
trip_data.loc[mask, "charging_capacity_grid"] = 0
trip_data.loc[mask, "charging_capacity_battery"] = 0
return trip_data
[docs]def write_model_data_to_db(
static_params_dict: dict,
load_time_series_df: pd.DataFrame,
bus_id: int,
scenario_name: str,
run_config: pd.DataFrame,
bat_cap: pd.DataFrame,
) -> None:
"""Write all results for grid district to database
Parameters
----------
static_params_dict : dict
Static model params
load_time_series_df : pd.DataFrame
Load time series for grid district
bus_id : int
ID of grid district
scenario_name : str
Scenario name
run_config : pd.DataFrame
simBEV metadata: run config
bat_cap : pd.DataFrame
Battery capacities per EV type
Returns
-------
None
"""
def calc_initial_ev_soc(bus_id: int, scenario_name: str) -> pd.DataFrame:
"""Calculate an average initial state of charge for EVs in MV grid
district.
This is done by weighting the initial SoCs at timestep=0 with EV count
and battery capacity for each EV type.
"""
with db.session_scope() as session:
query_ev_soc = (
session.query(
EgonEvPool.type,
func.count(EgonEvTrip.egon_ev_pool_ev_id).label(
"ev_count"
),
func.avg(EgonEvTrip.soc_start).label("ev_soc_start"),
)
.select_from(EgonEvTrip)
.join(
EgonEvPool,
EgonEvPool.ev_id == EgonEvTrip.egon_ev_pool_ev_id,
)
.join(
EgonEvMvGridDistrict,
EgonEvMvGridDistrict.egon_ev_pool_ev_id
== EgonEvTrip.egon_ev_pool_ev_id,
)
.filter(
EgonEvTrip.scenario == scenario_name,
EgonEvPool.scenario == scenario_name,
EgonEvMvGridDistrict.scenario == scenario_name,
EgonEvMvGridDistrict.bus_id == bus_id,
EgonEvTrip.simbev_event_id == 0,
)
.group_by(EgonEvPool.type)
)
initial_soc_per_ev_type = pd.read_sql(
query_ev_soc.statement, query_ev_soc.session.bind, index_col="type"
)
initial_soc_per_ev_type[
"battery_capacity_sum"
] = initial_soc_per_ev_type.ev_count.multiply(bat_cap)
initial_soc_per_ev_type[
"ev_soc_start_abs"
] = initial_soc_per_ev_type.battery_capacity_sum.multiply(
initial_soc_per_ev_type.ev_soc_start
)
return (
initial_soc_per_ev_type.ev_soc_start_abs.sum()
/ initial_soc_per_ev_type.battery_capacity_sum.sum()
)
def write_to_db(write_lowflex_model: bool) -> None:
"""Write model data to eTraGo tables"""
@db.check_db_unique_violation
def write_bus(scenario_name: str) -> int:
# eMob MIT bus
emob_bus_id = db.next_etrago_id("bus")
with db.session_scope() as session:
session.add(
EgonPfHvBus(
scn_name=scenario_name,
bus_id=emob_bus_id,
v_nom=1,
carrier="Li_ion",
x=etrago_bus.x,
y=etrago_bus.y,
geom=etrago_bus.geom,
)
)
return emob_bus_id
@db.check_db_unique_violation
def write_link(scenario_name: str) -> None:
# eMob MIT link [bus_el] -> [bus_ev]
emob_link_id = db.next_etrago_id("link")
with db.session_scope() as session:
session.add(
EgonPfHvLink(
scn_name=scenario_name,
link_id=emob_link_id,
bus0=etrago_bus.bus_id,
bus1=emob_bus_id,
carrier="BEV_charger",
efficiency=float(run_config.eta_cp),
p_nom=(
load_time_series_df.simultaneous_plugged_in_charging_capacity.max() # noqa: E501
),
p_nom_extendable=False,
p_nom_min=0,
p_nom_max=np.Inf,
p_min_pu=0,
p_max_pu=1,
# p_set_fixed=0,
capital_cost=0,
marginal_cost=0,
length=0,
terrain_factor=1,
)
)
with db.session_scope() as session:
session.add(
EgonPfHvLinkTimeseries(
scn_name=scenario_name,
link_id=emob_link_id,
temp_id=1,
p_min_pu=None,
p_max_pu=(
hourly_load_time_series_df.ev_availability.to_list() # noqa: E501
),
)
)
@db.check_db_unique_violation
def write_store(scenario_name: str) -> None:
# eMob MIT store
emob_store_id = db.next_etrago_id("store")
with db.session_scope() as session:
session.add(
EgonPfHvStore(
scn_name=scenario_name,
store_id=emob_store_id,
bus=emob_bus_id,
carrier="battery_storage",
e_nom=static_params_dict["store_ev_battery.e_nom_MWh"],
e_nom_extendable=False,
e_nom_min=0,
e_nom_max=np.Inf,
e_min_pu=0,
e_max_pu=1,
e_initial=(
initial_soc_mean
* static_params_dict["store_ev_battery.e_nom_MWh"]
),
e_cyclic=False,
sign=1,
standing_loss=0,
)
)
with db.session_scope() as session:
session.add(
EgonPfHvStoreTimeseries(
scn_name=scenario_name,
store_id=emob_store_id,
temp_id=1,
e_min_pu=hourly_load_time_series_df.soc_min.to_list(),
e_max_pu=hourly_load_time_series_df.soc_max.to_list(),
)
)
@db.check_db_unique_violation
def write_load(
scenario_name: str, connection_bus_id: int, load_ts: list
) -> None:
# eMob MIT load
emob_load_id = db.next_etrago_id("load")
with db.session_scope() as session:
session.add(
EgonPfHvLoad(
scn_name=scenario_name,
load_id=emob_load_id,
bus=connection_bus_id,
carrier="land_transport_EV",
sign=-1,
)
)
with db.session_scope() as session:
session.add(
EgonPfHvLoadTimeseries(
scn_name=scenario_name,
load_id=emob_load_id,
temp_id=1,
p_set=load_ts,
)
)
# Get eTraGo substation bus
with db.session_scope() as session:
query = session.query(
EgonPfHvBus.scn_name,
EgonPfHvBus.bus_id,
EgonPfHvBus.x,
EgonPfHvBus.y,
EgonPfHvBus.geom,
).filter(
EgonPfHvBus.scn_name == scenario_name,
EgonPfHvBus.bus_id == bus_id,
EgonPfHvBus.carrier == "AC",
)
etrago_bus = query.first()
if etrago_bus is None:
# TODO: raise exception here!
print(
f"No AC bus found for scenario {scenario_name} "
f"with bus_id {bus_id} in table egon_etrago_bus!"
)
# Call DB writing functions for regular or lowflex scenario
# * use corresponding scenario name as defined in datasets.yml
# * no storage for lowflex scenario
# * load timeseries:
# * regular (flex): use driving load
# * lowflex: use dumb charging load
if write_lowflex_model is False:
emob_bus_id = write_bus(scenario_name=scenario_name)
write_link(scenario_name=scenario_name)
write_store(scenario_name=scenario_name)
write_load(
scenario_name=scenario_name,
connection_bus_id=emob_bus_id,
load_ts=(
hourly_load_time_series_df.driving_load_time_series.to_list() # noqa: E501
),
)
else:
# Get lowflex scenario name
lowflex_scenario_name = DATASET_CFG["scenario"]["lowflex"][
"names"
][scenario_name]
write_load(
scenario_name=lowflex_scenario_name,
connection_bus_id=etrago_bus.bus_id,
load_ts=hourly_load_time_series_df.load_time_series.to_list(),
)
def write_to_file():
"""Write model data to file (for debugging purposes)"""
results_dir = WORKING_DIR / Path("results", scenario_name, str(bus_id))
results_dir.mkdir(exist_ok=True, parents=True)
hourly_load_time_series_df[["load_time_series"]].to_csv(
results_dir / "ev_load_time_series.csv"
)
hourly_load_time_series_df[["ev_availability"]].to_csv(
results_dir / "ev_availability.csv"
)
hourly_load_time_series_df[["soc_min", "soc_max"]].to_csv(
results_dir / "ev_dsm_profile.csv"
)
static_params_dict[
"load_land_transport_ev.p_set_MW"
] = "ev_load_time_series.csv"
static_params_dict["link_bev_charger.p_max_pu"] = "ev_availability.csv"
static_params_dict["store_ev_battery.e_min_pu"] = "ev_dsm_profile.csv"
static_params_dict["store_ev_battery.e_max_pu"] = "ev_dsm_profile.csv"
file = results_dir / "ev_static_params.json"
with open(file, "w") as f:
json.dump(static_params_dict, f, indent=4)
print(" Writing model timeseries...")
load_time_series_df = load_time_series_df.assign(
ev_availability=(
load_time_series_df.simultaneous_plugged_in_charging_capacity
/ static_params_dict["link_bev_charger.p_nom_MW"]
)
)
# Resample to 1h
hourly_load_time_series_df = load_time_series_df.resample("1H").agg(
{
"load_time_series": np.mean,
"flex_time_series": np.mean,
"simultaneous_plugged_in_charging_capacity": np.mean,
"simultaneous_plugged_in_charging_capacity_flex": np.mean,
"soc_min_absolute": np.min,
"soc_max_absolute": np.max,
"ev_availability": np.mean,
"driving_load_time_series": np.sum,
}
)
# Create relative SoC timeseries
hourly_load_time_series_df = hourly_load_time_series_df.assign(
soc_min=hourly_load_time_series_df.soc_min_absolute.div(
static_params_dict["store_ev_battery.e_nom_MWh"]
),
soc_max=hourly_load_time_series_df.soc_max_absolute.div(
static_params_dict["store_ev_battery.e_nom_MWh"]
),
)
hourly_load_time_series_df = hourly_load_time_series_df.assign(
soc_delta_absolute=(
hourly_load_time_series_df.soc_max_absolute
- hourly_load_time_series_df.soc_min_absolute
),
soc_delta=(
hourly_load_time_series_df.soc_max
- hourly_load_time_series_df.soc_min
),
)
# Crop hourly TS if needed
hourly_load_time_series_df = hourly_load_time_series_df[:8760]
# Create lowflex scenario?
write_lowflex_model = DATASET_CFG["scenario"]["lowflex"][
"create_lowflex_scenario"
]
# Get initial average storage SoC
initial_soc_mean = calc_initial_ev_soc(bus_id, scenario_name)
# Write to database: regular and lowflex scenario
write_to_db(write_lowflex_model=False)
print(" Writing flex scenario...")
if write_lowflex_model is True:
print(" Writing lowflex scenario...")
write_to_db(write_lowflex_model=True)
# Export to working dir if requested
if DATASET_CFG["model_timeseries"]["export_results_to_csv"]:
write_to_file()
[docs]def delete_model_data_from_db():
"""Delete all eMob MIT data from eTraGo PF tables"""
with db.session_scope() as session:
# Buses
session.query(EgonPfHvBus).filter(
EgonPfHvBus.carrier == "Li_ion"
).delete(synchronize_session=False)
# Link TS
subquery = (
session.query(EgonPfHvLink.link_id)
.filter(EgonPfHvLink.carrier == "BEV_charger")
.subquery()
)
session.query(EgonPfHvLinkTimeseries).filter(
EgonPfHvLinkTimeseries.link_id.in_(subquery)
).delete(synchronize_session=False)
# Links
session.query(EgonPfHvLink).filter(
EgonPfHvLink.carrier == "BEV_charger"
).delete(synchronize_session=False)
# Store TS
subquery = (
session.query(EgonPfHvStore.store_id)
.filter(EgonPfHvStore.carrier == "battery_storage")
.subquery()
)
session.query(EgonPfHvStoreTimeseries).filter(
EgonPfHvStoreTimeseries.store_id.in_(subquery)
).delete(synchronize_session=False)
# Stores
session.query(EgonPfHvStore).filter(
EgonPfHvStore.carrier == "battery_storage"
).delete(synchronize_session=False)
# Load TS
subquery = (
session.query(EgonPfHvLoad.load_id)
.filter(EgonPfHvLoad.carrier == "land_transport_EV")
.subquery()
)
session.query(EgonPfHvLoadTimeseries).filter(
EgonPfHvLoadTimeseries.load_id.in_(subquery)
).delete(synchronize_session=False)
# Loads
session.query(EgonPfHvLoad).filter(
EgonPfHvLoad.carrier == "land_transport_EV"
).delete(synchronize_session=False)
[docs]def load_grid_district_ids() -> pd.Series:
"""Load bus IDs of all grid districts"""
with db.session_scope() as session:
query_mvgd = session.query(MvGridDistricts.bus_id)
return pd.read_sql(
query_mvgd.statement, query_mvgd.session.bind, index_col=None
).bus_id.sort_values()
[docs]def generate_model_data_grid_district(
scenario_name: str,
evs_grid_district: pd.DataFrame,
bat_cap_dict: dict,
run_config: pd.DataFrame,
) -> tuple:
"""Generates timeseries from simBEV trip data for MV grid district
Parameters
----------
scenario_name : str
Scenario name
evs_grid_district : pd.DataFrame
EV data for grid district
bat_cap_dict : dict
Battery capacity per EV type
run_config : pd.DataFrame
simBEV metadata: run config
Returns
-------
pd.DataFrame
Model data for grid district
"""
# Load trip data
print(" Loading trips...")
trip_data = load_evs_trips(
scenario_name=scenario_name,
evs_ids=evs_grid_district.ev_id.unique(),
charging_events_only=False,
flex_only_at_charging_events=True,
)
print(" Preprocessing data...")
# Assign battery capacity to trip data
trip_data["bat_cap"] = trip_data.type.apply(lambda _: bat_cap_dict[_])
trip_data.drop(columns=["type"], inplace=True)
# Preprocess trip data
trip_data = data_preprocessing(evs_grid_district, trip_data)
# Generate load timeseries
print(" Generating load timeseries...")
load_ts = generate_load_time_series(
ev_data_df=trip_data,
run_config=run_config,
scenario_data=evs_grid_district,
)
# Generate static params
static_params = generate_static_params(
trip_data, load_ts, evs_grid_district
)
return static_params, load_ts
[docs]def generate_model_data_bunch(scenario_name: str, bunch: range) -> None:
"""Generates timeseries from simBEV trip data for a bunch of MV grid
districts.
Parameters
----------
scenario_name : str
Scenario name
bunch : list
Bunch of grid districts to generate data for, e.g. [1,2,..,100].
Note: `bunch` is NOT a list of grid districts but is used for slicing
the ordered list (by bus_id) of grid districts! This is used for
parallelization. See
:meth:`egon.data.datasets.emobility.motorized_individual_travel.MotorizedIndividualTravel.generate_model_data_tasks`
"""
# Get list of grid districts / substations for this bunch
mvgd_bus_ids = load_grid_district_ids().iloc[bunch]
# Get scenario variation name
scenario_var_name = DATASET_CFG["scenario"]["variation"][scenario_name]
print(
f"SCENARIO: {scenario_name}, "
f"SCENARIO VARIATION: {scenario_var_name}, "
f"BUNCH: {bunch[0]}-{bunch[-1]}"
)
# Load scenario params for scenario and scenario variation
# scenario_variation_parameters = get_sector_parameters(
# "mobility", scenario=scenario_name
# )["motorized_individual_travel"][scenario_var_name]
# Get substations
with db.session_scope() as session:
query = (
session.query(
EgonEvMvGridDistrict.bus_id,
EgonEvMvGridDistrict.egon_ev_pool_ev_id.label("ev_id"),
)
.filter(EgonEvMvGridDistrict.scenario == scenario_name)
.filter(
EgonEvMvGridDistrict.scenario_variation == scenario_var_name
)
.filter(EgonEvMvGridDistrict.bus_id.in_(mvgd_bus_ids))
.filter(EgonEvMvGridDistrict.egon_ev_pool_ev_id.isnot(None))
)
evs_grid_district = pd.read_sql(
query.statement, query.session.bind, index_col=None
).astype({"ev_id": "int"})
mvgd_bus_ids = evs_grid_district.bus_id.unique()
print(
f"{len(evs_grid_district)} EV loaded "
f"({len(evs_grid_district.ev_id.unique())} unique) in "
f"{len(mvgd_bus_ids)} grid districts."
)
# Get run metadata
meta_tech_data = read_simbev_metadata_file(scenario_name, "tech_data")
meta_run_config = read_simbev_metadata_file(scenario_name, "config").loc[
"basic"
]
# Generate timeseries for each MVGD
print("GENERATE MODEL DATA...")
ctr = 0
for bus_id in mvgd_bus_ids:
ctr += 1
print(
f"Processing grid district: bus {bus_id}... "
f"({ctr}/{len(mvgd_bus_ids)})"
)
(static_params, load_ts,) = generate_model_data_grid_district(
scenario_name=scenario_name,
evs_grid_district=evs_grid_district[
evs_grid_district.bus_id == bus_id
],
bat_cap_dict=meta_tech_data.battery_capacity.to_dict(),
run_config=meta_run_config,
)
write_model_data_to_db(
static_params_dict=static_params,
load_time_series_df=load_ts,
bus_id=bus_id,
scenario_name=scenario_name,
run_config=meta_run_config,
bat_cap=meta_tech_data.battery_capacity,
)
[docs]def generate_model_data_eGon2035_remaining():
"""Generates timeseries for eGon2035 scenario for grid districts which
has not been processed in the parallel tasks before.
"""
generate_model_data_bunch(
scenario_name="eGon2035",
bunch=range(MVGD_MIN_COUNT, len(load_grid_district_ids())),
)
[docs]def generate_model_data_eGon100RE_remaining():
"""Generates timeseries for eGon100RE scenario for grid districts which
has not been processed in the parallel tasks before.
"""
generate_model_data_bunch(
scenario_name="eGon100RE",
bunch=range(MVGD_MIN_COUNT, len(load_grid_district_ids())),
)