Source code for egon.data.datasets.power_plants.mastr

"""Import MaStR dataset and write to DB tables

Data dump from Marktstammdatenregister (2022-11-17) is imported into the
database. Only some technologies are taken into account and written to the
following tables:

* PV: table `supply.egon_power_plants_pv`
* wind turbines: table `supply.egon_power_plants_wind`
* biomass/biogas plants: table `supply.egon_power_plants_biomass`
* hydro plants: table `supply.egon_power_plants_hydro`

Handling of empty source data in MaStR dump:
* `voltage_level`: inferred based on nominal power (`capacity`) using the
  ranges from
  https://redmine.iks.cs.ovgu.de/oe/projects/ego-n/wiki/Definition_of_thresholds_for_voltage_level_assignment
  which results in True in column `voltage_level_inferred`. Remaining datasets
  are set to -1 (which only occurs if `capacity` is empty).
* `supply.egon_power_plants_*.bus_id`: set to -1 (only if not within grid
  districts or no geom available, e.g. for units with nom. power <30 kW)
* `supply.egon_power_plants_hydro.plant_type`: NaN

The data is used especially for the generation of status quo grids by ding0.
"""
from geoalchemy2 import Geometry
from sqlalchemy import (
    Boolean,
    Column,
    DateTime,
    Float,
    Integer,
    Sequence,
    String,
)
from sqlalchemy.ext.declarative import declarative_base
import geopandas as gpd
import pandas as pd

from egon.data import db
from egon.data.datasets.mastr import WORKING_DIR_MASTR_NEW
import egon.data.config

# Declarative base class shared by the ORM table models of this module.
Base = declarative_base()

# True if the configured dataset boundary is "Everything"; any other value
# is treated as test mode by the import below.
_egon_data_settings = egon.data.config.settings()["egon-data"]
TESTMODE_OFF = _egon_data_settings["--dataset-boundary"] == "Everything"


class EgonPowerPlantsPv(Base):
    """ORM model of table `supply.egon_power_plants_pv`.

    The comment above a column names the MaStR source field it is filled
    from, where one is given.
    """

    __tablename__ = "egon_power_plants_pv"
    __table_args__ = {"schema": "supply"}

    id = Column(Integer, Sequence("pp_pv_seq"), primary_key=True)
    # Grid district id
    bus_id = Column(Integer, nullable=True)
    # EinheitMastrNummer
    gens_id = Column(String, nullable=True)

    # EinheitBetriebsstatus
    status = Column(String, nullable=True)
    # Inbetriebnahmedatum
    commissioning_date = Column(DateTime, nullable=True)
    # Postleitzahl
    postcode = Column(String(5), nullable=True)
    # Ort
    city = Column(String(50), nullable=True)
    # Bundesland
    federal_state = Column(String(31), nullable=True)

    # Lage
    site_type = Column(String(69), nullable=True)
    # Nutzungsbereich
    usage_sector = Column(String(36), nullable=True)
    # Hauptausrichtung
    orientation_primary = Column(String(11), nullable=True)
    # HauptausrichtungNeigungswinkel
    orientation_primary_angle = Column(String(18), nullable=True)
    # Nebenausrichtung
    orientation_secondary = Column(String(11), nullable=True)
    # NebenausrichtungNeigungswinkel
    orientation_secondary_angle = Column(String(18), nullable=True)
    # EinheitlicheAusrichtungUndNeigungswinkel
    orientation_uniform = Column(Boolean, nullable=True)
    # AnzahlModule
    module_count = Column(Float, nullable=True)

    # Nettonennleistung
    capacity = Column(Float, nullable=True)
    # ZugeordneteWirkleistungWechselrichter in MW
    capacity_inverter = Column(Float, nullable=True)
    # Einspeisungsart
    feedin_type = Column(String(47), nullable=True)
    voltage_level = Column(Integer, nullable=True)
    voltage_level_inferred = Column(Boolean, nullable=True)

    # Point geometry in EPSG:4326
    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
class EgonPowerPlantsWind(Base):
    """ORM model of table `supply.egon_power_plants_wind`.

    The comment above a column names the MaStR source field it is filled
    from, where one is given.
    """

    __tablename__ = "egon_power_plants_wind"
    __table_args__ = {"schema": "supply"}

    id = Column(Integer, Sequence("pp_wind_seq"), primary_key=True)
    # Grid district id
    bus_id = Column(Integer, nullable=True)
    # EinheitMastrNummer
    gens_id = Column(String, nullable=True)

    # EinheitBetriebsstatus
    status = Column(String, nullable=True)
    # Inbetriebnahmedatum
    commissioning_date = Column(DateTime, nullable=True)
    # Postleitzahl
    postcode = Column(String(5), nullable=True)
    # Ort
    city = Column(String(50), nullable=True)
    # Bundesland
    federal_state = Column(String(31), nullable=True)

    # Lage
    site_type = Column(String(17), nullable=True)
    # Hersteller
    manufacturer_name = Column(String(100), nullable=True)
    # Typenbezeichnung
    type_name = Column(String(100), nullable=True)
    # Nabenhoehe
    hub_height = Column(Float, nullable=True)
    # Rotordurchmesser
    rotor_diameter = Column(Float, nullable=True)

    # Nettonennleistung
    capacity = Column(Float, nullable=True)
    # Einspeisungsart
    feedin_type = Column(String(47), nullable=True)
    voltage_level = Column(Integer, nullable=True)
    voltage_level_inferred = Column(Boolean, nullable=True)

    # Point geometry in EPSG:4326
    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
class EgonPowerPlantsBiomass(Base):
    """ORM model of table `supply.egon_power_plants_biomass`.

    The comment above a column names the MaStR source field it is filled
    from, where one is given.
    """

    __tablename__ = "egon_power_plants_biomass"
    __table_args__ = {"schema": "supply"}

    id = Column(Integer, Sequence("pp_biomass_seq"), primary_key=True)
    # Grid district id
    bus_id = Column(Integer, nullable=True)
    # EinheitMastrNummer
    gens_id = Column(String, nullable=True)

    # EinheitBetriebsstatus
    status = Column(String, nullable=True)
    # Inbetriebnahmedatum
    commissioning_date = Column(DateTime, nullable=True)
    # Postleitzahl
    postcode = Column(String(5), nullable=True)
    # Ort
    city = Column(String(50), nullable=True)
    # Bundesland
    federal_state = Column(String(31), nullable=True)

    # Technologie
    technology = Column(String(45), nullable=True)
    # Hauptbrennstoff
    fuel_name = Column(String(52), nullable=True)
    # Biomasseart
    fuel_type = Column(String(19), nullable=True)

    # Nettonennleistung
    capacity = Column(Float, nullable=True)
    # ThermischeNutzleistung
    th_capacity = Column(Float, nullable=True)
    # Einspeisungsart
    feedin_type = Column(String(47), nullable=True)
    voltage_level = Column(Integer, nullable=True)
    voltage_level_inferred = Column(Boolean, nullable=True)

    # Point geometry in EPSG:4326
    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
class EgonPowerPlantsHydro(Base):
    """ORM model of table `supply.egon_power_plants_hydro`.

    The comment above a column names the MaStR source field it is filled
    from, where one is given.
    """

    __tablename__ = "egon_power_plants_hydro"
    __table_args__ = {"schema": "supply"}

    id = Column(Integer, Sequence("pp_hydro_seq"), primary_key=True)
    # Grid district id
    bus_id = Column(Integer, nullable=True)
    # EinheitMastrNummer
    gens_id = Column(String, nullable=True)

    # EinheitBetriebsstatus
    status = Column(String, nullable=True)
    # Inbetriebnahmedatum
    commissioning_date = Column(DateTime, nullable=True)
    # Postleitzahl
    postcode = Column(String(5), nullable=True)
    # Ort
    city = Column(String(50), nullable=True)
    # Bundesland
    federal_state = Column(String(31), nullable=True)

    # ArtDerWasserkraftanlage
    plant_type = Column(String(39), nullable=True)
    # ArtDesZuflusses
    water_origin = Column(String(20), nullable=True)

    # Nettonennleistung
    capacity = Column(Float, nullable=True)
    # Einspeisungsart
    feedin_type = Column(String(47), nullable=True)
    voltage_level = Column(Integer, nullable=True)
    voltage_level_inferred = Column(Boolean, nullable=True)

    # Point geometry in EPSG:4326
    geom = Column(Geometry("POINT", 4326), index=True, nullable=True)
def import_mastr() -> None:
    """Import MaStR data into database.

    For each technology (pv, wind, biomass, hydro) the configured unit CSV
    is read, restricted to units located in Germany (and to
    Schleswig-Holstein in test mode), joined with the location file to
    obtain the voltage level, converted to point geometries (EPSG:4326),
    assigned the bus id of the MV grid district it lies in and appended to
    the technology's target table.

    Missing data is handled as follows:

    * `voltage_level`: inferred from the nominal power where the latter is
      available (`voltage_level_inferred` is True for these units); units
      without voltage level and without nominal power get -1.
    * `bus_id`: -1 for units without geometry or outside all grid districts.
    """

    def infer_voltage_level(
        units_gdf: gpd.GeoDataFrame,
    ) -> gpd.GeoDataFrame:
        """Fill missing voltage levels based on the units' nominal power.

        Parameters
        ----------
        units_gdf : geopandas.GeoDataFrame
            Units with the columns `voltage_level` (may contain NaN) and
            `Nettonennleistung`.

        Returns
        -------
        geopandas.GeoDataFrame
            The same frame (modified in place) with the additional boolean
            column `voltage_level_inferred`. Units lacking both voltage
            level and nominal power keep NaN in `voltage_level`.
        """

        def voltage_levels(p: float) -> int:
            """Map a nominal power (kW, not NaN) to a voltage level."""
            if p <= 100:
                return 7
            elif p <= 200:
                return 6
            elif p <= 5500:
                return 5
            elif p <= 20000:
                return 4
            elif p <= 120000:
                return 3
            return 1

        units_gdf["voltage_level_inferred"] = False
        # Only infer where a nominal power exists: every comparison with NaN
        # is False, so a NaN would otherwise fall through to level 1.
        mask = (
            units_gdf.voltage_level.isna()
            & units_gdf.Nettonennleistung.notna()
        )
        units_gdf.loc[mask, "voltage_level_inferred"] = True
        units_gdf.loc[mask, "voltage_level"] = units_gdf.loc[
            mask
        ].Nettonennleistung.apply(voltage_levels)

        return units_gdf

    engine = db.engine()

    cfg = egon.data.config.datasets()["power_plants"]

    # MaStR source column -> target table column, shared ("all") and per
    # technology
    cols_mapping = {
        "all": {
            "EinheitMastrNummer": "gens_id",
            "EinheitBetriebsstatus": "status",
            "Inbetriebnahmedatum": "commissioning_date",
            "Postleitzahl": "postcode",
            "Ort": "city",
            "Bundesland": "federal_state",
            "Nettonennleistung": "capacity",
            "Einspeisungsart": "feedin_type",
        },
        "pv": {
            "Lage": "site_type",
            "Nutzungsbereich": "usage_sector",
            "Hauptausrichtung": "orientation_primary",
            "HauptausrichtungNeigungswinkel": "orientation_primary_angle",
            "Nebenausrichtung": "orientation_secondary",
            "NebenausrichtungNeigungswinkel": "orientation_secondary_angle",
            "EinheitlicheAusrichtungUndNeigungswinkel": "orientation_uniform",
            "AnzahlModule": "module_count",
            "zugeordneteWirkleistungWechselrichter": "capacity_inverter",
        },
        "wind": {
            "Lage": "site_type",
            "Hersteller": "manufacturer_name",
            "Typenbezeichnung": "type_name",
            "Nabenhoehe": "hub_height",
            "Rotordurchmesser": "rotor_diameter",
        },
        "biomass": {
            "Technologie": "technology",
            "Hauptbrennstoff": "fuel_name",
            "Biomasseart": "fuel_type",
            "ThermischeNutzleistung": "th_capacity",
        },
        "hydro": {
            "ArtDerWasserkraftanlage": "plant_type",
            "ArtDesZuflusses": "water_origin",
        },
    }

    source_files = {
        "pv": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_pv"],
        "wind": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_wind"],
        "biomass": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_biomass"],
        "hydro": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_hydro"],
    }
    target_tables = {
        "pv": EgonPowerPlantsPv,
        "wind": EgonPowerPlantsWind,
        "biomass": EgonPowerPlantsBiomass,
        "hydro": EgonPowerPlantsHydro,
    }
    vlevel_mapping = {
        "Höchstspannung": 1,
        "UmspannungZurHochspannung": 2,
        "Hochspannung": 3,
        "UmspannungZurMittelspannung": 4,
        "Mittelspannung": 5,
        "UmspannungZurNiederspannung": 6,
        "Niederspannung": 7,
    }

    # import locations
    locations = pd.read_csv(
        WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_location"],
        index_col=None,
    )

    # import grid districts
    mv_grid_districts = db.select_geodataframe(
        f""" SELECT * FROM {cfg['sources']['egon_mv_grid_district']} """,
        epsg=4326,
    )

    # import units
    technologies = ["pv", "wind", "biomass", "hydro"]
    for tech in technologies:
        # read units
        print(f"===== Importing MaStR dataset: {tech} =====")
        print(" Reading CSV and filtering data...")
        # Columns keep their MaStR names here; they are renamed further
        # below, after all steps that rely on the source names.
        units = pd.read_csv(
            source_files[tech],
            usecols=(
                ["LokationMastrNummer", "Laengengrad", "Breitengrad", "Land"]
                + list(cols_mapping["all"].keys())
                + list(cols_mapping[tech].keys())
            ),
            index_col=None,
            dtype={"Postleitzahl": str},
        )

        # drop units outside of Germany
        len_old = len(units)
        units = units.loc[units.Land == "Deutschland"]
        print(f" {len_old-len(units)} units outside of Germany dropped...")

        # filter for SH units if in testmode
        if not TESTMODE_OFF:
            print(
                " TESTMODE: Dropping all units outside of"
                " Schleswig-Holstein... \n"
            )
            units = units.loc[units.Bundesland == "SchleswigHolstein"]

        # merge and rename voltage level
        print(" Merging with locations and allocate voltage level...")
        units = units.merge(
            locations[["MaStRNummer", "Spannungsebene"]],
            left_on="LokationMastrNummer",
            right_on="MaStRNummer",
            how="left",
        )
        # convert voltage levels to numbers
        units["voltage_level"] = units.Spannungsebene.replace(vlevel_mapping)
        # set voltage level for nan values
        units = infer_voltage_level(units)

        # add geometry
        print(" Adding geometries...")
        units = gpd.GeoDataFrame(
            units,
            geometry=gpd.points_from_xy(
                units["Laengengrad"], units["Breitengrad"], crs=4326
            ),
            crs=4326,
        )
        # A unit has no usable geometry if either coordinate is missing.
        units_wo_geom = len(
            units.loc[(units.Laengengrad.isna() | units.Breitengrad.isna())]
        )
        print(
            f" {units_wo_geom}/{len(units)} units do not have a geometry!"
        )

        # drop unnecessary and rename columns
        print(" Reformatting...")
        units.drop(
            columns=[
                "LokationMastrNummer",
                "MaStRNummer",
                "Laengengrad",
                "Breitengrad",
                "Spannungsebene",
                "Land",
            ],
            inplace=True,
        )
        mapping = cols_mapping["all"].copy()
        mapping.update(cols_mapping[tech])
        mapping.update({"geometry": "geom"})
        units.rename(columns=mapping, inplace=True)
        # Units without voltage level and without nominal power get -1.
        units["voltage_level"] = units.voltage_level.fillna(-1).astype(int)

        # The geometry column was renamed, so it is set as active again.
        units.set_geometry("geom", inplace=True)
        units["id"] = range(0, len(units))

        # change capacity unit: kW to MW
        units["capacity"] = units["capacity"] / 1e3
        if "capacity_inverter" in units.columns:
            units["capacity_inverter"] = units["capacity_inverter"] / 1e3
        if "th_capacity" in units.columns:
            units["th_capacity"] = units["th_capacity"] / 1e3

        # assign bus ids
        print(" Assigning bus ids...")
        # Only units with a geometry take part in the spatial join; the
        # result is aligned back on the index, leaving NaN for the others.
        units = units.assign(
            bus_id=units.loc[~units.geom.x.isna()]
            .sjoin(mv_grid_districts[["bus_id", "geom"]], how="left")
            .drop(columns=["index_right"])
            .bus_id
        )
        units["bus_id"] = units.bus_id.fillna(-1).astype(int)

        # write to DB
        print(f" Writing {len(units)} units to DB...")
        units.to_postgis(
            name=target_tables[tech].__tablename__,
            con=engine,
            if_exists="append",
            schema=target_tables[tech].__table_args__["schema"],
        )