# Source code for egon.data.datasets.industry.temporal

"""The central module containing all code dealing with processing
timeseries data using demandregio

"""

from sqlalchemy.ext.declarative import declarative_base
import geopandas as gpd
import numpy as np
import pandas as pd

from egon.data import db
from egon.data.datasets.electricity_demand.temporal import calc_load_curve
import egon.data.config

Base = declarative_base()


def identify_voltage_level(df):
    """Identify the voltage_level of a grid component based on its peak load
    and defined thresholds.

    Parameters
    ----------
    df : pandas.DataFrame
        Data frame containing information about peak loads

    Returns
    -------
    pandas.DataFrame
        Data frame with an additional column with voltage level
    """
    # Start without a classification; rows not matching any rule stay NaN.
    df["voltage_level"] = np.nan

    # Anything at or below 0.1 MW is assigned to the lowest level (7).
    df.loc[df["peak_load"] <= 0.1, "voltage_level"] = 7

    # Thresholds (peak load) mapped to voltage levels as defined in the
    # eGon project. Applied in ascending order so the highest matching
    # threshold wins for each row.
    thresholds = [(0.1, 6), (0.2, 5), (5.5, 4), (20, 3), (120, 1)]
    for limit, level in thresholds:
        df.loc[df["peak_load"] > limit, "voltage_level"] = level

    return df
def identify_bus(load_curves, demand_area):
    """Identify the grid connection point for a consumer by determining its
    grid level based on the time series' peak load and the spatial
    intersection to mv grid districts or ehv voronoi cells.

    Parameters
    ----------
    load_curves : pandas.DataFrame
        Demand timeseries per demand area (e.g. osm landuse area,
        industrial site), one column per demand area id
    demand_area : pandas.DataFrame
        Dataframe with id and geometry of areas where an industrial demand
        is assigned to, such as osm landuse areas or industrial sites.

    Returns
    -------
    pandas.DataFrame
        Transposed demand timeseries per demand area, annotated with the
        bus_id of the identified grid connection point
    """
    sources = egon.data.config.datasets()["electrical_load_curves_industry"][
        "sources"
    ]

    # Select mv grid districts
    griddistrict = db.select_geodataframe(
        f"""SELECT bus_id, geom FROM
            {sources['egon_mv_grid_district']['schema']}.
            {sources['egon_mv_grid_district']['table']}""",
        geom_col="geom",
        epsg=3035,
    )

    # Identify peak load per demand area (e.g. osm landuse area or
    # industrial site); the column-wise maximum of the time series is the
    # peak load used for the voltage-level classification.
    peak = pd.DataFrame(columns=["id", "peak_load"])
    peak["id"] = load_curves.max(axis=0).index
    peak["peak_load"] = load_curves.max(axis=0).values

    peak = identify_voltage_level(peak)

    # Assign bus_id to demand area by merging landuse and peak df
    peak = pd.merge(demand_area, peak, right_on="id", left_index=True)

    # Identify all demand areas connected to HVMV buses.
    # .copy() avoids chained-assignment on a view when the centroid column
    # is added below.
    peak_hv = peak[peak["voltage_level"] > 1].copy()

    # Perform a spatial join between the centroid of the demand area and mv
    # grid districts to identify grid connection point
    # NOTE(review): geopandas deprecated the ``op`` keyword in favour of
    # ``predicate`` in 0.10 — keep in sync with the pinned geopandas version.
    peak_hv["centroid"] = peak_hv["geom"].centroid
    peak_hv = peak_hv.set_geometry("centroid")
    peak_hv_c = gpd.sjoin(peak_hv, griddistrict, how="inner", op="intersects")

    # Perform a spatial join between the polygon of the demand area and mv
    # grid districts to ensure every area got assigned to a bus (centroids
    # can fall outside every district, e.g. for concave areas at borders)
    peak_hv_p = peak_hv[~peak_hv.isin(peak_hv_c)].dropna().set_geometry("geom")
    peak_hv_p = gpd.sjoin(
        peak_hv_p, griddistrict, how="inner", op="intersects"
    ).drop_duplicates(subset=["id"])

    # Bring both dataframes together (pd.concat replaces the
    # DataFrame.append that was removed in pandas 2.0)
    peak_bus = pd.concat([peak_hv_c, peak_hv_p], ignore_index=True)

    # Select ehv voronoi cells.
    # BUG FIX: this query previously selected from the mv grid district
    # source again, so EHV-connected consumers were joined against the
    # wrong geometries. TODO confirm the 'egon_ehv_voronoi' key exists in
    # the 'electrical_load_curves_industry' sources config.
    ehv_voronoi = db.select_geodataframe(
        f"""SELECT bus_id, geom FROM
            {sources['egon_ehv_voronoi']['schema']}.
            {sources['egon_ehv_voronoi']['table']}""",
        geom_col="geom",
        epsg=3035,
    )

    # Identify all demand areas connected to EHV buses
    peak_ehv = peak[peak["voltage_level"] == 1].copy()

    # Perform a spatial join between the centroid of the demand area and ehv
    # voronoi to identify grid connection point
    peak_ehv["centroid"] = peak_ehv["geom"].centroid
    peak_ehv = peak_ehv.set_geometry("centroid")
    peak_ehv = gpd.sjoin(peak_ehv, ehv_voronoi, how="inner", op="intersects")

    # Bring both dataframes together
    peak_bus = pd.concat([peak_bus, peak_ehv], ignore_index=True)

    # Combine dataframes to bring loadcurves and bus id together
    curves_da = pd.merge(
        load_curves.T,
        peak_bus[["bus_id", "id", "geom"]],
        left_index=True,
        right_on="id",
    )

    return curves_da
def calc_load_curves_ind_osm(scenario):
    """Temporal disaggregate electrical demand per osm industrial landuse
    area.

    Parameters
    ----------
    scenario : str
        Scenario name.

    Returns
    -------
    load_ts_df : pandas.DataFrame
        Demand timeseries of industry aggregated per bus, stored as one
        list-valued ``p_set`` column
    curves_individual : pandas.DataFrame
        Demand timeseries per osm landuse area, indexed by
        (osm_id, scn_name)
    """
    sources = egon.data.config.datasets()["electrical_load_curves_industry"][
        "sources"
    ]

    # Select demands per industrial branch (wz) and osm landuse area
    demands_osm_area = db.select_dataframe(
        f"""SELECT osm_id, wz, demand
            FROM {sources['osm']['schema']}.
            {sources['osm']['table']}
            WHERE scenario = '{scenario}'
            AND demand > 0
            """
    ).set_index(["osm_id", "wz"])

    # Select industrial landuse polygons (sector = 3) as demand area
    demand_area = db.select_geodataframe(
        f"""SELECT id, geom FROM
            {sources['osm_landuse']['schema']}.
            {sources['osm_landuse']['table']}
            WHERE sector = 3 """,
        index_col="id",
        geom_col="geom",
        epsg=3035,
    )

    # Calculate shares of industrial branches per osm area
    # (each group is normalised by its total demand)
    osm_share_wz = demands_osm_area.groupby("osm_id").apply(
        lambda grp: grp / grp.sum()
    )

    osm_share_wz.reset_index(inplace=True)

    # Pivot the long (osm_id, wz, share) frame into a wide table:
    # one row per osm area, one column per industrial branch
    share_wz_transpose = pd.DataFrame(
        index=osm_share_wz.osm_id.unique(), columns=osm_share_wz.wz.unique()
    )
    share_wz_transpose.index.rename("osm_id", inplace=True)

    for wz in share_wz_transpose.columns:
        share_wz_transpose[wz] = (
            osm_share_wz[osm_share_wz.wz == wz].set_index("osm_id").demand
        )

    # Rename columns to bring it in line with demandregio data
    # (subsector 1718 is mapped onto demandregio's subsector 17)
    share_wz_transpose.rename(columns={1718: 17}, inplace=True)

    # Calculate industrial annual demand per osm area
    annual_demand_osm = demands_osm_area.groupby("osm_id").demand.sum()

    # Return electrical load curves per osm industrial landuse area
    load_curves = calc_load_curve(share_wz_transpose, annual_demand_osm)

    # Attach the grid connection point (bus_id) to every demand area
    curves_da = identify_bus(load_curves, demand_area)

    # Group all load curves per bus
    curves_bus = (
        curves_da.drop(["id"], axis=1).fillna(0).groupby("bus_id").sum()
    )

    # Initialize pandas.DataFrame for export to database
    load_ts_df = pd.DataFrame(index=curves_bus.index, columns=["p_set"])

    # Insert time series data to df as an array (one list per bus row)
    load_ts_df.p_set = curves_bus.values.tolist()

    # Create Dataframe to store time series individually, i.e. one
    # list-valued p_set per osm landuse area instead of per bus
    curves_individual_interim = (
        curves_da.drop(["bus_id", "geom"], axis=1).fillna(0)
    ).set_index("id")
    curves_individual = curves_da[["id", "bus_id"]]
    curves_individual["p_set"] = curves_individual_interim.values.tolist()
    curves_individual["scn_name"] = scenario
    curves_individual = curves_individual.rename(
        columns={"id": "osm_id"}
    ).set_index(["osm_id", "scn_name"])

    return load_ts_df, curves_individual
def insert_osm_ind_load():
    """Inserts electrical industry loads assigned to osm landuse areas to the
    database.

    For every scenario the previously stored rows are removed first, then
    both the per-bus and the per-area time series are written back.

    Returns
    -------
    None.
    """
    targets = egon.data.config.datasets()["electrical_load_curves_industry"][
        "targets"
    ]

    for scenario in ["eGon2021", "eGon2035", "eGon100RE"]:

        # Delete existing data from database
        db.execute_sql(
            f"""
            DELETE FROM
            {targets['osm_load']['schema']}.{targets['osm_load']['table']}
            WHERE scn_name = '{scenario}'
            """
        )

        db.execute_sql(
            f"""
            DELETE FROM
            {targets['osm_load_individual']['schema']}.
            {targets['osm_load_individual']['table']}
            WHERE scn_name = '{scenario}'
            """
        )

        # Calculate cts load curves per mv substation (hvmv bus)
        bus_load, area_load = calc_load_curves_ind_osm(scenario)

        # Build the (bus, scn_name) multi-index expected by the target table
        bus_load.index = bus_load.index.rename("bus")
        bus_load["scn_name"] = scenario
        bus_load.set_index(["scn_name"], inplace=True, append=True)

        # Insert aggregated per-bus series into database
        bus_load.to_sql(
            targets["osm_load"]["table"],
            schema=targets["osm_load"]["schema"],
            con=db.engine(),
            if_exists="append",
        )

        # Derive peak load and annual demand from the list-valued p_set
        # column; convert to an array once and reuse it for both.
        p_set_array = np.array(area_load["p_set"].values.tolist())
        area_load["peak_load"] = p_set_array.max(axis=1)
        area_load["demand"] = p_set_array.sum(axis=1)

        area_load = identify_voltage_level(area_load)

        # Insert individual per-area series into database
        area_load.to_sql(
            targets["osm_load_individual"]["table"],
            schema=targets["osm_load_individual"]["schema"],
            con=db.engine(),
            if_exists="append",
        )
def calc_load_curves_ind_sites(scenario):
    """Temporal disaggregation of load curves per industrial site and
    industrial subsector.

    Parameters
    ----------
    scenario : str
        Scenario name.

    Returns
    -------
    load_ts_df : pandas.DataFrame
        Demand timeseries of industry aggregated per bus and industrial
        subsector, stored as one list-valued ``p_set`` column
    curves_individual : pandas.DataFrame
        Demand timeseries per industrial site, indexed by
        (site_id, scn_name)
    """
    sources = egon.data.config.datasets()["electrical_load_curves_industry"][
        "sources"
    ]

    # Select demands per industrial site including the subsector information
    demands_ind_sites = db.select_dataframe(
        f"""SELECT industrial_sites_id, wz, demand
            FROM {sources['sites']['schema']}.
            {sources['sites']['table']}
            WHERE scenario = '{scenario}'
            AND demand > 0
            """
    ).set_index(["industrial_sites_id"])

    # Select industrial sites as demand_areas from database
    demand_area = db.select_geodataframe(
        f"""SELECT id, geom FROM
            {sources['sites_geom']['schema']}.
            {sources['sites_geom']['table']}""",
        index_col="id",
        geom_col="geom",
        epsg=3035,
    )

    # Replace entries to bring it in line with demandregio's subsector
    # definitions (subsector 1718 is mapped onto 17)
    demands_ind_sites.replace(1718, 17, inplace=True)
    share_wz_sites = demands_ind_sites.copy()

    # Create additional df on wz_share per industrial site, which is always
    # set to one as the industrial demand per site is subsector specific
    share_wz_sites.demand = 1
    share_wz_sites.reset_index(inplace=True)

    # Pivot into a wide table: one row per site, one column per subsector
    share_transpose = pd.DataFrame(
        index=share_wz_sites.industrial_sites_id.unique(),
        columns=share_wz_sites.wz.unique(),
    )
    share_transpose.index.rename("industrial_sites_id", inplace=True)

    for wz in share_transpose.columns:
        share_transpose[wz] = (
            share_wz_sites[share_wz_sites.wz == wz]
            .set_index("industrial_sites_id")
            .demand
        )

    # Scale the subsector-specific standard load profile by the site's
    # annual demand
    load_curves = calc_load_curve(share_transpose, demands_ind_sites["demand"])

    # Attach the grid connection point (bus_id) and re-attach the wz,
    # which identify_bus does not carry through
    curves_da = identify_bus(load_curves, demand_area)
    curves_da = pd.merge(
        curves_da, demands_ind_sites.wz, left_on="id", right_index=True
    )

    # Group all load curves per bus and wz
    curves_bus = (
        curves_da.fillna(0)
        .groupby(["bus_id", "wz"])
        .sum()
        .drop(["id"], axis=1)
    )

    # Initialize pandas.DataFrame for pf table load timeseries
    load_ts_df = pd.DataFrame(index=curves_bus.index, columns=["p_set"])

    # Insert data for pf load timeseries table (one list per group row)
    load_ts_df.p_set = curves_bus.values.tolist()

    # Create Dataframe to store time series individually, i.e. one
    # list-valued p_set per industrial site instead of per bus/wz group
    curves_individual_interim = (
        curves_da.drop(["bus_id", "geom", "wz"], axis=1).fillna(0)
    ).set_index("id")
    curves_individual = curves_da[["id", "bus_id"]]
    curves_individual["p_set"] = curves_individual_interim.values.tolist()
    curves_individual["scn_name"] = scenario
    curves_individual = curves_individual.merge(
        curves_da[["wz", "id"]], left_on="id", right_on="id"
    )
    curves_individual = curves_individual.rename(
        columns={"id": "site_id"}
    ).set_index(["site_id", "scn_name"])

    return load_ts_df, curves_individual
def insert_sites_ind_load():
    """Inserts electrical industry loads assigned to industrial sites to the
    database.

    For every scenario the previously stored rows are removed first, then
    both the per-bus/subsector and the per-site time series are written
    back.

    Returns
    -------
    None.
    """
    targets = egon.data.config.datasets()["electrical_load_curves_industry"][
        "targets"
    ]

    for scenario in ["eGon2021", "eGon2035", "eGon100RE"]:

        # Delete existing data from database
        db.execute_sql(
            f"""
            DELETE FROM
            {targets['sites_load']['schema']}.{targets['sites_load']['table']}
            WHERE scn_name = '{scenario}'
            """
        )

        # Delete existing data from database
        db.execute_sql(
            f"""
            DELETE FROM
            {targets['sites_load_individual']['schema']}.
            {targets['sites_load_individual']['table']}
            WHERE scn_name = '{scenario}'
            """
        )

        # Calculate industrial load curves per bus
        bus_load, site_curves = calc_load_curves_ind_sites(scenario)

        # Build the (bus, wz, scn_name) multi-index expected by the target
        # table
        bus_load.index = bus_load.index.rename(["bus", "wz"])
        bus_load["scn_name"] = scenario
        bus_load.set_index(["scn_name"], inplace=True, append=True)

        # Insert aggregated per-bus series into database
        bus_load.to_sql(
            targets["sites_load"]["table"],
            schema=targets["sites_load"]["schema"],
            con=db.engine(),
            if_exists="append",
        )

        # Derive peak load and annual demand from the list-valued p_set
        # column; convert to an array once and reuse it for both.
        p_set_array = np.array(site_curves["p_set"].values.tolist())
        site_curves["peak_load"] = p_set_array.max(axis=1)
        site_curves["demand"] = p_set_array.sum(axis=1)

        site_curves = identify_voltage_level(site_curves)

        # Insert individual per-site series into database
        site_curves.to_sql(
            targets["sites_load_individual"]["table"],
            schema=targets["sites_load_individual"]["schema"],
            con=db.engine(),
            if_exists="append",
        )