Source code for egon.data.datasets.power_plants.pv_ground_mounted

import geopandas as gpd
import numpy as np
import pandas as pd

from egon.data import db
from egon.data.datasets.mastr import WORKING_DIR_MASTR_NEW
import egon.data.config


[docs]def insert(): def mastr_existing_pv(pow_per_area): """Import MaStR data from csv-files. Parameters ---------- pow_per_area: int Assumption for areas of existing pv farms and power of new built pv farms depending on area in kW/m² """ # get config cfg = egon.data.config.datasets()["power_plants"] # import MaStR data: locations, grid levels and installed capacities # get relevant pv plants: ground mounted df = pd.read_csv( WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_pv"], usecols=[ "Lage", "Laengengrad", "Breitengrad", "Nettonennleistung", "EinheitMastrNummer", "LokationMastrNummer", ], ) df = df[df["Lage"] == "Freifläche"] # examine data concerning geographical locations and drop NaNs x1 = df["Laengengrad"].isnull().sum() x2 = df["Breitengrad"].isnull().sum() print(" ") print("Examination of MaStR data set:") print("original number of rows in the data set: " + str(len(df))) print("NaNs for longitude and latitude: " + str(x1) + " & " + str(x2)) df.dropna(inplace=True) print("Number of rows after neglecting NaNs: " + str(len(df))) print(" ") # derive dataframe for locations mastr = gpd.GeoDataFrame( index=df.index, geometry=gpd.points_from_xy(df["Laengengrad"], df["Breitengrad"]), crs={"init": "epsg:4326"}, ) mastr = mastr.to_crs(3035) # derive installed capacities mastr["installed capacity in kW"] = df["Nettonennleistung"] # create buffer around locations # calculate bufferarea and -radius considering installed capacity df_radius = ( mastr["installed capacity in kW"].div(pow_per_area * np.pi) ** 0.5 ) # in m # create buffer mastr["buffer"] = mastr["geometry"].buffer(df_radius) mastr["buffer"].crs = 3035 # derive MaStR-Nummer mastr["LokationMastrNummer"] = df["LokationMastrNummer"] # derive voltage level mastr["voltage_level"] = pd.Series(dtype=int) lvl = pd.read_csv( WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_location"], usecols=["Spannungsebene", "MaStRNummer"], ) # assign voltage_level to MaStR-unit: vlevel_mapping = { "Höchstspannung": 1, "UmspannungZurHochspannung": 2, "Hochspannung": 3, "UmspannungZurMittelspannung": 4, "Mittelspannung": 5, "UmspannungZurNiederspannung": 6, "Niederspannung": 7, } mastr = mastr.merge( lvl[["MaStRNummer", "Spannungsebene"]], left_on="LokationMastrNummer", right_on="MaStRNummer", how="left", ) mastr["voltage_level"] = mastr.Spannungsebene.replace(vlevel_mapping) mastr.drop(["MaStRNummer", "Spannungsebene"], axis=1, inplace=True) # ### examine data concerning voltage level x1 = mastr["voltage_level"].isnull().sum() print(" ") print("Examination of voltage levels in MaStR data set:") print("Original number of rows in MaStR: " + str(len(mastr))) print( "NaNs in voltage level caused by a) a missing assignment to the " "number or b) insufficient data: " + str(x1) ) # drop PVs with missing values due to a) no assignment of # MaStR-numbers or b) missing data in row mastr.dropna(inplace=True) print("Number of rows after neglecting NaNs: " + str(len(mastr))) # drop PVs in low voltage level index_names = mastr[mastr["voltage_level"] == "Niederspannung"].index x2 = len(index_names) mastr.drop(index_names, inplace=True) index_names = mastr[ mastr["voltage_level"] == "UmspannungZurNiederspannung" ].index x3 = len(index_names) mastr.drop(index_names, inplace=True) # ### further examination print("Number of PVs in low voltage level: " + str(x2)) print("Number of PVs in LVMV level: " + str(x3)) print( "Number of rows after dropping entries assigned to these levels: " + str(len(mastr)) ) print(" ") return mastr def potential_areas(con, join_buffer): """Import potential areas and choose and prepare areas suitable for PV ground mounted. Parameters ---------- con: Connection to database join_buffer: int Maximum distance for joining of potential areas (only small ones to big ones) in m """ # import potential areas: railways and roads & agriculture # roads and railway sql = ( "SELECT id, geom FROM " "supply.egon_re_potential_area_pv_road_railway" ) potentials_rora = gpd.GeoDataFrame.from_postgis(sql, con) potentials_rora = potentials_rora.set_index("id") # agriculture sql = ( "SELECT id, geom FROM " "supply.egon_re_potential_area_pv_agriculture" ) potentials_agri = gpd.GeoDataFrame.from_postgis(sql, con) potentials_agri = potentials_agri.set_index("id") # add areas < 1 ha to bigger areas if they are very close, otherwise # exclude areas < 1 ha # calculate area potentials_rora["area"] = potentials_rora.area potentials_agri["area"] = potentials_agri.area # roads and railways # ### counting variable for examination before = len(potentials_rora) # get small areas and create buffer for joining around them small_areas = potentials_rora[potentials_rora["area"] < 10000] small_buffers = small_areas.copy() small_buffers["geom"] = small_areas["geom"].buffer(join_buffer) # drop small areas in potential areas index_names = potentials_rora[potentials_rora["area"] < 10000].index potentials_rora.drop(index_names, inplace=True) # check intersection of small areas with other potential areas overlay = gpd.sjoin(potentials_rora, small_buffers) o = overlay["index_right"] o.drop_duplicates(inplace=True) # add small areas to big ones if buffer intersects for i in range(0, len(o)): index_potentials = o.index[i] index_small = o.iloc[i] x = potentials_rora["geom"].loc[index_potentials] y = small_areas["geom"].loc[index_small] join = gpd.GeoSeries(data=[x, y]) potentials_rora["geom"].loc[index_potentials] = join.unary_union # ### examination of joining of areas count_small = len(small_buffers) count_join = len(o) count_delete = count_small - count_join print(" ") print( "Examination of potential areas in category 'Roads and Railways'" ) print("Length of original data frame: " + str(before)) print("Number of small areas: " + str(count_small)) print("Number of joins: " + str(count_join)) print("Deleted areas (not joined): " + str(count_delete)) print("Length of resulting data frame: " + str(len(potentials_rora))) print(" ") # agriculture # ### counting variable for examination before = len(potentials_agri) # get small areas and create buffer for joining around them small_areas = potentials_agri[potentials_agri["area"] < 10000] small_buffers = small_areas.copy() small_buffers["geom"] = small_areas["geom"].buffer(join_buffer) # drop small areas in potential areas index_names = potentials_agri[potentials_agri["area"] < 10000].index potentials_agri.drop(index_names, inplace=True) # check intersection of small areas with other potential areas overlay = gpd.sjoin(potentials_agri, small_buffers) o = overlay["index_right"] o.drop_duplicates(inplace=True) # add small areas to big ones if buffer intersects for i in range(0, len(o)): index_potentials = o.index[i] index_small = o.iloc[i] x = potentials_agri["geom"].loc[index_potentials] y = small_areas["geom"].loc[index_small] join = gpd.GeoSeries(data=[x, y]) potentials_agri["geom"].loc[index_potentials] = join.unary_union # ### examination of joining of areas count_small = len(small_buffers) count_join = len(o) count_delete = count_small - count_join print(" ") print("Examination of potential areas in category 'Agriculture'") print("Length of original data frame: " + str(before)) print("Number of small areas: " + str(count_small)) print("Number of joins: " + str(count_join)) print("Deleted areas (not joined): " + str(count_delete)) print("Length of resulting data frame: " + str(len(potentials_agri))) print(" ") # calculate new areas potentials_rora["area"] = potentials_rora.area potentials_agri["area"] = potentials_agri.area # check intersection of potential areas # ### counting variable agri_vorher = len(potentials_agri) # if areas intersect, keep road & railway potential areas and drop # agricultural ones overlay = gpd.sjoin(potentials_rora, potentials_agri) o = overlay["index_right"] o.drop_duplicates(inplace=True) for i in range(0, len(o)): index = o.iloc[i] potentials_agri.drop([index], inplace=True) # ### examination of intersection of areas print(" ") print("Review function to avoid intersection of potential areas:") print("Initial length potentials_agri: " + str(agri_vorher)) print("Number of occurred cases: " + str(len(o))) print("Resulting length potentials_agri: " + str(len(potentials_agri))) print(" ") return potentials_rora, potentials_agri def select_pot_areas(mastr, potentials_pot): """Select potential areas where there are existing pv parks (MaStR-data). Parameters ---------- mastr: gpd.GeoDataFrame() MaStR-DataFrame with existing pv parks potentials_pot: gpd.GeoDataFrame() Suitable potential areas """ # select potential areas with existing pv parks # (potential areas intersect buffer around existing plants) # prepare dataframes to check intersection pvs = gpd.GeoDataFrame() pvs["geom"] = mastr["buffer"].copy() pvs.crs = 3035 pvs = pvs.set_geometry("geom") potentials = gpd.GeoDataFrame() potentials["geom"] = potentials_pot["geom"].copy() potentials.crs = 3035 potentials = potentials.set_geometry("geom") # check intersection of potential areas with exisiting PVs (MaStR) overlay = gpd.sjoin(pvs, potentials) o = overlay["index_right"] o.drop_duplicates(inplace=True) # define selected potentials areas pot_sel = potentials_pot.copy() pot_sel["selected"] = pd.Series() pot_sel["voltage_level"] = pd.Series(dtype=int) for i in range(0, len(o)): index_pot = o.iloc[i] pot_sel["selected"].loc[index_pot] = True # get voltage level of existing PVs index_pv = o.index[i] pot_sel["voltage_level"] = mastr["voltage_level"].loc[index_pv] pot_sel = pot_sel[pot_sel["selected"] == True] pot_sel.drop("selected", axis=1, inplace=True) # drop selected existing pv parks from mastr mastr.drop(index=o.index, inplace=True) return (pot_sel, mastr) def build_pv(pv_pot, pow_per_area): """Build new pv parks in selected potential areas. Parameters ---------- pv_pot: gpd.GeoDataFrame() Selected potential areas pow_per_area: int Assumption for areas of existing pv farms and power of new built pv farms depending on area in kW/m² """ # build pv farms in selected areas # calculation of centroids pv_pot["centroid"] = pv_pot["geom"].representative_point() # calculation of power in kW pv_pot["installed capacity in kW"] = pd.Series() pv_pot["installed capacity in kW"] = pv_pot["area"] * pow_per_area # check for maximal capacity for PV ground mounted limit_cap = 120000 # in kW pv_pot["installed capacity in kW"] = pv_pot[ "installed capacity in kW" ].apply(lambda x: x if x < limit_cap else limit_cap) return pv_pot def adapt_grid_level(pv_pot, max_dist_hv, con): """Check and if needed adapt grid level of newly built pv parks. Parameters ---------- pv_pot: gpd.GeoDataFrame() Newly built pv parks on selected potential areas max_dist_hv: int Assumption for maximum distance of park with hv-power to next substation in m con: Connection to database """ # divide dataframe in MV and HV pv_pot_mv = pv_pot[pv_pot["voltage_level"] == 5] pv_pot_hv = pv_pot[pv_pot["voltage_level"] == 4] # check installed capacity in MV max_cap_mv = 5500 # in kW # find PVs which need to be HV or to have reduced capacity pv_pot_mv_to_hv = pv_pot_mv[ pv_pot_mv["installed capacity in kW"] > max_cap_mv ] if len(pv_pot_mv_to_hv) > 0: # import data for HV substations sql = "SELECT point, voltage FROM grid.egon_hvmv_substation" hvmv_substation = gpd.GeoDataFrame.from_postgis( sql, con, geom_col="point" ) hvmv_substation = hvmv_substation.to_crs(3035) hvmv_substation["voltage"] = hvmv_substation["voltage"].apply( lambda x: int(x.split(";")[0]) ) hv_substations = hvmv_substation[ hvmv_substation["voltage"] >= 110000 ] hv_substations = ( hv_substations.unary_union ) # join all the hv_substations # check distance to HV substations of PVs with too high installed # capacity for MV # calculate distance to substations pv_pot_mv_to_hv["dist_to_HV"] = ( pv_pot_mv_to_hv["geom"].to_crs(3035).distance(hv_substations) ) # adjust grid level and keep capacity if transmission lines are # close pv_pot_mv_to_hv = pv_pot_mv_to_hv[ pv_pot_mv_to_hv["dist_to_HV"] <= max_dist_hv ] pv_pot_mv_to_hv = pv_pot_mv_to_hv.drop(columns=["dist_to_HV"]) pv_pot_hv = pv_pot_hv.append(pv_pot_mv_to_hv) # delete PVs which are now HV from MV dataframe for index, pot in pv_pot_mv_to_hv.iterrows(): pv_pot_mv = pv_pot_mv.drop([index]) pv_pot_hv["voltage_level"] = 4 # keep grid level adjust capacity if transmission lines are too # far pv_pot_mv["installed capacity in kW"] = pv_pot_mv[ "installed capacity in kW" ].apply(lambda x: x if x < max_cap_mv else max_cap_mv) pv_pot_mv["voltage_level"] = 5 pv_pot = pv_pot_mv.append(pv_pot_hv) return pv_pot def build_additional_pv(potentials, pv, pow_per_area, con): """Build additional pv parks if pv parks on selected potential areas do not hit the target value. Parameters ---------- potenatials: gpd.GeoDataFrame() All suitable potential areas pv: gpd.GeoDataFrame() Newly built pv parks on selected potential areas pow_per_area: int Assumption for areas of existing pv farms and power of new built pv farms depending on area in kW/m² con: Connection to database """ # get MV grid districts sql = "SELECT bus_id, geom FROM grid.egon_mv_grid_district" distr = gpd.GeoDataFrame.from_postgis(sql, con) distr = distr.set_index("bus_id") # identify potential areas where there are no PV parks yet for index, pv in pv.iterrows(): potentials = potentials.drop([index]) # aggregate potential area per MV grid district pv_per_distr = gpd.GeoDataFrame() pv_per_distr["geom"] = distr["geom"].copy() centroids = potentials.copy() centroids["geom"] = centroids["geom"].representative_point() overlay = gpd.sjoin(centroids, distr) # ### examine potential area per grid district anz = len(overlay) anz_distr = len(overlay["index_right"].unique()) size = 137500 # m2 Fläche für > 5,5 MW: (5500 kW / (0,04 kW/m2)) anz_big = len(overlay[overlay["area"] >= size]) anz_small = len(overlay[overlay["area"] < size]) print(" ") print( "Examination of remaining potential areas in MV grid districts: " ) print("Number of potential areas: " + str(anz)) print(" -> distributed to " + str(anz_distr) + " districts") print("Number of areas with a potential >= 5,5 MW: " + str(anz_big)) print("Number of areas with a potential < 5,5 MW: " + str(anz_small)) print(" ") for index, dist in distr.iterrows(): pots = overlay[overlay["index_right"] == index]["geom"].index p = gpd.GeoSeries(index=pots) for i in pots: p.loc[i] = potentials["geom"].loc[i] pv_per_distr["geom"].loc[index] = p.unary_union # calculate area per MV grid district and linearly distribute needed # capacity considering pow_per_area pv_per_distr["area"] = pv_per_distr["geom"].area pv_per_distr["installed capacity in kW"] = ( pv_per_distr["area"] * pow_per_area ) # calculate centroid pv_per_distr["centroid"] = pv_per_distr["geom"].representative_point() return pv_per_distr def check_target( pv_rora_i, pv_agri_i, pv_exist_i, potentials_rora_i, potentials_agri_i, target_power, pow_per_area, con, ): """Check target value per scenario and per state. Parameters ---------- pv_rora_i: gpd.GeoDataFrame() Newly built pv parks on selected potential areas of road and railways p pv_agri_i: gpd.GeoDataFrame() Newly built pv parks on selected potential areas of agriculture pv_exist_i: gpd.GeoDataFrame() existing pv parks that don't intercept any potential area potenatials_rora_i: gpd.GeoDataFrame() All suitable potential areas of road and railway potenatials_rora_i: gpd.GeoDataFrame() All suitable potential areas of agriculture target_power: int Target for installed capacity of pv ground mounted in referenced state pow_per_area: int Assumption for areas of existing pv farms and power of new built pv farms depending on area in kW/m² con: Connection to database """ # sum overall installed capacity for MV and HV total_pv_power = ( pv_rora_i["installed capacity in kW"].sum() + pv_agri_i["installed capacity in kW"].sum() + pv_exist_i["installed capacity in kW"].sum() ) pv_per_distr_i = gpd.GeoDataFrame() # check target value ### print(" ") print( "Installed capacity on areas with existing plants: " + str(total_pv_power / 1000) + " MW" ) # linear scale farms to meet target if sum of installed capacity is # too high if total_pv_power >= target_power: scale_factor = target_power / total_pv_power pv_rora_i["installed capacity in kW"] = ( pv_rora_i["installed capacity in kW"] * scale_factor ) pv_agri_i["installed capacity in kW"] = ( pv_agri_i["installed capacity in kW"] * scale_factor ) pv_exist_i["installed capacity in kW"] = ( pv_exist_i["installed capacity in kW"] * scale_factor ) pv_per_distr_i["grid_district"] = pd.Series() pv_per_distr_i["installed capacity in kW"] = pd.Series(0) ### print( "Expansion of existing PV parks on potential areas to " "achieve target capacity is sufficient." ) print( "Installed power is greater than the target value, scaling " "is applied:" ) print("Scaling factor: " + str(scale_factor)) # build new pv parks if sum of installed capacity is below target # value elif total_pv_power < target_power: rest_cap = target_power - total_pv_power ### print( "Expansion of existing PV parks on potential areas to " "achieve target capacity is unsufficient:" ) print("Residual capacity: " + str(rest_cap / 1000) + " MW") print( "Residual capacity will initially be distributed via " "remaining potential areas 'Road & Railway'." ) # build pv parks in potential areas road & railway pv_per_distr_i = build_additional_pv( potentials_rora_i, pv_rora_i, pow_per_area, con ) # change index to add different Dataframes in the end pv_per_distr_i["grid_district"] = pv_per_distr_i.index.copy() pv_per_distr_i.index = range(0, len(pv_per_distr_i)) # delete empty grid districts index_names = pv_per_distr_i[ pv_per_distr_i["installed capacity in kW"].isna() ].index pv_per_distr_i.drop(index_names, inplace=True) if pv_per_distr_i["installed capacity in kW"].sum() > rest_cap: scale_factor = ( rest_cap / pv_per_distr_i["installed capacity in kW"].sum() ) pv_per_distr_i["installed capacity in kW"] = ( pv_per_distr_i["installed capacity in kW"] * scale_factor ) ### print( "Residual capacity got distributed via scaling factor " + str(scale_factor) + " to remaining potential areas 'Road & Railway'." ) # build pv parks on potential areas agriculture if still necessary elif pv_per_distr_i["installed capacity in kW"].sum() < rest_cap: rest_cap = ( target_power - total_pv_power - pv_per_distr_i["installed capacity in kW"].sum() ) ### print( "Distribution via potential areas Road & Railway " "unsufficient to achieve target capacity:" ) print("Residual capacity: " + str(rest_cap / 1000) + " MW") print( "Residual capacity is distributed to remaining potential " "areas 'Agriculture'." ) pv_per_distr_i_2 = build_additional_pv( potentials_agri_i, pv_agri_i, pow_per_area, con ) # change index to add different Dataframes in the end pv_per_distr_i_2["grid_district"] = pv_per_distr_i_2.index pv_per_distr_i_2.index = range(len(pv_per_distr_i_2)) # delete empty grid districts index_names = pv_per_distr_i_2[ pv_per_distr_i_2["installed capacity in kW"].isna() ].index pv_per_distr_i_2.drop(index_names, inplace=True) if ( pv_per_distr_i_2["installed capacity in kW"].sum() > rest_cap ): scale_factor = ( rest_cap / pv_per_distr_i_2["installed capacity in kW"].sum() ) pv_per_distr_i_2["installed capacity in kW"] = ( pv_per_distr_i_2["installed capacity in kW"] * scale_factor ) ### print( "Residual capacity got distributed via scaling " "factor " + str(scale_factor) + " to remaining potential areas 'Road & Railway' " "and 'Agriculture'." ) pv_per_distr_i = pv_per_distr_i.append( pv_per_distr_i_2, ignore_index=True ) # assign grid level to pv_per_distr v_lvl = pd.Series(dtype=int, index=pv_per_distr_i.index) for index, distr in pv_per_distr_i.iterrows(): if distr["installed capacity in kW"] > 5500: # > 5 MW v_lvl[index] = 4 else: v_lvl[index] = 5 pv_per_distr_i["voltage_level"] = v_lvl # new overall installed capacity total_pv_power = ( pv_rora_i["installed capacity in kW"].sum() + pv_agri_i["installed capacity in kW"].sum() + pv_exist_i["installed capacity in kW"].sum() + pv_per_distr_i["installed capacity in kW"].sum() ) ### print( "Total installed capacity of PV farms: " + str(total_pv_power / 1000) + " MW" ) print(" ") pv_rora_i = pv_rora_i[pv_rora_i["installed capacity in kW"] > 0] pv_agri_i = pv_agri_i[pv_agri_i["installed capacity in kW"] > 0] pv_exist_i = pv_exist_i[pv_exist_i["installed capacity in kW"] > 0] pv_per_distr_i = pv_per_distr_i[ pv_per_distr_i["installed capacity in kW"] > 0 ] return pv_rora_i, pv_agri_i, pv_exist_i, pv_per_distr_i def keep_existing_pv(mastr, con): pv_exist = mastr[ [ "geometry", "installed capacity in kW", "voltage_level", ] ] pv_exist.rename(columns={"geometry": "centroid"}, inplace=True) pv_exist = gpd.GeoDataFrame(pv_exist, geometry="centroid", crs=3035) # German states sql = "SELECT geometry as geom, gf FROM boundaries.vg250_lan" land = gpd.GeoDataFrame.from_postgis(sql, con).to_crs(3035) land = land[(land["gf"] != 1) & (land["gf"] != 2)] land = land.unary_union pv_exist = gpd.clip(pv_exist, land) return pv_exist def run_methodology( con=db.engine(), pow_per_area=0.04, join_buffer=10, max_dist_hv=20000, show_map=False, ): """Execute methodology to distribute pv ground mounted. Parameters ---------- con: Connection to database pow_per_area: int, default 0.4 Assumption for areas of existing pv farms and power of new built pv farms depending on area in kW/m² join_buffer : int, default 10 Maximum distance for joining of potential areas (only small ones to big ones) in m max_dist_hv : int, default 20000 Assumption for maximum distance of park with hv-power to next substation in m show_map: boolean Optional creation of map to show distribution of installed capacity """ ### print(" ") print("MaStR-Data") print(" ") # MaStR-data: existing PV farms mastr = mastr_existing_pv(pow_per_area) ### print(" ") print("potential area") print(" ") # database-data: potential areas for new PV farms potentials_rora, potentials_agri = potential_areas(con, join_buffer) ### print(" ") print("select potentials area") print(" ") # select potential areas with existing PV farms to build new PV farms pv_rora, mastr = select_pot_areas(mastr, potentials_rora) pv_agri, mastr = select_pot_areas(mastr, potentials_agri) ### print(" ") print( "build PV parks where there is PV ground mounted already " "(-> MaStR) on potential area" ) print(" ") # build new PV farms pv_rora = build_pv(pv_rora, pow_per_area) pv_agri = build_pv(pv_agri, pow_per_area) # keep the existing pv_farms that don't intercept potential areas exist = keep_existing_pv(mastr, con) ### print(" ") print("adapt grid level of PV parks") print(" ") # adapt grid level to new farms rora = adapt_grid_level(pv_rora, max_dist_hv, con) agri = adapt_grid_level(pv_agri, max_dist_hv, con) ### print(" ") print( "check target value and build more PV parks on potential area if " "necessary" ) print(" ") # 1) scenario: eGon2035 ### print(" ") print("scenario: eGon2035") print(" ") # German states sql = "SELECT geometry as geom, nuts FROM boundaries.vg250_lan" states = gpd.GeoDataFrame.from_postgis(sql, con) # assumption for target value of installed capacity sql = ( "SELECT capacity,scenario_name,nuts FROM " "supply.egon_scenario_capacities WHERE carrier='solar'" ) target = pd.read_sql(sql, con) target = target[target["scenario_name"] == "eGon2035"] nuts = np.unique(target["nuts"]) # initialize final dataframe pv_rora = gpd.GeoDataFrame() pv_agri = gpd.GeoDataFrame() pv_exist = gpd.GeoDataFrame() pv_per_distr = gpd.GeoDataFrame() # prepare selection per state rora = rora.set_geometry("centroid") agri = agri.set_geometry("centroid") potentials_rora = potentials_rora.set_geometry("geom") potentials_agri = potentials_agri.set_geometry("geom") # check target value per state for i in nuts: target_power = ( target[target["nuts"] == i]["capacity"].iloc[0] * 1000 ) ### land = target[target["nuts"] == i]["nuts"].iloc[0] print(" ") print("Bundesland (NUTS): " + land) print("target power: " + str(target_power / 1000) + " MW") # select state state = states[states["nuts"] == i] state = state.to_crs(3035) # select PVs in state rora_i = gpd.sjoin(rora, state) agri_i = gpd.sjoin(agri, state) exist_i = gpd.sjoin(exist, state) rora_i.drop("index_right", axis=1, inplace=True) agri_i.drop("index_right", axis=1, inplace=True) exist_i.drop("index_right", axis=1, inplace=True) rora_i.drop_duplicates(inplace=True) agri_i.drop_duplicates(inplace=True) exist_i.drop_duplicates(inplace=True) # select potential areas in state potentials_rora_i = gpd.sjoin(potentials_rora, state) potentials_agri_i = gpd.sjoin(potentials_agri, state) potentials_rora_i.drop("index_right", axis=1, inplace=True) potentials_agri_i.drop("index_right", axis=1, inplace=True) potentials_rora_i.drop_duplicates(inplace=True) potentials_agri_i.drop_duplicates(inplace=True) # check target value and adapt installed capacity if necessary rora_i, agri_i, exist_i, distr_i = check_target( rora_i, agri_i, exist_i, potentials_rora_i, potentials_agri_i, target_power, pow_per_area, con, ) if len(distr_i) > 0: distr_i["nuts"] = target[target["nuts"] == i]["nuts"].iloc[0] # ### examination of built PV parks per state rora_i_mv = rora_i[rora_i["voltage_level"] == 5] rora_i_hv = rora_i[rora_i["voltage_level"] == 4] agri_i_mv = agri_i[agri_i["voltage_level"] == 5] agri_i_hv = agri_i[agri_i["voltage_level"] == 4] print("eGon2035: Examination of voltage level per federal state:") print("a) PVs on potential areas Road & Railway: ") print( "Total installed capacity: " + str(rora_i["installed capacity in kW"].sum() / 1000) + " MW" ) print("Number of PV farms: " + str(len(rora_i))) print(" - thereof MV: " + str(len(rora_i_mv))) print(" - thereof HV: " + str(len(rora_i_hv))) print("b) PVs on potential areas Agriculture: ") print( "Total installed capacity: " + str(agri_i["installed capacity in kW"].sum() / 1000) + " MW" ) print("Number of PV farms: " + str(len(agri_i))) print(" - thereof MV: " + str(len(agri_i_mv))) print(" - dthereof HV: " + str(len(agri_i_hv))) print("c) Existing PVs not in potential areas: ") print("Number of PV farms: " + str(len(exist_i))) print("d) PVs on additional potential areas per MV-District: ") if len(distr_i) > 0: distr_i_mv = distr_i[distr_i["voltage_level"] == 5] distr_i_hv = distr_i[distr_i["voltage_level"] == 4] print( "Total installed capacity: " + str(distr_i["installed capacity in kW"].sum() / 1000) + " MW" ) print("Number of PV farms: " + str(len(distr_i))) print(" - thereof MV: " + str(len(distr_i_mv))) print(" - thereof HV: " + str(len(distr_i_hv))) else: print(" -> No additional expansion necessary") print(" ") pv_rora = pv_rora.append(rora_i) pv_agri = pv_agri.append(agri_i) pv_exist = pv_exist.append(exist_i) if len(distr_i) > 0: pv_per_distr = pv_per_distr.append(distr_i) # 2) scenario: eGon100RE # assumption for target value of installed capacity in Germany per # scenario sql = ( "SELECT capacity,scenario_name FROM " "supply.egon_scenario_capacities WHERE carrier='solar'" ) target_power = pd.read_sql(sql, con) target_power = target_power[ target_power["scenario_name"] == "eGon100RE" ] target_power = target_power["capacity"].sum() * 1000 ### print(" ") print("scenario: eGon100RE") print("target power: " + str(target_power) + " kW") print(" ") # check target value and adapt installed capacity if necessary ( pv_rora_100RE, pv_agri_100RE, pv_exist_100RE, pv_per_distr_100RE, ) = check_target( rora, agri, exist, potentials_rora, potentials_agri, target_power, pow_per_area, con, ) # ### create map to show distribution of installed capacity if show_map == True: # 1) eGon2035 # get MV grid districts sql = "SELECT bus_id, geom FROM grid.egon_mv_grid_district" distr = gpd.GeoDataFrame.from_postgis(sql, con) distr = distr.set_index("bus_id") # assign pv_per_distr-power to districts distr["capacity"] = pd.Series() for index, row in distr.iterrows(): if index in np.unique(pv_per_distr["grid_district"]): pv = pv_per_distr[pv_per_distr["grid_district"] == index] x = pv["installed capacity in kW"].iloc[0] distr["capacity"].loc[index] = x else: distr["capacity"].loc[index] = 0 distr["capacity"] = distr["capacity"] / 1000 # add pv_rora- and pv_agri-power to district pv_rora = pv_rora.set_geometry("centroid") pv_agri = pv_agri.set_geometry("centroid") overlay_rora = gpd.sjoin(pv_rora, distr) overlay_agri = gpd.sjoin(pv_agri, distr) for index, row in distr.iterrows(): o_rora = overlay_rora[overlay_rora["index_right"] == index] o_agri = overlay_agri[overlay_agri["index_right"] == index] cap_rora = o_rora["installed capacity in kW"].sum() / 1000 cap_agri = o_agri["installed capacity in kW"].sum() / 1000 distr["capacity"].loc[index] = ( distr["capacity"].loc[index] + cap_rora + cap_agri ) from matplotlib import pyplot as plt fig, ax = plt.subplots(1, 1) distr.boundary.plot(linewidth=0.2, ax=ax, color="black") distr.plot( ax=ax, column="capacity", cmap="magma_r", legend=True, legend_kwds={ "label": "Installed capacity in MW", "orientation": "vertical", }, ) plt.savefig("pv_per_distr_map_eGon2035.png", dpi=300) # 2) eGon100RE # get MV grid districts sql = "SELECT bus_id, geom FROM grid.egon_mv_grid_district" distr = gpd.GeoDataFrame.from_postgis(sql, con) distr = distr.set_index("bus_id") # assign pv_per_distr-power to districts distr["capacity"] = pd.Series() for index, row in distr.iterrows(): if index in np.unique(pv_per_distr_100RE["grid_district"]): pv = pv_per_distr_100RE[ pv_per_distr_100RE["grid_district"] == index ] x = pv["installed capacity in kW"].iloc[0] distr["capacity"].loc[index] = x else: distr["capacity"].loc[index] = 0 distr["capacity"] = distr["capacity"] / 1000 # add pv_rora- and pv_agri-power to district pv_rora_100RE = pv_rora_100RE.set_geometry("centroid") pv_agri_100RE = pv_agri_100RE.set_geometry("centroid") overlay_rora = gpd.sjoin(pv_rora_100RE, distr) overlay_agri = gpd.sjoin(pv_agri_100RE, distr) for index, row in distr.iterrows(): o_rora = overlay_rora[overlay_rora["index_right"] == index] o_agri = overlay_agri[overlay_agri["index_right"] == index] cap_rora = o_rora["installed capacity in kW"].sum() / 1000 cap_agri = o_agri["installed capacity in kW"].sum() / 1000 distr["capacity"].loc[index] = ( distr["capacity"].loc[index] + cap_rora + cap_agri ) from matplotlib import pyplot as plt fig, ax = plt.subplots(1, 1) distr.boundary.plot(linewidth=0.2, ax=ax, color="black") distr.plot( ax=ax, column="capacity", cmap="magma_r", legend=True, legend_kwds={ "label": "Installed capacity in MW", "orientation": "vertical", }, ) plt.savefig("pv_per_distr_map_eGon100RE.png", dpi=300) pv_rora = pv_rora[pv_rora["installed capacity in kW"] > 0] pv_agri = pv_agri[pv_agri["installed capacity in kW"] > 0] pv_per_distr = pv_per_distr[ pv_per_distr["installed capacity in kW"] > 0 ] pv_rora_100RE = pv_rora_100RE[ pv_rora_100RE["installed capacity in kW"] > 0 ] pv_agri_100RE = pv_agri_100RE[ pv_agri_100RE["installed capacity in kW"] > 0 ] pv_per_distr_100RE = pv_per_distr_100RE[ pv_per_distr_100RE["installed capacity in kW"] > 0 ] return ( pv_rora, pv_agri, pv_exist, pv_per_distr, pv_rora_100RE, pv_agri_100RE, pv_exist_100RE, pv_per_distr_100RE, ) def insert_pv_parks( pv_rora, pv_agri, pv_exist, pv_per_distr, scenario_name ): """Write to database. Parameters ---------- pv_rora : gpd.GeoDataFrame() Pv parks on selected potential areas of raod and railway pv_agri : gpd.GeoDataFrame() Pv parks on selected potential areas of raod and railway pv_exist : gpd.GeoDataFrame() Existing Pv parks on selected areas pv_per_distr: gpd.GeoDataFrame() Additionally built pv parks on potential areas per mv grid district scenario_name: Scenario name of calculation """ # prepare dataframe for integration in supply.egon_power_plants pv_parks = pv_rora.append( [pv_agri, pv_exist, pv_per_distr], ignore_index=True ) pv_parks["el_capacity"] = pv_parks["installed capacity in kW"] / 1000 pv_parks.rename(columns={"centroid": "geometry"}, inplace=True) pv_parks = gpd.GeoDataFrame(pv_parks, geometry="geometry", crs=3035) pv_parks = pv_parks[["el_capacity", "voltage_level", "geometry"]] # integration in supply.egon_power_plants con = db.engine() # maximum ID in egon_power_plants sql = "SELECT MAX(id) FROM supply.egon_power_plants" max_id = pd.read_sql(sql, con) max_id = max_id["max"].iat[0] if max_id is None: max_id = 1 pv_park_id = max_id + 1 # copy relevant columns from pv_parks insert_pv_parks = pv_parks[ ["el_capacity", "voltage_level", "geometry"] ] insert_pv_parks = insert_pv_parks.set_geometry("geometry") insert_pv_parks["voltage_level"] = insert_pv_parks[ "voltage_level" ].apply(int) # set static column values insert_pv_parks["carrier"] = "solar" insert_pv_parks["scenario"] = scenario_name # change name and crs of geometry column insert_pv_parks.set_crs(epsg=3035, allow_override=True, inplace=True) insert_pv_parks = ( insert_pv_parks.rename({"geometry": "geom"}, axis=1) .set_geometry("geom") .to_crs(4326) ) # reset index insert_pv_parks.index = pd.RangeIndex( start=pv_park_id, stop=pv_park_id + len(insert_pv_parks), name="id" ) # insert into database insert_pv_parks.reset_index().to_postgis( "egon_power_plants", schema="supply", con=db.engine(), if_exists="append", ) return pv_parks # ######################################################################## # execute methodology ( pv_rora, pv_agri, pv_exist, pv_per_distr, pv_rora_100RE, pv_agri_100RE, pv_exist_100RE, pv_per_distr_100RE, ) = run_methodology( con=db.engine(), pow_per_area=0.04, join_buffer=10, max_dist_hv=20000, show_map=False, ) # ### examination of results if len(pv_per_distr) > 0: pv_per_distr_mv = pv_per_distr[pv_per_distr["voltage_level"] == 5] pv_per_distr_hv = pv_per_distr[pv_per_distr["voltage_level"] == 4] pv_rora_mv = pv_rora[pv_rora["voltage_level"] == 5] pv_rora_hv = pv_rora[pv_rora["voltage_level"] == 4] pv_agri_mv = pv_agri[pv_agri["voltage_level"] == 5] pv_agri_hv = pv_agri[pv_agri["voltage_level"] == 4] print(" ") print("eGon2035: Examination of overall voltage levels:") print("a) PVs on potential areas Road & Railway: ") print( "Total installed capacity: " + str(pv_rora["installed capacity in kW"].sum() / 1000) + " MW" ) print("Number of PV farms: " + str(len(pv_rora))) print(" - thereof MV: " + str(len(pv_rora_mv))) print(" - thereof HV: " + str(len(pv_rora_hv))) print("b) PVs on potential areas Agriculture: ") print( "Total installed capacity: " + str(pv_agri["installed capacity in kW"].sum() / 1000) + " MW" ) print("Number of PV farms: " + str(len(pv_agri))) print(" - thereof MV: " + str(len(pv_agri_mv))) print(" - thereof HV: " + str(len(pv_agri_hv))) print("c) Existing PVs not in potential areas: ") print("Number of PV farms: " + str(len(pv_exist))) print("d) PVs on additional potential areas per MV-District: ") if len(pv_per_distr) > 0: print( "Total installed capacity: " + str(pv_per_distr["installed capacity in kW"].sum() / 1000) + " MW" ) print("Number of PV farms: " + str(len(pv_per_distr))) print(" - thereof MV: " + str(len(pv_per_distr_mv))) print(" - thereof HV: " + str(len(pv_per_distr_hv))) else: print(" -> No additional expansion needed") print(" ") ### # save to DB if ( pv_rora["installed capacity in kW"].sum() > 0 or pv_agri["installed capacity in kW"].sum() > 0 or pv_per_distr["installed capacity in kW"].sum() > 0 or pv_exist["installed capacity in kW"].sum() > 0 ): pv_parks = insert_pv_parks( pv_rora, pv_agri, pv_exist, pv_per_distr, "eGon2035" ) else: pv_parks = gpd.GeoDataFrame() if ( pv_rora_100RE["installed capacity in kW"].sum() > 0 or pv_agri_100RE["installed capacity in kW"].sum() > 0 or pv_per_distr_100RE["installed capacity in kW"].sum() > 0 or pv_exist_100RE["installed capacity in kW"].sum() > 0 ): pv_parks_100RE = insert_pv_parks( pv_rora_100RE, pv_agri_100RE, pv_exist_100RE, pv_per_distr_100RE, "eGon100RE", ) else: pv_parks_100RE = gpd.GeoDataFrame() return pv_parks, pv_parks_100RE