"""Import MaStR dataset and write to DB tables
A data dump of the Marktstammdatenregister (MaStR, 2022-11-17) is imported
into the database. Only some technologies are taken into account and written
to the following tables:
* PV: table `supply.egon_power_plants_pv`
* wind turbines: table `supply.egon_power_plants_wind`
* biomass/biogas plants: table `supply.egon_power_plants_biomass`
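* hydro plants: table `supply.egon_power_plants_hydro`
* combustion, gsgk, nuclear and storage units: one table each (see
  `target_tables` in `import_mastr`)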
Handling of empty source data in the MaStR dump:
* `voltage_level`: inferred based on nominal power (`capacity`) using the
ranges from
https://redmine.iks.cs.ovgu.de/oe/projects/ego-n/wiki/Definition_of_thresholds_for_voltage_level_assignment
which results in True in column `voltage_level_inferred`. Remaining datasets
are set to -1 (which only occurs if `capacity` is empty).
* `supply.egon_power_plants_*.bus_id`: set to -1 (only if not within grid
districts or no geom available, e.g. for units with nom. power <30 kW)
* `supply.egon_power_plants_hydro.plant_type`: NaN
The data is used especially for the generation of status quo grids by ding0.
"""
from __future__ import annotations
from pathlib import Path
from loguru import logger
import geopandas as gpd
import numpy as np
import pandas as pd
from egon.data import config, db
from egon.data.datasets.mastr import WORKING_DIR_MASTR_NEW
from egon.data.datasets.power_plants.mastr_db_classes import (
EgonMastrGeocoded,
EgonPowerPlantsBiomass,
EgonPowerPlantsCombustion,
EgonPowerPlantsGsgk,
EgonPowerPlantsHydro,
EgonPowerPlantsNuclear,
EgonPowerPlantsPv,
EgonPowerPlantsStorage,
EgonPowerPlantsWind,
)
from egon.data.datasets.power_plants.pv_rooftop_buildings import (
federal_state_data,
)
# Test mode is off only for the "Everything" dataset boundary. In test mode,
# ``import_mastr`` keeps units located in Schleswig-Holstein only.
_dataset_boundary = config.settings()["egon-data"]["--dataset-boundary"]
TESTMODE_OFF = _dataset_boundary == "Everything"
del _dataset_boundary
def isfloat(num: str) -> bool:
    """
    Determine if a value can be converted to float.

    Parameters
    -----------
    num : str
        String to parse. Other objects (e.g. ``None``) are accepted as well
        and simply yield False if ``float()`` rejects them.

    Returns
    -------
    bool
        True if ``float(num)`` succeeds, False otherwise. Note that NaN
        values as well as strings like ``"nan"`` or ``"inf"`` count as
        floats.
    """
    try:
        float(num)
    except (TypeError, ValueError):
        # TypeError: objects float() cannot handle at all (e.g. None);
        # ValueError: strings that are not valid float literals.
        return False
    return True
def zip_and_municipality_from_standort(
    standort: str,
) -> tuple[str, bool]:
    """
    Extract zip code and municipality from a MaStR ``Standort`` string.

    The string is split at whitespace and the first token consisting of
    exactly five numeric characters is taken as the zip code. That token
    and all following tokens are joined back together with single spaces.

    Parameters
    -----------
    standort : str
        Standort as given from MaStR data.

    Returns
    -------
    tuple[str, bool]
        ``(cleaned, True)`` with the zip code and everything after it if a
        zip code was found, otherwise the unchanged ``standort`` and
        ``False`` (a warning is logged in that case). No country suffix is
        appended to the returned string.
    """
    tokens = standort.split()
    for position, token in enumerate(tokens):
        if len(token) == 5 and token.isnumeric():
            return " ".join(tokens[position:]), True

    logger.warning(
        "Couldn't identify zip code. This entry will be dropped."
        f" Original standort: {standort}."
    )
    return standort, False
def infer_voltage_level(
    units_gdf: gpd.GeoDataFrame,
) -> gpd.GeoDataFrame:
    """
    Infer missing voltage levels from the units' net nominal power.

    Units without a voltage level get a level derived from their
    ``Nettonennleistung`` (kW):

    * <= 100 kW: 7
    * <= 200 kW: 6
    * <= 5,500 kW: 5
    * <= 20,000 kW: 4
    * <= 120,000 kW: 3
    * > 120,000 kW: 1

    Units whose capacity is missing as well keep a NaN voltage level and
    are not flagged as inferred.

    Parameters
    -----------
    units_gdf : geopandas.GeoDataFrame
        Units with the columns ``voltage_level`` (NaN where unknown) and
        ``Nettonennleistung``. The frame is modified in place.

    Returns
    -------
    geopandas.GeoDataFrame
        The same frame with inferred voltage levels filled in and a boolean
        column ``voltage_level_inferred`` marking the inferred rows.
    """

    def voltage_levels(p: float) -> int:
        if p <= 100:
            return 7
        elif p <= 200:
            return 6
        elif p <= 5500:
            return 5
        elif p <= 20000:
            return 4
        elif p <= 120000:
            return 3
        return 1

    units_gdf["voltage_level_inferred"] = False
    # Only infer where the level is missing AND a capacity is available:
    # with a NaN capacity every comparison above is False, which would
    # silently assign the highest voltage level (1).
    mask = (
        units_gdf.voltage_level.isna() & units_gdf.Nettonennleistung.notna()
    )
    units_gdf.loc[mask, "voltage_level_inferred"] = True
    units_gdf.loc[mask, "voltage_level"] = units_gdf.loc[
        mask, "Nettonennleistung"
    ].apply(voltage_levels)
    return units_gdf
def _import_geocoded_data(engine) -> gpd.GeoDataFrame:
    """
    Read the geocoding results and write them to ``EgonMastrGeocoded``.

    The geocoding file must belong to the same zenodo deposit as the MaStR
    dump configured under ``mastr_new``; otherwise an ``AssertionError`` is
    raised.

    Parameters
    -----------
    engine
        Database engine as returned by ``db.engine()``.

    Returns
    -------
    geopandas.GeoDataFrame
        Geocoding results restricted to valid geometries.

    Raises
    ------
    FileNotFoundError
        If the geocoding directory contains no file.
    AssertionError
        If the deposit IDs of geocoding and MaStR dump differ.
    """
    cfg = config.datasets()["mastr_new"]
    geocoding_dir = Path(*["."] + cfg["geocoding_path"]).resolve()
    # Sort so that the selected file does not depend on the directory
    # listing order of the file system.
    candidates = sorted(geocoding_dir.iterdir())
    if not candidates:
        raise FileNotFoundError(
            f"No geocoding file found in {geocoding_dir}."
        )
    path = candidates[0]

    # The file name is expected to look like "<prefix>_<deposit id>.<ext>".
    deposit_id_geocoding = int(path.name.split(".")[0].split("_")[-1])
    deposit_id_mastr = cfg["deposit_id"]
    if deposit_id_geocoding != deposit_id_mastr:
        raise AssertionError(
            f"The zenodo (sandbox) deposit ID {deposit_id_mastr} for the MaStR"
            f" dataset is not matching with the geocoding version "
            f"{deposit_id_geocoding}. Make sure to harmonize the data. When "
            f"the MaStR dataset is updated also update the geocoding and "
            f"update the egon data bundle. The geocoding can be done using: "
            f"https://github.com/RLI-sandbox/mastr-geocoding"
        )

    geocoding_gdf = gpd.read_file(path)
    # remove failed requests
    geocoding_gdf = geocoding_gdf.loc[geocoding_gdf.geometry.is_valid]

    EgonMastrGeocoded.__table__.drop(bind=engine, checkfirst=True)
    EgonMastrGeocoded.__table__.create(bind=engine, checkfirst=True)
    geocoding_gdf.to_postgis(
        name=EgonMastrGeocoded.__tablename__,
        con=engine,
        if_exists="append",
        schema=EgonMastrGeocoded.__table_args__["schema"],
        index=True,
    )
    return geocoding_gdf


def _assign_zip_and_municipality(
    units: gpd.GeoDataFrame, tech: str
) -> gpd.GeoDataFrame:
    """
    Add the ``zip_and_municipality`` key used to match geocoding results.

    The key has the form ``"<5-digit zip> <municipality>, Deutschland"``.
    It is built from ``Postleitzahl`` and ``Gemeinde`` where both are
    usable, otherwise parsed from ``Standort`` (if that column exists).
    Units for which no key can be determined keep a missing value.

    Parameters
    -----------
    units : geopandas.GeoDataFrame
        Units with MaStR column names. Modified in place.
    tech : str
        Technology name, used for logging only.

    Returns
    -------
    geopandas.GeoDataFrame
        The same frame with the added ``zip_and_municipality`` column.
    """
    mask = (
        units.Postleitzahl.apply(isfloat)
        & ~units.Postleitzahl.isna()
        & ~units.Gemeinde.isna()
    )
    # Object dtype: assigning strings must not upcast a float column, and
    # the later merge against the (string) geocoding key needs matching
    # dtypes even if no key could be built at all.
    units["zip_and_municipality"] = pd.Series(
        np.nan, index=units.index, dtype=object
    )
    ok_units = units.loc[mask]
    units.loc[mask, "zip_and_municipality"] = (
        # via float, since isfloat() also accepts values like "24103.0"
        ok_units.Postleitzahl.astype(float).astype(int).astype(str).str.zfill(5)
        + " "
        + ok_units.Gemeinde.astype(str).str.strip()
        + ", Deutschland"
    )

    # get zip and municipality from Standort
    parse_df = units.loc[~mask]
    if not parse_df.empty and "Standort" in parse_df.columns:
        init_len = len(parse_df)
        logger.info(
            f"Parsing ZIP code and municipality from Standort for "
            f"{init_len} values for {tech}."
        )
        parsed = pd.DataFrame(
            parse_df.Standort.astype(str)
            .apply(zip_and_municipality_from_standort)
            .tolist(),
            columns=["zip_and_municipality", "found"],
            index=parse_df.index,
        )
        keys = parsed.loc[parsed["found"].astype(bool), "zip_and_municipality"]
        if not keys.empty:
            # Use the same ", Deutschland" suffix as the keys built from
            # Postleitzahl/Gemeinde above; both are matched against the same
            # geocoding column.
            keys = keys.where(
                keys.str.endswith(", Deutschland"), keys + ", Deutschland"
            )
            units.loc[keys.index, "zip_and_municipality"] = keys
    return units


def import_mastr() -> None:
    """
    Import MaStR data into the database.

    Writes the geocoding results to ``EgonMastrGeocoded`` and then, for each
    technology, reads the MaStR units, filters them (Germany only,
    commissioned up to ``egon2021_date_max``, in operation, Schleswig-Holstein
    only in test mode), assigns voltage levels, geometries and bus IDs and
    appends them to the technology's target table.
    """
    engine = db.engine()

    # import geocoded data
    geocoding_gdf = _import_geocoded_data(engine)

    cfg = config.datasets()["power_plants"]

    cols_mapping = {
        "all": {
            "EinheitMastrNummer": "gens_id",
            "EinheitBetriebsstatus": "status",
            "Inbetriebnahmedatum": "commissioning_date",
            "Postleitzahl": "postcode",
            "Ort": "city",
            "Gemeinde": "municipality",
            "Bundesland": "federal_state",
            "Nettonennleistung": "capacity",
            "Einspeisungsart": "feedin_type",
        },
        "pv": {
            "Lage": "site_type",
            "Standort": "site",
            "Nutzungsbereich": "usage_sector",
            "Hauptausrichtung": "orientation_primary",
            "HauptausrichtungNeigungswinkel": "orientation_primary_angle",
            "Nebenausrichtung": "orientation_secondary",
            "NebenausrichtungNeigungswinkel": "orientation_secondary_angle",
            "EinheitlicheAusrichtungUndNeigungswinkel": "orientation_uniform",
            "AnzahlModule": "module_count",
            "zugeordneteWirkleistungWechselrichter": "capacity_inverter",
        },
        "wind": {
            "Lage": "site_type",
            "Hersteller": "manufacturer_name",
            "Typenbezeichnung": "type_name",
            "Nabenhoehe": "hub_height",
            "Rotordurchmesser": "rotor_diameter",
        },
        "biomass": {
            "Technologie": "technology",
            "Hauptbrennstoff": "main_fuel",
            "Biomasseart": "fuel_type",
            "ThermischeNutzleistung": "th_capacity",
        },
        "hydro": {
            "ArtDerWasserkraftanlage": "plant_type",
            "ArtDesZuflusses": "water_origin",
        },
        "combustion": {
            "Energietraeger": "carrier",
            "Hauptbrennstoff": "main_fuel",
            "WeitererHauptbrennstoff": "other_main_fuel",
            "Technologie": "technology",
            "ThermischeNutzleistung": "th_capacity",
        },
        "gsgk": {
            "Energietraeger": "carrier",
            "Technologie": "technology",
        },
        "nuclear": {
            "Energietraeger": "carrier",
            "Technologie": "technology",
        },
        "storage": {
            "Energietraeger": "carrier",
            "Technologie": "technology",
            "Batterietechnologie": "battery_type",
            "Pumpspeichertechnologie": "pump_storage_type",
        },
    }

    target_tables = {
        "pv": EgonPowerPlantsPv,
        "wind": EgonPowerPlantsWind,
        "biomass": EgonPowerPlantsBiomass,
        "hydro": EgonPowerPlantsHydro,
        "combustion": EgonPowerPlantsCombustion,
        "gsgk": EgonPowerPlantsGsgk,
        "nuclear": EgonPowerPlantsNuclear,
        "storage": EgonPowerPlantsStorage,
    }
    technologies = list(target_tables)

    # every source file is configured under the key "mastr_<tech>"
    source_files = {
        tech: WORKING_DIR_MASTR_NEW / cfg["sources"][f"mastr_{tech}"]
        for tech in technologies
    }

    vlevel_mapping = {
        "Höchstspannung": 1,
        "UmspannungZurHochspannung": 2,
        "Hochspannung": 3,
        "UmspannungZurMittelspannung": 4,
        "Mittelspannung": 5,
        "UmspannungZurNiederspannung": 6,
        "Niederspannung": 7,
    }

    # import locations
    locations = pd.read_csv(
        WORKING_DIR_MASTR_NEW / cfg["sources"]["mastr_location"],
        index_col=None,
    )

    # import grid districts
    mv_grid_districts = db.select_geodataframe(
        f"""
        SELECT * FROM {cfg['sources']['egon_mv_grid_district']}
        """,
        epsg=4326,
    )

    # Loop-invariant inputs: spatial boundary and reference date (eGon2021)
    boundary = (
        federal_state_data(geocoding_gdf.crs).dissolve().at[0, "geom"]
    )
    ts = pd.Timestamp(config.datasets()["mastr_new"]["egon2021_date_max"])

    for tech in technologies:
        # read units; columns keep their MaStR (German) names until the
        # renaming step below
        logger.info(f"===== Importing MaStR dataset: {tech} =====")
        logger.debug("Reading CSV and filtering data...")
        units = pd.read_csv(
            source_files[tech],
            usecols=(
                ["LokationMastrNummer", "Laengengrad", "Breitengrad", "Land"]
                + list(cols_mapping["all"].keys())
                + list(cols_mapping[tech].keys())
            ),
            index_col=None,
            dtype={"Postleitzahl": str},
            low_memory=False,
        )

        # drop units outside of Germany
        len_old = len(units)
        units = units.loc[units.Land == "Deutschland"]
        logger.debug(
            f"{len_old - len(units)} units outside of Germany dropped..."
        )

        # drop units installed after reference date from cfg
        # (eGon2021 scenario)
        len_old = len(units)
        units = units.loc[pd.to_datetime(units.Inbetriebnahmedatum) <= ts]
        logger.debug(
            f"{len_old - len(units)} units installed after {ts} dropped..."
        )

        # drop not operating units
        len_old = len(units)
        units = units.loc[
            units.EinheitBetriebsstatus.isin(
                ["InBetrieb", "VoruebergehendStillgelegt"]
            )
        ]
        logger.debug(f"{len_old - len(units)} not operating units dropped...")

        # filter for SH units if in testmode
        if not TESTMODE_OFF:
            logger.info(
                "TESTMODE: Dropping all units outside of Schleswig-Holstein..."
            )
            units = units.loc[units.Bundesland == "SchleswigHolstein"]

        # merge and rename voltage level
        logger.debug("Merging with locations and allocate voltage level...")
        units = units.merge(
            locations[["MaStRNummer", "Spannungsebene"]],
            left_on="LokationMastrNummer",
            right_on="MaStRNummer",
            how="left",
        )
        # convert voltage levels to numbers
        units["voltage_level"] = units.Spannungsebene.replace(vlevel_mapping)
        # set voltage level for nan values
        units = infer_voltage_level(units)

        # add geometry
        logger.debug("Adding geometries...")
        units = gpd.GeoDataFrame(
            units,
            geometry=gpd.points_from_xy(
                units["Laengengrad"], units["Breitengrad"], crs=4326
            ),
            crs=4326,
        )

        # A unit needs geocoding if a coordinate is missing or its point
        # geometry is invalid.
        units["geometry_geocoded"] = (
            units.Laengengrad.isna() | units.Breitengrad.isna()
        )
        has_coords = ~units.geometry_geocoded
        units.loc[has_coords, "geometry_geocoded"] = ~units.loc[
            has_coords, "geometry"
        ].is_valid

        units_wo_geom = units["geometry_geocoded"].sum()
        logger.debug(
            f"{units_wo_geom}/{len(units)} units do not have a geometry!"
            " Adding geocoding results."
        )

        # determine zip and municipality string (merge key for geocoding)
        units = _assign_zip_and_municipality(units, tech)

        # add geocoding to missing
        units = units.merge(
            right=geocoding_gdf[["zip_and_municipality", "geometry"]].rename(
                columns={"geometry": "temp"}
            ),
            how="left",
            on="zip_and_municipality",
        )
        units.loc[units.geometry_geocoded, "geometry"] = units.loc[
            units.geometry_geocoded, "temp"
        ]

        init_len = len(units)
        logger.info(
            "Dropping units outside boundary by geometry or without geometry"
            "..."
        )
        units.dropna(subset=["geometry"], inplace=True)
        units = units.loc[units.geometry.within(boundary)]
        if init_len > 0:
            logger.debug(
                f"{init_len - len(units)}/{init_len} "
                f"({((init_len - len(units)) / init_len) * 100: g} %) dropped."
            )

        # drop unnecessary and rename columns
        logger.debug("Reformatting...")
        units.drop(
            columns=[
                "LokationMastrNummer",
                "MaStRNummer",
                "Laengengrad",
                "Breitengrad",
                "Spannungsebene",
                "Land",
                "temp",
            ],
            inplace=True,
        )
        mapping = {
            **cols_mapping["all"],
            **cols_mapping[tech],
            "geometry": "geom",
        }
        units.rename(columns=mapping, inplace=True)
        units["voltage_level"] = units.voltage_level.fillna(-1).astype(int)
        units.set_geometry("geom", inplace=True)
        units["id"] = range(len(units))

        # change capacity unit: kW to MW
        for col in ("capacity", "capacity_inverter", "th_capacity"):
            if col in units.columns:
                units[col] = units[col] / 1e3

        # assign bus ids
        logger.debug("Assigning bus ids...")
        bus_ids = (
            units.loc[~units.geom.x.isna()]
            .sjoin(mv_grid_districts[["bus_id", "geom"]], how="left")
            .bus_id
        )
        # A unit lying on a border shared by several districts matches all of
        # them; keep the first match so the index stays unique for alignment.
        bus_ids = bus_ids.loc[~bus_ids.index.duplicated(keep="first")]
        units = units.assign(bus_id=bus_ids)
        units["bus_id"] = units.bus_id.fillna(-1).astype(int)

        # write to DB
        logger.info(f"Writing {len(units)} units to DB...")
        units.to_postgis(
            name=target_tables[tech].__tablename__,
            con=engine,
            if_exists="append",
            schema=target_tables[tech].__table_args__["schema"],
        )