Source code for

Central module containing all code dealing with processing era5 weather data.

from sqlalchemy import Column, ForeignKey, Integer
from sqlalchemy.ext.declarative import declarative_base
import geopandas as gpd
import numpy as np
import pandas as pd

from import db
from import Dataset
from import EgonEra5Cells, import_cutout
from import get_sector_parameters
from import DestatisZensusPopulationPerHa

[docs]class RenewableFeedin(Dataset): """ Calculate possible feedin time series for renewable energy generators This dataset calculates possible feedin timeseries for fluctuation renewable generators and coefficient of performance time series for heat pumps. Relevant input is the downloaded weather data. Parameters for the time series calcultaion are also defined by representative types of pv plants and wind turbines that are selected within this dataset. The resulting profiles are stored in the database. *Dependencies* * :py:class:`WeatherData <>` * :py:class:`Vg250 <>` * :py:class:`ZensusVg250 <>` *Resulting tables* * :py:class:`supply.egon_era5_renewable_feedin <>` is filled """ #: name: str = "RenewableFeedin" #: version: str = "0.0.7" def __init__(self, dependencies): super().__init__(, version=self.version, dependencies=dependencies, tasks={ wind, pv, solar_thermal, heat_pump_cop, wind_offshore, mapping_zensus_weather, }, )
Base = declarative_base() engine = db.engine()
[docs]class MapZensusWeatherCell(Base): __tablename__ = "egon_map_zensus_weather_cell" __table_args__ = {"schema": "boundaries"} zensus_population_id = Column( Integer, ForeignKey(, primary_key=True, index=True, ) w_id = Column(Integer, ForeignKey(EgonEra5Cells.w_id), index=True)
[docs]def weather_cells_in_germany(geom_column="geom"): """Get weather cells which intersect with Germany Returns ------- GeoPandas.GeoDataFrame Index and points of weather cells inside Germany """ cfg =["renewable_feedin"]["sources"] return db.select_geodataframe( f"""SELECT w_id, geom_point, geom FROM {cfg['weather_cells']['schema']}. {cfg['weather_cells']['table']} WHERE ST_Intersects('SRID=4326; POLYGON((5 56, 15.5 56, 15.5 47, 5 47, 5 56))', geom)""", geom_col=geom_column, index_col="w_id", )
[docs]def offshore_weather_cells(geom_column="geom"): """Get weather cells which intersect with Germany Returns ------- GeoPandas.GeoDataFrame Index and points of weather cells inside Germany """ cfg =["renewable_feedin"]["sources"] return db.select_geodataframe( f"""SELECT w_id, geom_point, geom FROM {cfg['weather_cells']['schema']}. {cfg['weather_cells']['table']} WHERE ST_Intersects('SRID=4326; POLYGON((5.5 55.5, 14.5 55.5, 14.5 53.5, 5.5 53.5, 5.5 55.5))', geom)""", geom_col=geom_column, index_col="w_id", )
[docs]def federal_states_per_weather_cell(): """Assings a federal state to each weather cell in Germany. Sets the federal state to the weather celss using the centroid. Weather cells at the borders whoes centroid is not inside Germany are assinged to the closest federal state. Returns ------- GeoPandas.GeoDataFrame Index, points and federal state of weather cells inside Germany """ cfg =["renewable_feedin"]["sources"] # Select weather cells and ferear states from database weather_cells = weather_cells_in_germany(geom_column="geom_point") federal_states = db.select_geodataframe( f"""SELECT gen, geometry FROM {cfg['vg250_lan_union']['schema']}. {cfg['vg250_lan_union']['table']}""", geom_col="geometry", index_col="gen", ) # Map federal state and onshore wind turbine to weather cells weather_cells["federal_state"] = gpd.sjoin( weather_cells, federal_states ).index_right # Assign a federal state to each cell inside Germany buffer = 1000 while (buffer < 30000) & ( len(weather_cells[weather_cells["federal_state"].isnull()]) > 0 ): cells = weather_cells[weather_cells["federal_state"].isnull()] cells.loc[:, "geom_point"] = cells.geom_point.buffer(buffer) weather_cells.loc[cells.index, "federal_state"] = gpd.sjoin( cells, federal_states ).index_right buffer += 200 weather_cells = ( weather_cells.reset_index() .drop_duplicates(subset="w_id", keep="first") .set_index("w_id") ) weather_cells = weather_cells.dropna(axis=0, subset=["federal_state"]) return weather_cells.to_crs(4326)
[docs]def turbine_per_weather_cell(): """Assign wind onshore turbine types to weather cells Returns ------- weather_cells : GeoPandas.GeoDataFrame Weather cells in Germany including turbine type """ # Select representative onshore wind turbines per federal state map_federal_states_turbines = { "Schleswig-Holstein": "E-126", "Bremen": "E-126", "Hamburg": "E-126", "Mecklenburg-Vorpommern": "E-126", "Niedersachsen": "E-126", "Berlin": "E-141", "Brandenburg": "E-141", "Hessen": "E-141", "Nordrhein-Westfalen": "E-141", "Sachsen": "E-141", "Sachsen-Anhalt": "E-141", "Thüringen": "E-141", "Baden-Württemberg": "E-141", "Bayern": "E-141", "Rheinland-Pfalz": "E-141", "Saarland": "E-141", } # Select weather cells and federal states weather_cells = federal_states_per_weather_cell() # Assign turbine type per federal state weather_cells["wind_turbine"] = weather_cells["federal_state"].map( map_federal_states_turbines ) return weather_cells
[docs]def feedin_per_turbine(): """Calculate feedin timeseries per turbine type and weather cell Returns ------- gdf : GeoPandas.GeoDataFrame Feed-in timeseries per turbine type and weather cell """ # Select weather data for Germany cutout = import_cutout(boundary="Germany") gdf = gpd.GeoDataFrame(geometry=cutout.grid_cells(), crs=4326) # Calculate feedin-timeseries for E-141 # source: # turbine_e141 = { "name": "E141 4200 kW", "hub_height": 129, "P": 4.200, "V": np.arange(1, 26, dtype=float), "POW": np.array( [ 0.0, 0.022, 0.104, 0.26, 0.523, 0.92, 1.471, 2.151, 2.867, 3.481, 3.903, 4.119, 4.196, 4.2, 4.2, 4.2, 4.2, 4.2, 4.2, 4.2, 4.2, 4.2, 4.2, 4.2, 4.2, ] ), } ts_e141 = cutout.wind( turbine_e141, per_unit=True, shapes=cutout.grid_cells() ) gdf["E-141"] = ts_e141.to_pandas().transpose().values.tolist() # Calculate feedin-timeseries for E-126 # source: # turbine_e126 = { "name": "E126 4200 kW", "hub_height": 159, "P": 4.200, "V": np.arange(1, 26, dtype=float), "POW": np.array( [ 0.0, 0.0, 0.058, 0.185, 0.4, 0.745, 1.2, 1.79, 2.45, 3.12, 3.66, 4.0, 4.15, 4.2, 4.2, 4.2, 4.2, 4.2, 4.2, 4.2, 4.2, 4.2, 4.2, 4.2, 4.2, ] ), } ts_e126 = cutout.wind( turbine_e126, per_unit=True, shapes=cutout.grid_cells() ) gdf["E-126"] = ts_e126.to_pandas().transpose().values.tolist() return gdf
[docs]def wind(): """Insert feed-in timeseries for wind onshore turbines to database Returns ------- None. """ cfg =["renewable_feedin"]["targets"] # Get weather cells with turbine type weather_cells = turbine_per_weather_cell() weather_cells = weather_cells[weather_cells.wind_turbine.notnull()] # Calculate feedin timeseries per turbine and weather cell timeseries_per_turbine = feedin_per_turbine() # Join weather cells and feedin-timeseries timeseries = gpd.sjoin(weather_cells, timeseries_per_turbine)[ ["E-141", "E-126"] ] weather_year = get_sector_parameters("global", "eGon2035")["weather_year"] df = pd.DataFrame( index=weather_cells.index, columns=["weather_year", "carrier", "feedin"], data={"weather_year": weather_year, "carrier": "wind_onshore"}, ) # Insert feedin for selected turbine per weather cell for turbine in ["E-126", "E-141"]: idx = weather_cells.index[ (weather_cells.wind_turbine == turbine) & (weather_cells.index.isin(timeseries.index)) ] df.loc[idx, "feedin"] = timeseries.loc[idx, turbine].values db.execute_sql( f""" DELETE FROM {cfg['feedin_table']['schema']}. {cfg['feedin_table']['table']} WHERE carrier = 'wind_onshore'""" ) # Insert values into database df.to_sql( cfg["feedin_table"]["table"], schema=cfg["feedin_table"]["schema"], con=db.engine(), if_exists="append", )
[docs]def wind_offshore(): """Insert feed-in timeseries for wind offshore turbines to database Returns ------- None. """ # Get offshore weather cells arround Germany weather_cells = offshore_weather_cells() # Select weather data for German coast cutout = import_cutout(boundary="Germany-offshore") # Select weather year from cutout weather_year ="-")[2] # Calculate feedin timeseries ts_wind_offshore = cutout.wind( "Vestas_V164_7MW_offshore", per_unit=True, shapes=weather_cells.to_crs(4326).geom, ) # Create dataframe and insert to database insert_feedin(ts_wind_offshore, "wind_offshore", weather_year)
[docs]def pv(): """Insert feed-in timeseries for pv plants to database Returns ------- None. """ # Get weather cells in Germany weather_cells = weather_cells_in_germany() # Select weather data for Germany cutout = import_cutout(boundary="Germany") # Select weather year from cutout weather_year ="-")[1] # Calculate feedin timeseries ts_pv = cutout.pv( "CSi", orientation={"slope": 35.0, "azimuth": 180.0}, per_unit=True, shapes=weather_cells.to_crs(4326).geom, ) # Create dataframe and insert to database insert_feedin(ts_pv, "pv", weather_year)
[docs]def solar_thermal(): """Insert feed-in timeseries for pv plants to database Returns ------- None. """ # Get weather cells in Germany weather_cells = weather_cells_in_germany() # Select weather data for Germany cutout = import_cutout(boundary="Germany") # Select weather year from cutout weather_year ="-")[1] # Calculate feedin timeseries ts_solar_thermal = cutout.solar_thermal( clearsky_model="simple", orientation={"slope": 45.0, "azimuth": 180.0}, per_unit=True, shapes=weather_cells.to_crs(4326).geom, capacity_factor=False, ) # Create dataframe and insert to database insert_feedin(ts_solar_thermal, "solar_thermal", weather_year)
[docs]def heat_pump_cop(): """ Calculate coefficient of performance for heat pumps according to T. Brown et al: "Synergies of sector coupling and transmission reinforcement in a cost-optimised, highlyrenewable European energy system", 2018, p. 8 Returns ------- None. """ # Assume temperature of heating system to 55°C according to Brown et. al t_sink = 55 carrier = "heat_pump_cop" # Load configuration cfg =["renewable_feedin"] # Get weather cells in Germany weather_cells = weather_cells_in_germany() # Select weather data for Germany cutout = import_cutout(boundary="Germany") # Select weather year from cutout weather_year ="-")[1] # Calculate feedin timeseries temperature = cutout.temperature( shapes=weather_cells.to_crs(4326).geom ).transpose() t_source = temperature.to_pandas() delta_t = t_sink - t_source # Calculate coefficient of performance for air sourced heat pumps # according to Brown et. al cop = 6.81 - 0.121 * delta_t + 0.00063 * delta_t**2 df = pd.DataFrame( index=temperature.to_pandas().index, columns=["weather_year", "carrier", "feedin"], data={"weather_year": weather_year, "carrier": carrier}, ) df.feedin = cop.values.tolist() # Delete existing rows for carrier db.execute_sql( f""" DELETE FROM {cfg['targets']['feedin_table']['schema']}. {cfg['targets']['feedin_table']['table']} WHERE carrier = '{carrier}'""" ) # Insert values into database df.to_sql( cfg["targets"]["feedin_table"]["table"], schema=cfg["targets"]["feedin_table"]["schema"], con=db.engine(), if_exists="append", )
[docs]def insert_feedin(data, carrier, weather_year): """Insert feedin data into database Parameters ---------- data : xarray.core.dataarray.DataArray Feedin timeseries data carrier : str Name of energy carrier weather_year : int Selected weather year Returns ------- None. """ # Transpose DataFrame data = data.transpose().to_pandas() # Load configuration cfg =["renewable_feedin"] # Initialize DataFrame df = pd.DataFrame( index=data.index, columns=["weather_year", "carrier", "feedin"], data={"weather_year": weather_year, "carrier": carrier}, ) # Convert solar thermal data from W/m^2 to MW/(1000m^2) = kW/m^2 if carrier == "solar_thermal": data *= 1e-3 # Insert feedin into DataFrame df.feedin = data.values.tolist() # Delete existing rows for carrier db.execute_sql( f""" DELETE FROM {cfg['targets']['feedin_table']['schema']}. {cfg['targets']['feedin_table']['table']} WHERE carrier = '{carrier}'""" ) # Insert values into database df.to_sql( cfg["targets"]["feedin_table"]["table"], schema=cfg["targets"]["feedin_table"]["schema"], con=db.engine(), if_exists="append", )
[docs]def mapping_zensus_weather(): """Perform mapping between era5 weather cell and zensus grid""" with db.session_scope() as session: cells_query = session.query("zensus_population_id"), DestatisZensusPopulationPerHa.geom_point, ) gdf_zensus_population = gpd.read_postgis( cells_query.statement, cells_query.session.bind, index_col=None, geom_col="geom_point", ) with db.session_scope() as session: cells_query = session.query(EgonEra5Cells.w_id, EgonEra5Cells.geom) gdf_weather_cell = gpd.read_postgis( cells_query.statement, cells_query.session.bind, index_col=None, geom_col="geom", ) # CRS is 4326 gdf_weather_cell = gdf_weather_cell.to_crs(epsg=3035) gdf_zensus_weather = gdf_zensus_population.sjoin( gdf_weather_cell, how="left", predicate="within" ) MapZensusWeatherCell.__table__.drop(bind=engine, checkfirst=True) MapZensusWeatherCell.__table__.create(bind=engine, checkfirst=True) # Write mapping into db with db.session_scope() as session: session.bulk_insert_mappings( MapZensusWeatherCell, gdf_zensus_weather[["zensus_population_id", "w_id"]].to_dict( orient="records" ), )