Source code for egon.data.datasets.emobility.motorized_individual_travel_charging_infrastructure.use_cases

"""
Functions related to the four different use cases
"""
from __future__ import annotations

from loguru import logger
import geopandas as gpd
import numpy as np
import pandas as pd

from egon.data import config

DATASET_CFG = config.datasets()["charging_infrastructure"]


[docs]def hpc(hpc_points: gpd.GeoDataFrame, uc_dict: dict) -> gpd.GeoDataFrame: """ Calculate placements and energy distribution for use case hpc. :param hpc_points: gpd.GeoDataFrame GeoDataFrame of possible hpc locations :param uc_dict: dict contains basic run info like region boundary and save directory """ uc_id = "hpc" logger.debug(f"Use case: {uc_id}") num_hpc = 10**6 energy_sum = 1 # filter hpc points by region in_region_bool = hpc_points["geometry"].within(uc_dict["region"].iat[0]) in_region = hpc_points.loc[in_region_bool] if "has_hpc" in in_region.columns: in_region = in_region.loc[in_region["has_hpc"]] cols = [ "geometry", "hpc_count", "potential", "new_hpc_index", "new_hpc_tag", ] in_region = in_region[cols] # select all hpc points tagged 0 (all registered points) real_mask = in_region["new_hpc_tag"] == 0 real_in_region = in_region.loc[real_mask] num_hpc_real = real_in_region["hpc_count"].sum() if num_hpc_real < num_hpc: sim_in_region = in_region.loc[~real_mask] sim_in_region = sim_in_region.loc[in_region["new_hpc_index"] > 0] sim_in_region_sorted = sim_in_region.sort_values( "potential", ascending=False ) additional_hpc = int( min(num_hpc - num_hpc_real, len(sim_in_region.index)) ) selected_hpc = sim_in_region_sorted.iloc[:additional_hpc] real_in_region = pd.concat([real_in_region, selected_hpc]) if not len(real_in_region.index): logger.warning( f"No potential charging points found in region {uc_dict['key']}!" ) else: real_in_region["potential"] = ( real_in_region["potential"] * real_in_region["hpc_count"] ) total_potential = real_in_region["potential"].sum() real_in_region = real_in_region.assign( share=real_in_region["potential"] / total_potential ).round(6) real_in_region["exists"] = real_in_region["new_hpc_tag"] == 0 # outputs logger.debug( f"{round(energy_sum, 1)} kWh got fastcharged in region {uc_dict['key']}." ) return gpd.GeoDataFrame(real_in_region)
[docs]def public( public_points: gpd.GeoDataFrame, public_data: gpd.GeoDataFrame, uc_dict: dict, ) -> gpd.GeoDataFrame: """ Calculate placements and energy distribution for use case hpc. :param public_points: gpd.GeoDataFrame existing public charging points :param public_data: gpd.GeoDataFrame clustered POI :param uc_dict: dict contains basic run info like region boundary and save directory """ uc_id = "public" logger.debug(f"Use case: {uc_id}") num_public = 10**6 energy_sum = 1 # filter hpc points by region in_region_bool = public_points["geometry"].within(uc_dict["region"].iat[0]) in_region = public_points.loc[in_region_bool] poi_in_region_bool = public_data["geometry"].within( uc_dict["region"].iat[0] ) poi_in_region = public_data.loc[poi_in_region_bool] num_public_real = in_region["count"].sum() # match with clusters anyway (for weights) region_points, region_poi = match_existing_points(in_region, poi_in_region) region_points["exists"] = True if num_public_real < num_public: additional_public = num_public - num_public_real # distribute additional public points via POI add_points = distribute_by_poi(region_poi, additional_public) region_points = pd.concat([region_points, add_points]) region_points["energy"] = ( region_points["potential"] / region_points["potential"].sum() * energy_sum ) # outputs logger.debug( f"{round(energy_sum, 1)} kWh got charged in region {uc_dict['key']}." ) return gpd.GeoDataFrame(region_points, crs=public_points.crs)
[docs]def distribute_by_poi(region_poi: gpd.GeoDataFrame, num_points: int | float): # sort clusters without existing points by weight, then choose highest region_poi = region_poi.copy() region_poi.sort_values("potential", inplace=True, ascending=False) num_points = int(min(num_points, len(region_poi.index))) # choose point in cluster that is closest to big street return region_poi.iloc[:num_points]
[docs]def match_existing_points( region_points: gpd.GeoDataFrame, region_poi: gpd.GeoDataFrame ): region_poi = region_poi.assign(exists=False) poi_buffer = region_poi.buffer(region_poi["radius"].astype(int)) region_points = region_points.assign(potential=0) for i in region_points.index: lis_point = region_points.at[i, "geometry"] cluster = poi_buffer.contains(lis_point) clusters = region_poi.loc[cluster] num_clusters = len(clusters.index) if num_clusters == 0: # decent average as fallback region_points.at[i, "potential"] = 5 elif num_clusters == 1: region_points.at[i, "potential"] = clusters["potential"] region_poi.loc[cluster, "exists"] = True elif num_clusters > 1: # choose cluster with closest Point dist = clusters.distance(lis_point) idx = dist.idxmin() region_poi.at[idx, "exists"] = True region_points.at[i, "potential"] = clusters.at[idx, "potential"] # delete all clusters with exists = True region_poi = region_poi.loc[~region_poi["exists"]] return region_points, region_poi
[docs]def home( home_data: gpd.GeoDataFrame, uc_dict: dict, ) -> gpd.GeoDataFrame: """ Calculate placements and energy distribution for use case hpc. :param home_data: gpd.GeoDataFrame info about house types :param uc_dict: dict contains basic run info like region boundary and save directory """ uc_id = "home" logger.debug(f"Use case: {uc_id}") num_home = 1000000 energy_sum = 1 # filter houses by region in_region_bool = home_data["geometry"].within(uc_dict["region"].iat[0]) in_region = home_data.loc[in_region_bool] if in_region.empty: return in_region in_region = in_region.assign( num=in_region["num"].fillna(value=0), num_mfh=in_region["num_mfh"].fillna(value=0), ) potential = apportion_home(in_region, num_home, uc_dict) in_region["charge_spots"] = potential in_region = in_region.loc[in_region["charge_spots"] > 0] in_region["energy"] = energy_sum * in_region["charge_spots"] / num_home in_region = in_region.sort_values(by="energy", ascending=False) logger.debug( f"{round(energy_sum, 1)} kWh got charged in region {uc_dict['key']}." ) return gpd.GeoDataFrame(in_region, crs=home_data.crs)
[docs]def apportion_home(home_df: pd.DataFrame, num_spots: int, config: dict): # use parameters to set number of possible charge spots per row home_df["num_available"] = home_df[["num", "num_mfh"]].apply( home_charge_spots, axis=1, raw=True, args=(config,) ) # if too many spots need to be placed, every house gets a spot if num_spots >= home_df["num_available"].sum(): logger.debug( f"All private home spots have been filled. Leftover: " f"{num_spots - home_df['num_available'].sum()}" ) return home_df.loc[:, "num_available"] # distribute charge points based on houses per square samples = home_df.sample( num_spots, weights="num_available", random_state=1, replace=True ) result = pd.Series([0] * len(home_df.index), index=home_df.index) for i in samples.index: result.at[i] += 1 return result
[docs]def home_charge_spots(house_array: pd.Series | np.array, config: dict): # take number of houses, random seed, average spots per house and share of houses # with possible spots sfh = ( house_array[0] * config["sfh_avg_spots"] * max(config["random_seed"].normal(config["sfh_available"], 0.1), 0) ) mfh = ( house_array[1] * config["mfh_avg_spots"] * max(config["random_seed"].normal(config["mfh_available"], 0.1), 0) ) return round(sfh + mfh)
[docs]def work( landuse: gpd.GeoDataFrame, weights_dict: dict, uc_dict: dict, ) -> gpd.GeoDataFrame: """ Calculate placements and energy distribution for use case hpc. :param landuse: gpd.GeoDataFrame work areas by land use :param weights_dict: dict weights for different land use types :param uc_dict: dict contains basic run info like region boundary and save directory """ uc_id = "work" logger.debug(f"Use case: {uc_id}") energy_sum = 1 in_region_bool = landuse.within(uc_dict["region"].iat[0]) in_region = landuse[in_region_bool] # calculating the area of polygons in_region = in_region.assign(area=in_region["geometry"].area / 10**6) groups = in_region.groupby("landuse") group_labels = ["retail", "commercial", "industrial"] srid = DATASET_CFG["original_data"]["sources"]["tracbev"]["srid"] result = gpd.GeoDataFrame( columns=["geometry", "landuse", "potential"], crs=f"EPSG:{srid}" ) for g in group_labels: if g in groups.groups: group = groups.get_group(g) group = group.assign( potential=group["geometry"].area * weights_dict[g] ) group.to_crs(srid) result = gpd.GeoDataFrame( pd.concat([result, group]), crs=f"EPSG:{srid}" ) result["energy"] = ( result["potential"] * energy_sum / result["potential"].sum() ) # outputs logger.debug( f"{round(energy_sum, 1)} kWh got charged in region {uc_dict['key']}." ) return gpd.GeoDataFrame(result, crs=landuse.crs)