Source code for egon.data.datasets.fill_etrago_gen

import geopandas as gpd
import numpy as np
import pandas as pd

from egon.data import db
from egon.data.datasets import Dataset, DatasetSources, DatasetTargets
from egon.data.datasets.scenario_parameters import get_sector_parameters
import egon.data.config


[docs] class Egon_etrago_gen(Dataset): """ Group generators based on Scenario, carrier and bus. Marginal costs are assigned to generators without this data. Grouped generators are sent to the egon_etrago_generator table and a timeseries is assigned to the weather dependent ones. *Dependencies* * :py:class:`PowerPlants <egon.data.datasets.power_plants.PowerPlants>` * :py:class:`WeatherData <egon.data.datasets.era5.WeatherData>` *Resulting tables* * :py:class:`grid.egon_etrago_generator <egon.data.datasets.etrago_setup.EgonPfHvGenerator>` is extended * :py:class:`grid.egon_etrago_generator_timeseries <egon.data.datasets.etrago_setup.EgonPfHvGeneratorTimeseries>` is filled """ #: name: str = "etrago_generators" #: version: str = "0.0.10" sources = DatasetSources( tables={ "power_plants": "supply.egon_power_plants", "renewable_feedin": "supply.egon_era5_renewable_feedin", "weather_cells": "supply.egon_era5_weather_cells", "bus": "grid.egon_etrago_bus", } ) targets = DatasetTargets( tables={ "etrago_generators": "grid.egon_etrago_generator", "etrago_gen_time": "grid.egon_etrago_generator_timeseries", } ) def __init__(self, dependencies): super().__init__( name=self.name, version=self.version, dependencies=dependencies, tasks=(fill_etrago_generators,), )
[docs] def fill_etrago_generators(): # Connect to the data base con = db.engine() cfg = Egon_etrago_gen # use class-level sources/targets # Load required tables ( power_plants, renew_feedin, weather_cells, etrago_gen_orig, pp_time, ) = load_tables(con, cfg) # Delete power plants from previous iterations of this script delete_previuos_gen(cfg, con, etrago_gen_orig, power_plants) renew_feedin = adjust_renew_feedin_table( renew_feedin=renew_feedin, cfg=cfg ) etrago_pp = group_power_plants( power_plants=power_plants, renew_feedin=renew_feedin, etrago_gen_orig=etrago_gen_orig, cfg=cfg, ) etrago_pp = add_marginal_costs(etrago_pp) etrago_gen_table = fill_etrago_gen_table( etrago_pp2=etrago_pp, etrago_gen_orig=etrago_gen_orig, cfg=cfg, con=con, ) etrago_gen_time_table = fill_etrago_gen_time_table( etrago_pp=etrago_pp, power_plants=power_plants, renew_feedin=renew_feedin, pp_time=pp_time, cfg=cfg, con=con, ) return "eTrago Generators tables filled successfully"
[docs] def group_power_plants(power_plants, renew_feedin, etrago_gen_orig, cfg): # group power plants by bus and carrier agg_func = { "carrier": consistency, "el_capacity": np.sum, "bus_id": consistency, "weather_cell_id": power_timeser, "scenario": consistency, } etrago_pp = power_plants.groupby(by=["bus_id", "carrier", "scenario"]).agg( func=agg_func ) etrago_pp = etrago_pp.reset_index(drop=True) etrago_pp["generator_id"] = db.next_etrago_id("generator", len(etrago_pp)) etrago_pp.set_index("generator_id", inplace=True) return etrago_pp
[docs] def add_marginal_costs(power_plants): scenarios = power_plants.scenario.unique() pp = pd.DataFrame() for scenario in scenarios: pp_scn = power_plants[power_plants["scenario"] == scenario].copy() # Read marginal costs from scenario capacities marginal_costs = pd.DataFrame.from_dict( get_sector_parameters("electricity", scenario)["marginal_cost"], orient="index", ).rename(columns={0: "marginal_cost"}) # Set marginal costs = 0 for technologies without values warning = [] for carrier in pp_scn.carrier.unique(): if carrier not in (marginal_costs.index): warning.append(carrier) marginal_costs.at[carrier, "marginal_cost"] = 0 if warning: print(f"""There are no marginal_cost values for: \n{warning} in the scenario {scenario}. Missing values set to 0""") pp = pd.concat( [ pp, pp_scn.merge( right=marginal_costs, left_on="carrier", right_index=True ), ] ) return pp
[docs] def fill_etrago_gen_table(etrago_pp2, etrago_gen_orig, cfg, con): etrago_pp = etrago_pp2[ ["carrier", "el_capacity", "bus_id", "scenario", "marginal_cost"] ] etrago_pp = etrago_pp.rename( columns={ "el_capacity": "p_nom", "bus_id": "bus", "scenario": "scn_name", } ) etrago_pp.to_sql( name=cfg.targets.get_table_name("etrago_generators"), schema=cfg.targets.get_table_schema("etrago_generators"), con=con, if_exists="append", ) return etrago_pp
[docs] def fill_etrago_gen_time_table( etrago_pp, power_plants, renew_feedin, pp_time, cfg, con ): etrago_pp_time = etrago_pp.copy() etrago_pp_time = etrago_pp_time[ ["carrier", "el_capacity", "bus_id", "weather_cell_id", "scenario"] ] etrago_pp_time = etrago_pp_time[ (etrago_pp_time["carrier"] == "solar") | (etrago_pp_time["carrier"] == "solar_rooftop") | (etrago_pp_time["carrier"] == "wind_onshore") | (etrago_pp_time["carrier"] == "wind_offshore") ] cal_timeseries = set_timeseries( power_plants=power_plants, renew_feedin=renew_feedin ) etrago_pp_time["p_max_pu"] = 0 etrago_pp_time["p_max_pu"] = etrago_pp_time.apply(cal_timeseries, axis=1) etrago_pp_time.rename(columns={"scenario": "scn_name"}, inplace=True) etrago_pp_time = etrago_pp_time[["scn_name", "p_max_pu"]] etrago_pp_time = etrago_pp_time.reindex(columns=pp_time.columns) etrago_pp_time = etrago_pp_time.drop(columns="generator_id") etrago_pp_time["p_max_pu"] = etrago_pp_time["p_max_pu"].apply(list) etrago_pp_time["temp_id"] = 1 etrago_pp_time.to_sql( name=cfg.targets.get_table_name("etrago_gen_time"), schema=cfg.targets.get_table_schema("etrago_gen_time"), con=con, if_exists="append", ) return etrago_pp_time
[docs] def load_tables(con, cfg): sql = f""" SELECT * FROM {cfg.sources.tables["power_plants"]} WHERE carrier != 'gas' """ power_plants = gpd.GeoDataFrame.from_postgis( sql, con, crs="EPSG:4326", index_col="id" ) sql = f""" SELECT * FROM {cfg.sources.tables['renewable_feedin']} """ renew_feedin = pd.read_sql(sql, con) sql = f""" SELECT * FROM {cfg.sources.tables['weather_cells']} """ weather_cells = gpd.GeoDataFrame.from_postgis(sql, con, crs="EPSG:4326") sql = f""" SELECT * FROM {cfg.targets.tables['etrago_generators']} """ etrago_gen_orig = pd.read_sql(sql, con) sql = f""" SELECT * FROM {cfg.targets.tables['etrago_gen_time']} """ pp_time = pd.read_sql(sql, con) return power_plants, renew_feedin, weather_cells, etrago_gen_orig, pp_time
[docs] def consistency(data): assert ( len(set(data)) <= 1 ), f"The elements in column {data.name} do not match" return data.iloc[0]
[docs] def numpy_nan(data): return np.nan
[docs] def power_timeser(weather_data): if weather_data.isna().any(): return -1 else: return weather_data.iloc[0]
[docs] def adjust_renew_feedin_table(renew_feedin, cfg): # Define carrier 'pv' as 'solar' carrier_pv_mask = renew_feedin["carrier"] == "pv" renew_feedin.loc[carrier_pv_mask, "carrier"] = "solar" # Copy solar timeseries for solar_rooftop feedin_solar_rooftop = renew_feedin.loc[renew_feedin["carrier"] == "solar"] feedin_solar_rooftop.loc[:, "carrier"] = "solar_rooftop" renew_feedin = pd.concat( [renew_feedin, feedin_solar_rooftop], ignore_index=True ) # convert renewable feedin lists to arrays renew_feedin["feedin"] = renew_feedin["feedin"].apply(np.array) return renew_feedin
[docs] def delete_previuos_gen(cfg, con, etrago_gen_orig, power_plants): for scn_name in egon.data.config.settings()["egon-data"]["--scenarios"]: power_plants_scn = power_plants[power_plants["scenario"] == scn_name] carrier_delete = list(power_plants_scn.carrier.unique()) if carrier_delete: db.execute_sql( f"""DELETE FROM {cfg.targets.tables['etrago_generators']} WHERE carrier IN {*carrier_delete,} AND bus IN ( SELECT bus_id FROM {cfg.sources.tables['bus']} WHERE country = 'DE' AND carrier = 'AC' AND scn_name = '{scn_name}' ) AND scn_name = '{scn_name}' """ ) db.execute_sql( f"""DELETE FROM {cfg.targets.tables['etrago_gen_time']} WHERE generator_id NOT IN ( SELECT generator_id FROM {cfg.targets.tables['etrago_generators']} ) AND scn_name ='{scn_name}' """ )
[docs] def set_timeseries(power_plants, renew_feedin): """ Create a function to calculate the feed-in timeseries for power plants. Parameters ---------- power_plants : DataFrame A DataFrame containing information about power plants, including their bus IDs, carriers, weather cell IDs, and electrical capacities. renew_feedin : DataFrame A DataFrame containing feed-in values for different carriers and weather cell IDs. Returns ------- function A function that takes a power plant object and returns its feed-in value based on either its direct weather cell ID or the aggregated feed-in of all power plants connected to the same bus and having the same carrier. """ def timeseries(pp): """Calculate the feed-in for a given power plant based on weather cell ID or aggregation.""" if pp.weather_cell_id != -1: # Directly fetch feed-in value for power plants with an associated weather cell ID return renew_feedin.loc[ (renew_feedin["w_id"] == pp.weather_cell_id) & (renew_feedin["carrier"] == pp.carrier), "feedin", ].iat[0] else: # Aggregate feed-in for power plants without a direct weather cell association df = power_plants.loc[ (power_plants["bus_id"] == pp.bus_id) & (power_plants["carrier"] == pp.carrier) ].dropna(subset=["weather_cell_id"]) total_int_cap = df["el_capacity"].sum() # Fetch and calculate proportional feed-in for each power plant df["feedin"] = df.apply( lambda x: renew_feedin.loc[ (renew_feedin["w_id"] == x.weather_cell_id) & (renew_feedin["carrier"] == x.carrier), "feedin", ].iat[0], axis=1, ) # Calculate and return aggregated feed-in based on electrical capacity return df.apply( lambda x: x["el_capacity"] / total_int_cap * x["feedin"], axis=1, ).sum() return timeseries