# Source code for egon.data.datasets.power_plants.pv_rooftop

"""The module containing all code dealing with pv rooftop distribution to MV grid level.
"""
from pathlib import Path

from loguru import logger
from numpy import isclose
import geopandas as gpd
import pandas as pd

from egon.data import config, db
from egon.data.datasets.power_plants.pv_rooftop_buildings import (
    PV_CAP_PER_SQ_M,
    ROOF_FACTOR,
    load_building_data,
    scenario_data,
)
from egon.data.datasets.scenario_parameters import get_sector_parameters

# assumption on theoretical maximum occupancy of rooftops within an mv grid
# district
# TODO: this is a wild guess
MAX_THEORETICAL_PV_OCCUPANCY = 0.5


def pv_rooftop_per_mv_grid():
    """Run the pv rooftop distribution once for every handled scenario.

    The scenario ``eGon2035`` is distributed using target values on
    ``federal_state`` level, the scenario ``eGon100RE`` using a
    ``national`` target value.

    Returns
    -------
    None.

    """
    # (scenario name, level of the target values), processed in this order
    scenario_levels = (
        ("eGon2035", "federal_state"),
        ("eGon100RE", "national"),
    )

    for scn_name, target_level in scenario_levels:
        pv_rooftop_per_mv_grid_and_scenario(
            scenario=scn_name, level=target_level
        )


def pv_rooftop_per_mv_grid_and_scenario(scenario, level):
    """Integrate solar rooftop capacity per MV grid district.

    The target capacity is distributed to the MV grid districts
    proportionally to their summed electricity demand. Only grid districts
    that contain at least one building returned by ``load_building_data``
    are considered. The capacity per grid district is capped at
    ``MAX_THEORETICAL_PV_OCCUPANCY`` times its rooftop potential; capacity
    cut off by this cap is redistributed to grid districts with remaining
    potential so that the total stays unchanged. Finally the generators and
    their feed-in time series are appended to the target tables.

    Parameters
    ----------
    scenario : str
        Name of the scenario.
    level : str
        Level of the target values. ``"federal_state"`` distributes one
        target per federal state; any other value distributes a single
        target over all grid districts.

    Returns
    -------
    None.

    Raises
    ------
    AssertionError
        If a grid district has no rooftop potential entry, or if the total
        capacity changed during redistribution.

    """
    # Select sources and targets from dataset configuration
    sources = config.datasets()["solar_rooftop"]["sources"]
    targets = config.datasets()["solar_rooftop"]["targets"]

    # Delete existing rooftop generators of this scenario that are connected
    # to an MV grid district bus
    db.execute_sql(
        f"""
        DELETE FROM {targets['generators']['schema']}.
        {targets['generators']['table']}
        WHERE carrier IN ('solar_rooftop')
        AND scn_name = '{scenario}'
        AND bus IN (SELECT bus_id FROM
                    {sources['egon_mv_grid_district']['schema']}.
                    {sources['egon_mv_grid_district']['table']} )
        """
    )

    # Delete time series of this scenario whose generator no longer exists.
    # NOTE(review): the generator table is hard-coded here instead of being
    # taken from ``targets`` - confirm this is intended.
    db.execute_sql(
        f"""
        DELETE FROM {targets['generator_timeseries']['schema']}.
        {targets['generator_timeseries']['table']}
        WHERE scn_name = '{scenario}'
        AND generator_id NOT IN (
            SELECT generator_id FROM
            grid.egon_etrago_generator
            WHERE scn_name = '{scenario}')
        """
    )

    # Select demand per mv grid district
    demand = db.select_dataframe(
        f"""
        SELECT SUM(demand) as demand,
               b.bus_id, vg250_lan
        FROM {sources['electricity_demand']['schema']}.
        {sources['electricity_demand']['table']} a
        JOIN {sources['map_zensus_grid_districts']['schema']}.
        {sources['map_zensus_grid_districts']['table']} b
        ON a.zensus_population_id = b.zensus_population_id
        JOIN {sources['map_grid_boundaries']['schema']}.
        {sources['map_grid_boundaries']['table']} c
        ON c.bus_id = b.bus_id
        WHERE scenario = '{scenario}'
        GROUP BY (b.bus_id, vg250_lan)
        """
    )

    # make sure only grid districts with any buildings are used
    valid_buildings_gdf = load_building_data()

    valid_buildings_gdf = valid_buildings_gdf.assign(
        bus_id=valid_buildings_gdf.bus_id.astype(int),
        overlay_id=valid_buildings_gdf.overlay_id.astype(int),
        max_cap=valid_buildings_gdf.building_area.multiply(
            ROOF_FACTOR * PV_CAP_PER_SQ_M
        ),
    )

    bus_ids = valid_buildings_gdf.bus_id.unique()

    # Explicit copy: columns are added below, which must not operate on a
    # view of the unfiltered query result.
    demand = demand.loc[demand.bus_id.isin(bus_ids)].copy()

    # Distribute to mv grids per federal state or Germany
    if level == "federal_state":
        targets_per_federal_state = db.select_dataframe(
            f"""
            SELECT DISTINCT ON (gen) capacity, gen
            FROM {sources['scenario_capacities']['schema']}.
            {sources['scenario_capacities']['table']} a
            JOIN {sources['federal_states']['schema']}.
            {sources['federal_states']['table']} b
            ON a.nuts = b.nuts
            WHERE carrier = 'solar_rooftop'
            AND scenario_name = '{scenario}'
            """,
            index_col="gen",
        )

        # Share of each grid district in the demand of its federal state.
        # ``transform`` returns a result aligned to ``demand``'s index,
        # independent of the ``group_keys`` behaviour of ``groupby.apply``.
        demand_per_federal_state = demand.groupby(
            "vg250_lan"
        ).demand.transform("sum")

        demand["share_federal_state"] = (
            demand.demand / demand_per_federal_state
        )

        demand["target_federal_state"] = targets_per_federal_state.capacity[
            demand.vg250_lan
        ].values

        demand.set_index("bus_id", inplace=True)

        capacities = demand["share_federal_state"].mul(
            demand["target_federal_state"]
        )
    else:
        target = db.select_dataframe(
            f"""
            SELECT capacity
            FROM {sources['scenario_capacities']['schema']}.
            {sources['scenario_capacities']['table']} a
            WHERE carrier = 'solar_rooftop'
            AND scenario_name = '{scenario}'
            """
        ).capacity[0]

        dataset = config.settings()["egon-data"]["--dataset-boundary"]

        if dataset == "Schleswig-Holstein":
            # Scale the single target by the ratio of the eGon2035 scenario
            # capacity sum to the overall "PV (Aufdach)" value of the NEP
            # sheet.
            sources_scn = config.datasets()["scenario_input"]["sources"]

            path = Path(
                f"./data_bundle_egon_data/nep2035_version2021/"
                f"{sources_scn['eGon2035']['capacities']}"
            ).resolve()

            # NOTE(review): factor 1000 presumably converts GW to MW -
            # confirm against the sheet's unit.
            total_2035 = (
                pd.read_excel(
                    path,
                    sheet_name="1.Entwurf_NEP2035_V2021",
                    index_col="Unnamed: 0",
                ).at["PV (Aufdach)", "Summe"]
                * 1000
            )
            sh_2035 = scenario_data(scenario="eGon2035").capacity.sum()

            share = sh_2035 / total_2035

            target *= share

        demand["share_country"] = demand.demand / demand.demand.sum()

        demand.set_index("bus_id", inplace=True)

        capacities = demand["share_country"].mul(target)

    # Select next id value
    new_id = db.next_etrago_id("generator")

    # Store data in dataframe
    pv_rooftop = pd.DataFrame(
        data={
            "scn_name": scenario,
            "carrier": "solar_rooftop",
            "bus": demand.index,
            "p_nom": capacities,
            "generator_id": range(new_id, new_id + len(demand)),
        }
    )

    # ensure that no more pv rooftop capacity is allocated to any mv grid
    # district than there is rooftop potential
    max_cap_per_bus_df = (
        valid_buildings_gdf[["max_cap", "bus_id"]].groupby("bus_id").sum()
        * MAX_THEORETICAL_PV_OCCUPANCY
    )

    pv_rooftop = pv_rooftop.merge(
        max_cap_per_bus_df, how="left", left_on="bus", right_index=True
    )

    # ``not`` instead of ``~``: bitwise inversion of a Python ``bool`` is
    # always truthy (-1 / -2), which would silently disable this check.
    assert not pv_rooftop.max_cap.isna().any(), (
        "There are bus IDs within 'pv_rooftop' which are not included within "
        " 'max_cap_per_bus_df'."
    )

    # delta > 0: remaining potential, delta < 0: capacity above the cap
    pv_rooftop = pv_rooftop.assign(delta=pv_rooftop.max_cap - pv_rooftop.p_nom)

    # Sum of all capacity above the cap (zero or negative)
    loss = pv_rooftop.delta.clip(upper=0).sum()

    total = pv_rooftop.p_nom.sum()

    # Cap the capacity of each grid district at its maximum
    pv_rooftop = pv_rooftop.assign(
        p_nom=pv_rooftop[["p_nom", "max_cap"]].min(axis=1)
    )

    # Redistribute the capped capacity proportionally to the remaining
    # potential of the other grid districts
    pos_delta = pv_rooftop.loc[pv_rooftop.delta > 0].delta
    rel_delta = pos_delta / pos_delta.sum()
    add_pv_cap = rel_delta * abs(loss)

    pv_rooftop.loc[add_pv_cap.index, "p_nom"] += add_pv_cap

    pv_rooftop = pv_rooftop.drop(columns=["max_cap", "delta"])

    # The redistribution must not change the overall capacity
    assert isclose(
        total, pv_rooftop.p_nom.sum()
    ), f"{total} != {pv_rooftop.p_nom.sum()}"

    if loss < 0:
        logger.debug(
            f"{loss:g} MW got redistributed from MV grids with too little "
            f"rooftop potential towards other MV grids."
        )

    # Select feedin timeseries
    weather_cells = db.select_geodataframe(
        f"""
            SELECT w_id, geom
            FROM {sources['weather_cells']['schema']}.
                {sources['weather_cells']['table']}
            """,
        index_col="w_id",
    )

    mv_grid_districts = db.select_geodataframe(
        f"""
        SELECT bus_id as bus_id, ST_Centroid(geom) as geom
        FROM {sources['egon_mv_grid_district']['schema']}.
        {sources['egon_mv_grid_district']['table']}
        """,
        index_col="bus_id",
    )

    # Map centroid of mv grids to weather cells
    join = gpd.sjoin(weather_cells, mv_grid_districts)[["index_right"]]

    feedin = db.select_dataframe(
        f"""
            SELECT w_id, feedin
            FROM {sources['solar_feedin']['schema']}.
                {sources['solar_feedin']['table']}
            WHERE carrier = 'pv'
            AND weather_year = 2011
            """,
        index_col="w_id",
    )

    # Create timeseries only for mv grid districts with pv rooftop
    join = join[join.index_right.isin(pv_rooftop.bus)]

    # ``join`` is indexed by weather cell id and holds the bus id of the
    # matched grid district in ``index_right``
    timeseries = pd.DataFrame(
        data={
            "scn_name": scenario,
            "temp_id": 1,
            "p_max_pu": feedin.feedin[join.index].values,
            "generator_id": pv_rooftop.generator_id[join.index_right].values,
        }
    ).set_index("generator_id")

    pv_rooftop = pv_rooftop.set_index("generator_id")
    pv_rooftop["marginal_cost"] = get_sector_parameters(
        "electricity", scenario
    )["marginal_cost"]["solar"]

    # Insert data to database
    pv_rooftop.to_sql(
        targets["generators"]["table"],
        schema=targets["generators"]["schema"],
        if_exists="append",
        con=db.engine(),
    )

    timeseries.to_sql(
        targets["generator_timeseries"]["table"],
        schema=targets["generator_timeseries"]["schema"],
        if_exists="append",
        con=db.engine(),
    )