"""Import MaStR dataset and write to DB tables

Data dump from Marktstammdatenregister (2022-11-17) is imported into the
database. Only some technologies are taken into account and written to the
following tables:

* PV: table `supply.egon_power_plants_pv`
* wind turbines: table `supply.egon_power_plants_wind`
* biomass/biogas plants: table `supply.egon_power_plants_biomass`
* hydro plants: table `supply.egon_power_plants_hydro`

Handling of empty source data in MaStr dump:
* `voltage_level`: inferred based on nominal power (`capacity`) using the
  ranges from
  which results in True in column `voltage_level_inferred`. Remaining datasets
  are set to -1 (which only occurs if `capacity` is empty).
* `supply.egon_power_plants_*.bus_id`: set to -1 (only if not within grid
  districts or no geom available, e.g. for units with nom. power <30 kW)
* `supply.egon_power_plants_hydro.plant_type`: NaN

The data is used especially for the generation of status quo grids by ding0.
from __future__ import annotations

from pathlib import Path

from loguru import logger
import geopandas as gpd
import numpy as np
import pandas as pd

from import config, db
from import (
from import (

    config.settings()["egon-data"]["--dataset-boundary"] == "Everything"

[docs]def isfloat(num: str): """ Determine if string can be converted to float. Parameters ----------- num : str String to parse. Returns ------- bool Returns True in string can be parsed to float. """ try: float(num) return True except ValueError: return False
[docs]def zip_and_municipality_from_standort( standort: str, ) -> tuple[str, bool]: """ Get zip code and municipality from Standort string split into a list. Parameters ----------- standort : str Standort as given from MaStR data. Returns ------- str Standort with only the zip code and municipality as well a ', Germany' added. """ standort_list = standort.split() found = False count = 0 for count, elem in enumerate(standort_list): if len(elem) != 5: continue if not elem.isnumeric(): continue found = True break if found: cleaned_str = " ".join(standort_list[count:]) return cleaned_str, found logger.warning( "Couldn't identify zip code. This entry will be dropped." f" Original standort: {standort}." ) return standort, found
[docs]def infer_voltage_level( units_gdf: gpd.GeoDataFrame, ) -> gpd.GeoDataFrame: """ Infer nan values in voltage level derived from generator capacity to the power plants. Parameters ----------- units_gdf : geopandas.GeoDataFrame GeoDataFrame containing units with voltage levels from MaStR Returnsunits_gdf: gpd.GeoDataFrame ------- geopandas.GeoDataFrame GeoDataFrame containing units all having assigned a voltage level. """ def voltage_levels(p: float) -> int: if p <= 100: return 7 elif p <= 200: return 6 elif p <= 5500: return 5 elif p <= 20000: return 4 elif p <= 120000: return 3 return 1 units_gdf["voltage_level_inferred"] = False mask = units_gdf.voltage_level.isna() units_gdf.loc[mask, "voltage_level_inferred"] = True units_gdf.loc[mask, "voltage_level"] = units_gdf.loc[ mask ].Nettonennleistung.apply(voltage_levels) return units_gdf
[docs]def import_mastr() -> None: """Import MaStR data into database""" engine = db.engine() # import geocoded data cfg = config.datasets()["mastr_new"] path_parts = cfg["geocoding_path"] path = Path(*["."] + path_parts).resolve() path = list(path.iterdir())[0] deposit_id_geocoding = int([-1].split(".")[0].split("_")[-1]) deposit_id_mastr = cfg["deposit_id"] if deposit_id_geocoding != deposit_id_mastr: raise AssertionError( f"The zenodo (sandbox) deposit ID {deposit_id_mastr} for the MaStR" f" dataset is not matching with the geocoding version " f"{deposit_id_geocoding}. Make sure to hermonize the data. When " f"the MaStR dataset is updated also update the geocoding and " f"update the egon data bundle. The geocoding can be done using: " f"" ) geocoding_gdf = gpd.read_file(path) # remove failed requests geocoding_gdf = geocoding_gdf.loc[geocoding_gdf.geometry.is_valid] EgonMastrGeocoded.__table__.drop(bind=engine, checkfirst=True) EgonMastrGeocoded.__table__.create(bind=engine, checkfirst=True) geocoding_gdf.to_postgis( name=EgonMastrGeocoded.__tablename__, con=engine, if_exists="append", schema=EgonMastrGeocoded.__table_args__["schema"], index=True, ) cfg = config.datasets()["power_plants"] cols_mapping = { "all": { "EinheitMastrNummer": "gens_id", "EinheitBetriebsstatus": "status", "Inbetriebnahmedatum": "commissioning_date", "Postleitzahl": "postcode", "Ort": "city", "Gemeinde": "municipality", "Bundesland": "federal_state", "Nettonennleistung": "capacity", "Einspeisungsart": "feedin_type", }, "pv": { "Lage": "site_type", "Standort": "site", "Nutzungsbereich": "usage_sector", "Hauptausrichtung": "orientation_primary", "HauptausrichtungNeigungswinkel": "orientation_primary_angle", "Nebenausrichtung": "orientation_secondary", "NebenausrichtungNeigungswinkel": "orientation_secondary_angle", "EinheitlicheAusrichtungUndNeigungswinkel": "orientation_uniform", "AnzahlModule": "module_count", "zugeordneteWirkleistungWechselrichter": "capacity_inverter", }, "wind": { "Lage": "site_type", "Hersteller": "manufacturer_name", "Typenbezeichnung": "type_name", "Nabenhoehe": "hub_height", "Rotordurchmesser": "rotor_diameter", }, "biomass": { "Technologie": "technology", "Hauptbrennstoff": "main_fuel", "Biomasseart": "fuel_type", "ThermischeNutzleistung": "th_capacity", }, "hydro": { "ArtDerWasserkraftanlage": "plant_type", "ArtDesZuflusses": "water_origin", }, "combustion": { "Energietraeger": "carrier", "Hauptbrennstoff": "main_fuel", "WeitererHauptbrennstoff": "other_main_fuel", "Technologie": "technology", "ThermischeNutzleistung": "th_capacity", }, "gsgk": { "Energietraeger": "carrier", "Technologie": "technology", }, "nuclear": { "Energietraeger": "carrier", "Technologie": "technology", }, "storage": { "Energietraeger": "carrier", "Technologie": "technology", "Batterietechnologie": "battery_type", "Pumpspeichertechnologie": "pump_storage_type", }, } source_files = { "pv": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_pv"], "wind": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_wind"], "biomass": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_biomass"], "hydro": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_hydro"], "combustion": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_combustion"], "gsgk": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_gsgk"], "nuclear": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_nuclear"], "storage": WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_storage"], } target_tables = { "pv": EgonPowerPlantsPv, "wind": EgonPowerPlantsWind, "biomass": EgonPowerPlantsBiomass, "hydro": EgonPowerPlantsHydro, "combustion": EgonPowerPlantsCombustion, "gsgk": EgonPowerPlantsGsgk, "nuclear": EgonPowerPlantsNuclear, "storage": EgonPowerPlantsStorage, } vlevel_mapping = { "Höchstspannung": 1, "UmspannungZurHochspannung": 2, "Hochspannung": 3, "UmspannungZurMittelspannung": 4, "Mittelspannung": 5, "UmspannungZurNiederspannung": 6, "Niederspannung": 7, } # import locations locations = pd.read_csv( WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_location"], index_col=None, ) # import grid districts mv_grid_districts = db.select_geodataframe( f""" SELECT * FROM {cfg['sources']['egon_mv_grid_district']} """, epsg=4326, ) # import units technologies = [ "pv", "wind", "biomass", "hydro", "combustion", "gsgk", "nuclear", "storage", ] for tech in technologies: # read units"===== Importing MaStR dataset: {tech} =====") logger.debug("Reading CSV and filtering data...") units = pd.read_csv( source_files[tech], usecols=( ["LokationMastrNummer", "Laengengrad", "Breitengrad", "Land"] + list(cols_mapping["all"].keys()) + list(cols_mapping[tech].keys()) ), index_col=None, dtype={"Postleitzahl": str}, low_memory=False, ).rename(columns=cols_mapping) # drop units outside of Germany len_old = len(units) units = units.loc[units.Land == "Deutschland"] logger.debug( f"{len_old - len(units)} units outside of Germany dropped..." ) # get boundary boundary = ( federal_state_data([0, "geom"] ) # drop units installed after reference date from cfg # (eGon2021 scenario) len_old = len(units) ts = pd.Timestamp(config.datasets()["mastr_new"]["egon2021_date_max"]) units = units.loc[pd.to_datetime(units.Inbetriebnahmedatum) <= ts] logger.debug( f"{len_old - len(units)} units installed after {ts} dropped..." ) # drop not operating units len_old = len(units) units = units.loc[ units.EinheitBetriebsstatus.isin( ["InBetrieb", "VoruebergehendStillgelegt"] ) ] logger.debug(f"{len_old - len(units)} not operating units dropped...") # filter for SH units if in testmode if not TESTMODE_OFF: "TESTMODE: Dropping all units outside of Schleswig-Holstein..." ) units = units.loc[units.Bundesland == "SchleswigHolstein"] # merge and rename voltage level logger.debug("Merging with locations and allocate voltage level...") units = units.merge( locations[["MaStRNummer", "Spannungsebene"]], left_on="LokationMastrNummer", right_on="MaStRNummer", how="left", ) # convert voltage levels to numbers units["voltage_level"] = units.Spannungsebene.replace(vlevel_mapping) # set voltage level for nan values units = infer_voltage_level(units) # add geometry logger.debug("Adding geometries...") units = gpd.GeoDataFrame( units, geometry=gpd.points_from_xy( units["Laengengrad"], units["Breitengrad"], crs=4326 ), crs=4326, ) units["geometry_geocoded"] = ( units.Laengengrad.isna() | units.Laengengrad.isna() ) units.loc[~units.geometry_geocoded, "geometry_geocoded"] = ~units.loc[ ~units.geometry_geocoded, "geometry" ].is_valid units_wo_geom = units["geometry_geocoded"].sum() logger.debug( f"{units_wo_geom}/{len(units)} units do not have a geometry!" " Adding geocoding results." ) # determine zip and municipality string mask = ( units.Postleitzahl.apply(isfloat) & ~units.Postleitzahl.isna() & ~units.Gemeinde.isna() ) units["zip_and_municipality"] = np.nan ok_units = units.loc[mask] units.loc[mask, "zip_and_municipality"] = ( ok_units.Postleitzahl.astype(int).astype(str).str.zfill(5) + " " + ok_units.Gemeinde.astype(str).str.rstrip().str.lstrip() + ", Deutschland" ) # get zip and municipality from Standort parse_df = units.loc[~mask] if not parse_df.empty and "Standort" in parse_df.columns: init_len = len(parse_df) f"Parsing ZIP code and municipality from Standort for " f"{init_len} values for {tech}." ) parse_df[["zip_and_municipality", "drop_this"]] = ( parse_df.Standort.astype(str) .apply(zip_and_municipality_from_standort) .tolist() ) parse_df = parse_df.loc[parse_df.drop_this] if not parse_df.empty: units.loc[ parse_df.index, "zip_and_municipality" ] = parse_df.zip_and_municipality # add geocoding to missing units = units.merge( right=geocoding_gdf[["zip_and_municipality", "geometry"]].rename( columns={"geometry": "temp"} ), how="left", on="zip_and_municipality", ) units.loc[units.geometry_geocoded, "geometry"] = units.loc[ units.geometry_geocoded, "temp" ] init_len = len(units) "Dropping units outside boundary by geometry or without geometry" "..." ) units.dropna(subset=["geometry"], inplace=True) units = units.loc[units.geometry.within(boundary)] if init_len > 0: logger.debug( f"{init_len - len(units)}/{init_len} " f"({((init_len - len(units)) / init_len) * 100: g} %) dropped." ) # drop unnecessary and rename columns logger.debug("Reformatting...") units.drop( columns=[ "LokationMastrNummer", "MaStRNummer", "Laengengrad", "Breitengrad", "Spannungsebene", "Land", "temp", ], inplace=True, ) mapping = cols_mapping["all"].copy() mapping.update(cols_mapping[tech]) mapping.update({"geometry": "geom"}) units.rename(columns=mapping, inplace=True) units["voltage_level"] = units.voltage_level.fillna(-1).astype(int) units.set_geometry("geom", inplace=True) units["id"] = range(len(units)) # change capacity unit: kW to MW units["capacity"] = units["capacity"] / 1e3 if "capacity_inverter" in units.columns: units["capacity_inverter"] = units["capacity_inverter"] / 1e3 if "th_capacity" in units.columns: units["th_capacity"] = units["th_capacity"] / 1e3 # assign bus ids logger.debug("Assigning bus ids...") units = units.assign( bus_id=units.loc[~units.geom.x.isna()] .sjoin(mv_grid_districts[["bus_id", "geom"]], how="left") .drop(columns=["index_right"]) .bus_id ) units["bus_id"] = units.bus_id.fillna(-1).astype(int) # write to DB"Writing {len(units)} units to DB...") units.to_postgis( name=target_tables[tech].__tablename__, con=engine, if_exists="append", schema=target_tables[tech].__table_args__["schema"], )