Source code for egon.data.datasets.emobility.motorized_individual_travel.ev_allocation

"""
* Calculate number of electric vehicles and allocate on different spatial
  levels: :py:func:`allocate_evs_numbers`
* Allocate specific EVs to MV grid districts:
  :py:func:`allocate_evs_to_grid_districts`

"""

from itertools import permutations

from sqlalchemy.sql import func
import numpy as np
import pandas as pd

from egon.data import db
from egon.data.datasets.emobility.motorized_individual_travel.db_classes import (
    EgonEvCountMunicipality,
    EgonEvCountMvGridDistrict,
    EgonEvCountRegistrationDistrict,
    EgonEvMvGridDistrict,
    EgonEvPool,
)
from egon.data.datasets.emobility.motorized_individual_travel.helpers import (
    COLUMNS_KBA,
    CONFIG_EV,
    TESTMODE_OFF,
    read_kba_data,
    read_rs7_data,
)
from egon.data.datasets.emobility.motorized_individual_travel.tests import (
    validate_electric_vehicles_numbers,
)
from egon.data.datasets.mv_grid_districts import MvGridDistricts
from egon.data.datasets.scenario_parameters import get_sector_parameters
from egon.data.datasets.zensus_mv_grid_districts import MapZensusGridDistricts
from egon.data.datasets.zensus_vg250 import (
    DestatisZensusPopulationPerHaInsideGermany,
    MapZensusVg250,
    Vg250Gem,
    Vg250GemPopulation,
)
import egon.data.config

RANDOM_SEED = egon.data.config.settings()["egon-data"]["--random-seed"]


def fix_missing_ags_municipality_regiostar(muns, rs7_data):
    """Check if all AGS of the municipality dataset are included in the
    RegioStaR7 dataset and vice versa.

    As of Dec 2021, some municipalities are not included in the RegioStaR7
    dataset. This is mostly caused by incorporations of a municipality by
    another municipality. This is fixed by assigning a RS7 id from another
    municipality with a similar AGS (most likely a neighbouring one).

    Missing entries in the municipality dataset are printed but not fixed
    as they don't result in bad data. Nevertheless, consider updating the
    municipality/VG250 dataset.

    Parameters
    ----------
    muns : pandas.DataFrame
        Municipality data
    rs7_data : pandas.DataFrame
        RegioStaR7 data

    Returns
    -------
    pandas.DataFrame
        Fixed RegioStaR7 data
    """
    if len(muns.ags.unique()) != len(rs7_data.ags):
        print(
            "==========> Number of AGS differ between VG250 and RS7, "
            "trying to fix this..."
        )

        # Get AGS differences
        ags_datasets = {"RS7": rs7_data.ags, "VG250": muns.ags}
        ags_datasets_missing = {k: [] for k in ags_datasets.keys()}
        perm = permutations(ags_datasets.items())
        for (name1, ags1), (name2, ags2) in perm:
            print(f" Checking if all AGS of {name1} are in {name2}...")
            missing = [_ for _ in ags1 if _ not in ags2.to_list()]
            if len(missing) > 0:
                ags_datasets_missing[name2] = missing
                print(f" AGS in {name1} but not in {name2}: ", missing)
            else:
                print(" OK")
        print("")

        # Try to fix
        for name, missing in ags_datasets_missing.items():
            if len(missing) > 0:
                # RS7 entries missing: use RS7 number from mun with similar AGS
                if name == "RS7":
                    for ags in missing:
                        similar_entry = rs7_data[
                            round((rs7_data.ags.div(10))) == round(ags / 10)
                        ].iloc[0]
                        if len(similar_entry) > 0:
                            print(
                                f"Adding dataset from VG250 to RS7 "
                                f"based upon AGS {ags}."
                            )
                            similar_entry.ags = ags
                            rs7_data = rs7_data.append(similar_entry)
                    print("Consider to update RS7.")
                # VG250 entries missing:
                elif name == "VG250":
                    print(
                        "Cannot guess VG250 entries. This error does not "
                        "result in bad data but consider to update VG250."
                    )

        if len(muns.ags.unique()) != len(rs7_data.ags):
            print("==========> AGS could not be fixed!")
        else:
            print("==========> AGS were fixed!")

    return rs7_data


def calc_evs_per_reg_district(scenario_variation_parameters, kba_data):
    """Calculate EVs per registration district

    Parameters
    ----------
    scenario_variation_parameters : dict
        Parameters of scenario variation
    kba_data : pandas.DataFrame
        Vehicle registration data for registration district

    Returns
    -------
    pandas.DataFrame
        EVs per registration district
    """
    scenario_variation_parameters["mini_share"] = (
        scenario_variation_parameters["bev_mini_share"]
        + scenario_variation_parameters["phev_mini_share"]
    )
    scenario_variation_parameters["medium_share"] = (
        scenario_variation_parameters["bev_medium_share"]
        + scenario_variation_parameters["phev_medium_share"]
    )
    scenario_variation_parameters["luxury_share"] = (
        scenario_variation_parameters["bev_luxury_share"]
        + scenario_variation_parameters["phev_luxury_share"]
    )

    factor_dict = dict()
    factor_dict["mini_factor"] = (
        scenario_variation_parameters["mini_share"]
        * scenario_variation_parameters["ev_count"]
        / kba_data.mini.sum()
    )
    factor_dict["medium_factor"] = (
        scenario_variation_parameters["medium_share"]
        * scenario_variation_parameters["ev_count"]
        / kba_data.medium.sum()
    )
    factor_dict["luxury_factor"] = (
        scenario_variation_parameters["luxury_share"]
        * scenario_variation_parameters["ev_count"]
        / kba_data.luxury.sum()
    )

    # Define shares and factors
    ev_data = kba_data.copy()
    for tech, params in CONFIG_EV.items():
        ev_data[tech] = (
            (
                kba_data[params["column"]]
                * factor_dict[params["factor"]]
                * scenario_variation_parameters[params["tech_share"]]
                / scenario_variation_parameters[params["share"]]
            )
            .round()
            .astype("int")
        )

    ev_data.drop(
        columns=[_ for _ in COLUMNS_KBA if _ != "reg_district"], inplace=True
    )

    return ev_data


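# The scaling above amounts to distributing the scenario's EV target for a
# technology according to each registration district's share of today's
# vehicle stock in the matching size segment, since the segment share cancels
# out. Worked example with made-up numbers (not actual dataset values),
# where kba_mini[d] and kba_mini_total denote the registered mini-segment
# vehicles in district d and nationwide, respectively:
#
#   bev_mini[d] = kba_mini[d] * mini_factor * bev_mini_share / mini_share
#               = kba_mini[d] / kba_mini_total * ev_count * bev_mini_share
#
# With kba_mini[d] = 10_000, kba_mini_total = 1_000_000,
# ev_count = 15_000_000 and bev_mini_share = 0.1 this yields
# 10_000 / 1_000_000 * 15_000_000 * 0.1 = 15_000 mini BEVs in district d.

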
def calc_evs_per_municipality(ev_data, rs7_data):
    """Calculate EVs per municipality

    Parameters
    ----------
    ev_data : pandas.DataFrame
        EVs per registration district
    rs7_data : pandas.DataFrame
        RegioStaR7 data

    Returns
    -------
    pandas.DataFrame
        EVs per municipality
    """
    with db.session_scope() as session:
        query = session.query(
            Vg250GemPopulation.ags_0.label("ags"),
            Vg250GemPopulation.gen,
            Vg250GemPopulation.population_total.label("pop"),
        )
        muns = pd.read_sql(
            query.statement, query.session.bind, index_col=None
        ).astype({"ags": "int64"})

    muns["ags_district"] = (
        muns.ags.multiply(1 / 1000).apply(np.floor).astype("int")
    )

    # Manual fix of Trier-Saarburg: Introduce new `ags_reg_district`
    # for correct allocation of mun to registration district
    # (Zulassungsbezirk), see above for background.
    muns["ags_reg_district"] = muns["ags_district"]
    muns.loc[muns["ags_reg_district"] == 7235, "ags_reg_district"] = 7211

    # Remove multiple municipality entries (due to 'gf' in VG250)
    # by summing up population
    muns = (
        muns[["ags", "gen", "ags_reg_district", "pop"]]
        .groupby(["ags", "gen", "ags_reg_district"])
        .sum()
        .reset_index()
    )

    # Add population of registration district
    pop_per_reg_district = (
        muns[["ags_reg_district", "pop"]]
        .groupby("ags_reg_district")
        .sum()
        .rename(columns={"pop": "pop_district"})
        .reset_index()
    )

    # Fix missing ags in mun data if not in testmode
    if TESTMODE_OFF:
        rs7_data = fix_missing_ags_municipality_regiostar(muns, rs7_data)

    # Merge municipality, EV data and pop per district
    ev_data_muns = muns.merge(ev_data, on="ags_reg_district").merge(
        pop_per_reg_district, on="ags_reg_district"
    )

    # Disaggregate EV numbers to municipality
    for tech in ev_data[CONFIG_EV.keys()]:
        ev_data_muns[tech] = round(
            ev_data_muns[tech]
            * ev_data_muns["pop"]
            / ev_data_muns["pop_district"]
        ).astype("int")

    # Filter columns
    cols = ["ags"]
    cols.extend(CONFIG_EV.keys())
    ev_data_muns = ev_data_muns[cols]

    # Merge RS7 data
    ev_data_muns = ev_data_muns.merge(rs7_data[["ags", "rs7_id"]], on="ags")

    return ev_data_muns


def calc_evs_per_grid_district(ev_data_muns):
    """Calculate EVs per grid district by using population weighting

    Parameters
    ----------
    ev_data_muns : pandas.DataFrame
        EV data for municipalities

    Returns
    -------
    pandas.DataFrame
        EV data for grid districts
    """
    # Read MVGDs with intersecting muns and aggregate pop for each
    # municipality part
    with db.session_scope() as session:
        query_pop_per_mvgd = (
            session.query(
                MvGridDistricts.bus_id,
                Vg250Gem.ags,
                func.sum(
                    DestatisZensusPopulationPerHaInsideGermany.population
                ).label("pop"),
            )
            .select_from(MapZensusGridDistricts)
            .join(
                MvGridDistricts,
                MapZensusGridDistricts.bus_id == MvGridDistricts.bus_id,
            )
            .join(
                DestatisZensusPopulationPerHaInsideGermany,
                MapZensusGridDistricts.zensus_population_id
                == DestatisZensusPopulationPerHaInsideGermany.id,
            )
            .join(
                MapZensusVg250,
                MapZensusGridDistricts.zensus_population_id
                == MapZensusVg250.zensus_population_id,
            )
            .join(
                Vg250Gem, MapZensusVg250.vg250_municipality_id == Vg250Gem.id
            )
            .group_by(MvGridDistricts.bus_id, Vg250Gem.ags)
            .order_by(Vg250Gem.ags)
        )
        mvgd_pop_per_mun = pd.read_sql(
            query_pop_per_mvgd.statement,
            query_pop_per_mvgd.session.bind,
            index_col=None,
        ).astype({"bus_id": "int64", "pop": "int64", "ags": "int64"})

    # Calc population share of each municipality in MVGD
    mvgd_pop_per_mun_in_mvgd = mvgd_pop_per_mun.groupby(["bus_id", "ags"]).agg(
        {"pop": "sum"}
    )

    # Calc relative and absolute population shares:
    # * pop_mun_in_mvgd: pop share of mun which intersects with MVGD
    # * pop_share_mun_in_mvgd: relative pop share of mun which
    #   intersects with MVGD
    # * pop_mun_total: total pop of mun
    # * pop_mun_in_mvgd_of_mun_total: relative pop share of mun which
    #   intersects with MVGD in relation to total pop of mun
    mvgd_pop_per_mun_in_mvgd = (
        mvgd_pop_per_mun_in_mvgd.groupby(level=0)
        .apply(lambda x: x / float(x.sum()))
        .reset_index()
        .rename(columns={"pop": "pop_share_mun_in_mvgd"})
        .merge(
            mvgd_pop_per_mun_in_mvgd.reset_index(),
            on=["bus_id", "ags"],
            how="left",
        )
        .rename(columns={"pop": "pop_mun_in_mvgd"})
        .merge(
            mvgd_pop_per_mun[["ags", "pop"]]
            .groupby("ags")
            .agg({"pop": "sum"}),
            on="ags",
            how="left",
        )
        .rename(columns={"pop": "pop_mun_total"})
    )
    mvgd_pop_per_mun_in_mvgd["pop_mun_in_mvgd_of_mun_total"] = (
        mvgd_pop_per_mun_in_mvgd["pop_mun_in_mvgd"]
        / mvgd_pop_per_mun_in_mvgd["pop_mun_total"]
    )

    # Merge EV data
    ev_data_mvgds = mvgd_pop_per_mun_in_mvgd.merge(
        ev_data_muns, on="ags", how="left"
    ).sort_values(["bus_id", "ags"])

    # Calc EVs per MVGD by using EV from mun and share of mun's pop
    # that is located within MVGD
    for tech in ev_data_mvgds[CONFIG_EV.keys()]:
        ev_data_mvgds[tech] = (
            round(
                ev_data_mvgds[tech]
                * ev_data_mvgds["pop_mun_in_mvgd_of_mun_total"]
            )
            .fillna(0)
            .astype("int")
        )

    # Set RS7 id for MVGD by using the RS7 id from the mun with the
    # highest share in population
    rs7_data_mvgds = (
        ev_data_mvgds[["bus_id", "pop_mun_in_mvgd", "rs7_id"]]
        .groupby(["bus_id", "rs7_id"])
        .sum()
        .sort_values(
            ["bus_id", "pop_mun_in_mvgd"], ascending=False, na_position="last"
        )
        .reset_index()
        .drop_duplicates("bus_id", keep="first")[["bus_id", "rs7_id"]]
    )

    # Join RS7 id and select columns
    columns = ["bus_id"] + [_ for _ in CONFIG_EV.keys()]
    ev_data_mvgds = (
        ev_data_mvgds[columns]
        .groupby("bus_id")
        .agg("sum")
        .merge(rs7_data_mvgds, on="bus_id", how="left")
    )

    return ev_data_mvgds


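# Worked example of the population weighting above (made-up numbers): a
# municipality with 20_000 inhabitants and 500 BEVs intersects two MVGDs,
# with 15_000 inhabitants in MVGD A and 5_000 in MVGD B. Then
# pop_mun_in_mvgd_of_mun_total is 0.75 resp. 0.25, so round(500 * 0.75) = 375
# vehicles are assigned to MVGD A and round(500 * 0.25) = 125 to MVGD B.
# Per MVGD, these contributions are summed over all intersecting
# municipalities, and the RS7 id is taken from the municipality contributing
# the largest population.

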
def allocate_evs_numbers():
    """Allocate electric vehicles to different spatial levels.

    Allocation uses today's vehicle registration data per registration
    district from KBA and scales the scenario's EV targets (BEV and PHEV)
    linearly using population. Furthermore, a RegioStaR7 code (BMVI) is
    assigned.

    Levels:
    * districts of registration
    * municipalities
    * grid districts
    """
    # Import
    kba_data = read_kba_data()
    rs7_data = read_rs7_data()

    for scenario_name in ["eGon2035", "eGon100RE"]:
        # Load scenario params
        scenario_parameters = get_sector_parameters(
            "mobility", scenario=scenario_name
        )["motorized_individual_travel"]

        print(f"SCENARIO: {scenario_name}")

        # Go through scenario variations
        for (
            scenario_variation_name,
            scenario_variation_parameters,
        ) in scenario_parameters.items():
            print(f" SCENARIO VARIATION: {scenario_variation_name}")

            # Get EV target
            ev_target = scenario_variation_parameters["ev_count"]

            #####################################
            # EV data per registration district #
            #####################################
            print("Calculate EV numbers for registration districts...")
            ev_data = calc_evs_per_reg_district(
                scenario_variation_parameters, kba_data
            )
            # Check EV results if not in testmode
            if TESTMODE_OFF:
                validate_electric_vehicles_numbers(
                    "EV count in registration districts", ev_data, ev_target
                )
            # Add scenario columns and write to DB
            ev_data["scenario"] = scenario_name
            ev_data["scenario_variation"] = scenario_variation_name
            ev_data.sort_values(
                ["scenario", "scenario_variation", "ags_reg_district"],
                inplace=True,
            )
            ev_data.to_sql(
                name=EgonEvCountRegistrationDistrict.__table__.name,
                schema=EgonEvCountRegistrationDistrict.__table__.schema,
                con=db.engine(),
                if_exists="append",
                index=False,
            )

            #####################################
            # EV data per municipality          #
            #####################################
            print("Calculate EV numbers for municipalities...")
            ev_data_muns = calc_evs_per_municipality(ev_data, rs7_data)
            # Check EV results if not in testmode
            if TESTMODE_OFF:
                validate_electric_vehicles_numbers(
                    "EV count in municipalities", ev_data_muns, ev_target
                )
            # Add scenario columns and write to DB
            ev_data_muns["scenario"] = scenario_name
            ev_data_muns["scenario_variation"] = scenario_variation_name
            ev_data_muns.sort_values(
                ["scenario", "scenario_variation", "ags"], inplace=True
            )
            ev_data_muns.to_sql(
                name=EgonEvCountMunicipality.__table__.name,
                schema=EgonEvCountMunicipality.__table__.schema,
                con=db.engine(),
                if_exists="append",
                index=False,
            )

            #####################################
            # EV data per grid district         #
            #####################################
            print("Calculate EV numbers for grid districts...")
            ev_data_mvgds = calc_evs_per_grid_district(ev_data_muns)
            # Check EV results if not in testmode
            if TESTMODE_OFF:
                validate_electric_vehicles_numbers(
                    "EV count in grid districts", ev_data_mvgds, ev_target
                )
            # Add scenario columns and write to DB
            ev_data_mvgds["scenario"] = scenario_name
            ev_data_mvgds["scenario_variation"] = scenario_variation_name
            ev_data_mvgds.sort_values(
                ["scenario", "scenario_variation", "bus_id"], inplace=True
            )
            ev_data_mvgds.to_sql(
                name=EgonEvCountMvGridDistrict.__table__.name,
                schema=EgonEvCountMvGridDistrict.__table__.schema,
                con=db.engine(),
                if_exists="append",
                index=False,
            )


def allocate_evs_to_grid_districts():
    """Allocate EVs to MV grid districts for all scenarios and scenario
    variations.

    Each grid district in
    :class:`egon.data.datasets.mv_grid_districts.MvGridDistricts`
    is assigned a list of electric vehicles from the EV pool in
    :class:`EgonEvPool` based on the RegioStaR7 region and the counts per
    EV type in :class:`EgonEvCountMvGridDistrict`.

    Results are written to :class:`EgonEvMvGridDistrict`.
    """

    def get_random_evs(row):
        """Get random EV sample for EV type and RS7 region"""
        return (
            ev_pool.loc[
                (ev_pool.rs7_id == row.rs7_id)
                & (ev_pool["type"] == row["type"])
            ]
            .sample(row["count"], replace=True)
            .ev_id.to_list()
        )

    for scenario_name in ["eGon2035", "eGon100RE"]:
        print(f"SCENARIO: {scenario_name}")

        # Load EVs per grid district
        print("Loading EV counts for grid districts...")
        with db.session_scope() as session:
            query = session.query(EgonEvCountMvGridDistrict).filter(
                EgonEvCountMvGridDistrict.scenario == scenario_name
            )
            ev_per_mvgd = pd.read_sql(
                query.statement, query.session.bind, index_col=None
            )

        # Convert EV types from wide to long format
        ev_per_mvgd = pd.melt(
            ev_per_mvgd,
            id_vars=["scenario", "scenario_variation", "bus_id", "rs7_id"],
            value_vars=CONFIG_EV.keys(),
            var_name="type",
            value_name="count",
        )

        # Load EV pool
        print(" Loading EV pool...")
        with db.session_scope() as session:
            query = session.query(EgonEvPool).filter(
                EgonEvPool.scenario == scenario_name
            )
            ev_pool = pd.read_sql(
                query.statement,
                query.session.bind,
                index_col=None,
            )

        # Draw EVs randomly for each grid district from pool
        print(" Draw EVs from pool for grid districts...")
        np.random.seed(RANDOM_SEED)
        ev_per_mvgd["egon_ev_pool_ev_id"] = ev_per_mvgd.apply(
            get_random_evs, axis=1
        )
        ev_per_mvgd.drop(columns=["rs7_id", "type", "count"], inplace=True)

        # EV lists to rows
        ev_per_mvgd = ev_per_mvgd.explode("egon_ev_pool_ev_id")

        # Check for empty entries
        empty_ev_entries = ev_per_mvgd.egon_ev_pool_ev_id.isna().sum()
        if empty_ev_entries > 0:
            print("====================================================")
            print(
                f"WARNING: Found {empty_ev_entries} empty entries "
                f"and will remove them:"
            )
            print(ev_per_mvgd[ev_per_mvgd.egon_ev_pool_ev_id.isna()])
            ev_per_mvgd = ev_per_mvgd[~ev_per_mvgd.egon_ev_pool_ev_id.isna()]
            print("====================================================")

        # Write allocated EVs to DB
        print(" Writing allocated data to DB...")
        ev_per_mvgd.to_sql(
            name=EgonEvMvGridDistrict.__table__.name,
            schema=EgonEvMvGridDistrict.__table__.schema,
            con=db.engine(),
            if_exists="append",
            index=False,
            method="multi",
            chunksize=10000,
        )

        # Check EV result sums for all scenario variations if not in testmode
        if TESTMODE_OFF:
            print(" Validating results...")
            ev_per_mvgd_counts_per_scn = (
                ev_per_mvgd.drop(columns=["bus_id"])
                .groupby(["scenario", "scenario_variation"])
                .count()
            )
            for (
                scn,
                scn_var,
            ), ev_actual in ev_per_mvgd_counts_per_scn.iterrows():
                scenario_parameters = get_sector_parameters(
                    "mobility", scenario=scn
                )["motorized_individual_travel"]
                # Get EV target
                ev_target = scenario_parameters[scn_var]["ev_count"]

                np.testing.assert_allclose(
                    int(ev_actual),
                    ev_target,
                    rtol=0.0001,
                    err_msg=f"Dataset on EV numbers allocated to MVGDs "
                    f"seems to be flawed. "
                    f"Scenario: [{scn}], "
                    f"Scenario variation: [{scn_var}].",
                )
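

# Minimal, self-contained sketch of the per-row drawing logic used in
# get_random_evs() above (illustrative only, not part of the pipeline; the
# DataFrame contents are made up):
#
#   import pandas as pd
#
#   pool = pd.DataFrame(
#       {
#           "ev_id": [1, 2, 3],
#           "rs7_id": [1, 1, 2],
#           "type": ["bev_mini", "bev_mini", "bev_mini"],
#       }
#   )
#   row = pd.Series({"rs7_id": 1, "type": "bev_mini", "count": 3})
#   sample = (
#       pool.loc[(pool.rs7_id == row.rs7_id) & (pool["type"] == row["type"])]
#       .sample(row["count"], replace=True)
#       .ev_id.to_list()
#   )
#   # e.g. [2, 1, 2] -- sampling with replacement, so a pool smaller than
#   # the requested count is not a problem.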