"""The central module containing all code dealing with the spatial
distribution of industrial electricity demands.
Industrial demands from DemandRegio are distributed from nuts3 level down
to osm landuse polygons and/or industrial sites also identified within this
processing step bringing three different inputs together.
"""
from sqlalchemy import ARRAY, Column, Float, Integer, String
from sqlalchemy.ext.declarative import declarative_base
import geopandas as gpd
import pandas as pd
from egon.data import db
from egon.data.config import settings
from egon.data.datasets import Dataset, DatasetSources, DatasetTargets
from egon.data.datasets.industry.temporal import (
insert_osm_ind_load,
insert_sites_ind_load,
)
Base = declarative_base()
[docs]
class EgonDemandRegioOsmIndElectricity(Base):
__tablename__ = "egon_demandregio_osm_ind_electricity"
__table_args__ = {"schema": "demand"}
id = Column(Integer, primary_key=True)
osm_id = Column(Integer)
scenario = Column(String(20), primary_key=True)
wz = Column(Integer)
demand = Column(Float)
[docs]
class EgonDemandRegioSitesIndElectricity(Base):
__tablename__ = "egon_demandregio_sites_ind_electricity"
__table_args__ = {"schema": "demand"}
industrial_sites_id = Column(Integer, primary_key=True)
scenario = Column(String(20), primary_key=True)
wz = Column(Integer)
demand = Column(Float)
[docs]
class DemandCurvesOsmIndustry(Base):
__tablename__ = "egon_osm_ind_load_curves"
__table_args__ = {"schema": "demand"}
bus = Column(Integer, primary_key=True)
scn_name = Column(String, primary_key=True)
p_set = Column(ARRAY(Float))
[docs]
class DemandCurvesOsmIndustryIndividual(Base):
__tablename__ = "egon_osm_ind_load_curves_individual"
__table_args__ = {"schema": "demand"}
osm_id = Column(Integer, primary_key=True)
bus_id = Column(Integer)
scn_name = Column(String, primary_key=True)
p_set = Column(ARRAY(Float))
peak_load = Column(Float)
demand = Column(Float)
voltage_level = Column(Integer)
[docs]
class DemandCurvesSitesIndustry(Base):
__tablename__ = "egon_sites_ind_load_curves"
__table_args__ = {"schema": "demand"}
bus = Column(Integer, primary_key=True)
scn_name = Column(String, primary_key=True)
wz = Column(Integer, primary_key=True)
p_set = Column(ARRAY(Float))
[docs]
class DemandCurvesSitesIndustryIndividual(Base):
__tablename__ = "egon_sites_ind_load_curves_individual"
__table_args__ = {"schema": "demand"}
site_id = Column(Integer, primary_key=True)
bus_id = Column(Integer)
scn_name = Column(String, primary_key=True)
p_set = Column(ARRAY(Float))
peak_load = Column(Float)
demand = Column(Float)
voltage_level = Column(Integer)
wz = Column(Integer)
[docs]
def create_tables():
"""Create tables for industrial sites and distributed industrial demands
Returns
-------
None.
"""
targets = IndustrialDemandCurves.targets
# Create target schema
db.execute_sql("CREATE SCHEMA IF NOT EXISTS demand;")
# Drop tables and sequences before recreating them
db.execute_sql(f"""DROP TABLE IF EXISTS
{targets.tables["sites"]} CASCADE;""")
db.execute_sql(f"""DROP TABLE IF EXISTS
{targets.tables["osm"]} CASCADE;""")
db.execute_sql(f"""DROP TABLE IF EXISTS
{targets.tables["osm_load"]} CASCADE;""")
db.execute_sql(f"""DROP TABLE IF EXISTS
{targets.tables["osm_load_individual"]} CASCADE;""")
db.execute_sql(f"""DROP TABLE IF EXISTS
{targets.tables["sites_load"]} CASCADE;""")
db.execute_sql(f"""DROP TABLE IF EXISTS
{targets.tables["sites_load_individual"]} CASCADE;""")
engine = db.engine()
EgonDemandRegioSitesIndElectricity.__table__.create(
bind=engine, checkfirst=True
)
EgonDemandRegioOsmIndElectricity.__table__.create(
bind=engine, checkfirst=True
)
DemandCurvesOsmIndustry.__table__.create(bind=engine, checkfirst=True)
DemandCurvesOsmIndustryIndividual.__table__.create(
bind=engine, checkfirst=True
)
DemandCurvesSitesIndustry.__table__.create(bind=engine, checkfirst=True)
DemandCurvesSitesIndustryIndividual.__table__.create(
bind=engine, checkfirst=True
)
[docs]
def industrial_demand_distr():
"""Distribute electrical demands for industry to osm landuse polygons
and/or industrial sites, identified earlier in the process.
The demands per subsector on nuts3-level from demandregio are distributed
linearly to the area of the corresponding landuse polygons or evenly to
identified industrial sites.
Returns
-------
None.
"""
# Read information from configuration file
sources = IndustrialDemandCurves.sources
targets = IndustrialDemandCurves.targets
# Delete data from table
db.execute_sql(f"""DELETE FROM {targets.tables["sites"]}""")
db.execute_sql(f"""DELETE FROM {targets.tables["osm"]}""")
for scn in settings()["egon-data"]["--scenarios"]:
boundaries = db.select_geodataframe(
f"""SELECT nuts, geometry
FROM {sources.tables["vg250_krs"]}""",
index_col="nuts",
geom_col="geometry",
epsg=3035,
)
# Select industrial landuse polygons
landuse = db.select_geodataframe(
f"""SELECT id, area_ha, geom
FROM {sources.tables["osm_landuse"]}
WHERE sector = 3
AND NOT ST_Intersects(
geom,
(SELECT ST_UNION(ST_Transform(geom,3035)) FROM
{sources.tables["industrial_sites"]}))
AND name NOT LIKE '%%kraftwerk%%'
AND name NOT LIKE '%%Stadtwerke%%'
AND name NOT LIKE '%%Müllverbrennung%%'
AND name NOT LIKE '%%Müllverwertung%%'
AND name NOT LIKE '%%Abfall%%'
AND name NOT LIKE '%%Kraftwerk%%'
AND name NOT LIKE '%%Wertstoff%%'
AND name NOT LIKE '%%olarpark%%'
AND name NOT LIKE '%%Gewerbegebiet%%'
AND name NOT LIKE '%%Gewerbepark%%'
AND name NOT LIKE '%%heizwerk%%'
AND name NOT LIKE '%%Heizwerk%%'
AND name NOT LIKE '%%Kläranlage%%'
AND name NOT LIKE '%%Klärwerk%%'
AND name NOT LIKE '%%Biogasanlage%%'
AND name NOT LIKE '%%Wasserwerk%%'
AND name NOT LIKE '%%Recyclinghof%%'
AND name NOT LIKE '%%Recyclingpark%%'""",
geom_col="geom",
epsg=3035,
)
# Spatially join vg250_krs and industrial landuse areas
landuse = gpd.sjoin(
landuse, boundaries, how="inner", predicate="intersects"
)
# Rename column
landuse = landuse.rename({"nuts": "nuts3"}, axis=1)
landuse_nuts3 = landuse[["area_ha", "nuts3"]]
landuse_nuts3 = landuse_nuts3.groupby(["nuts3"]).sum().reset_index()
# Select data on industrial sites
sites = db.select_dataframe(
f"""SELECT id, wz, nuts3 FROM
{sources.tables["industrial_sites"]}""",
index_col=None,
)
# Count number of industrial sites per subsector (wz) and nuts3
# district
sites_grouped = (
sites.groupby(["nuts3", "wz"]).size().reset_index(name="counts")
)
# Select industrial demands on nuts3 level from local database
demand_nuts3_import = db.select_dataframe(f"""SELECT nuts3, demand, wz
FROM {sources.tables["demandregio"]}
WHERE scenario = '{scn}'
AND demand > 0
AND wz IN
(SELECT wz FROM {sources.tables["wz"]}
WHERE sector = 'industry')""")
# Replace wz=17 and wz=18 by wz=1718 as a differentiation of these two
# subsectors can't be performed
demand_nuts3_import["wz"] = demand_nuts3_import["wz"].replace(
[17, 18], 1718
)
# Group results by nuts3 and wz to aggregate demands from subsectors
# 17 and 18
demand_nuts3 = (
demand_nuts3_import.groupby(["nuts3", "wz"]).sum().reset_index()
)
# A differentiation between those industrial subsectors (wz) which
# aren't represented and subsectors with a representation in the
# dataset on industrial sites is needed
# Select industrial demand for sectors which aren't found in
# industrial sites as category a
demand_nuts3_a = demand_nuts3[
~demand_nuts3["wz"].isin([1718, 19, 20, 23, 24])
]
# Select industrial demand for sectors which are found in industrial
# sites as category b
demand_nuts3_b = demand_nuts3[
demand_nuts3["wz"].isin([1718, 19, 20, 23, 24])
]
# Bring demands on nuts3 level and information on industrial sites per
# nuts3 district together
demand_nuts3_b = demand_nuts3_b.merge(
sites_grouped,
how="left",
left_on=["nuts3", "wz"],
right_on=["nuts3", "wz"],
)
# Define share of industrial demand per nuts3 region and subsector
# allocated to industrial sites
share_to_sites = 0.5
# Define demand per site for every nuts3 region and subsector
demand_nuts3_b["demand_per_site"] = (
demand_nuts3_b["demand"] * share_to_sites
) / demand_nuts3_b["counts"]
# Replace NaN by 0
demand_nuts3_b = demand_nuts3_b.fillna(0)
# Calculate demand which needs to be distributed to osm landuse areas
# from category b
demand_nuts3_b["demand_b_osm"] = demand_nuts3_b["demand"] - (
demand_nuts3_b["demand_per_site"] * demand_nuts3_b["counts"]
)
# Add information about demand per site to sites dataframe
sites = sites.merge(
demand_nuts3_b[["nuts3", "wz", "demand_per_site"]],
how="left",
left_on=["nuts3", "wz"],
right_on=["nuts3", "wz"],
)
sites = sites.rename(columns={"demand_per_site": "demand"})
demand_nuts3_b_osm = demand_nuts3_b[["nuts3", "wz", "demand_b_osm"]]
demand_nuts3_b_osm = demand_nuts3_b_osm.rename(
{"demand_b_osm": "demand"}, axis=1
)
# Create df containing all demand per wz which will be allocated to
# osm areas
demand_nuts3_osm_wz = pd.concat(
[demand_nuts3_a, demand_nuts3_b_osm], ignore_index=True
)
demand_nuts3_osm_wz = (
demand_nuts3_osm_wz.groupby(["nuts3", "wz"]).sum().reset_index()
)
# Calculate demand per hectar for each nuts3 region
demand_nuts3_osm_wz = demand_nuts3_osm_wz.merge(
landuse_nuts3, how="left", left_on=["nuts3"], right_on=["nuts3"]
)
demand_nuts3_osm_wz["demand_per_ha"] = (
demand_nuts3_osm_wz["demand"] / demand_nuts3_osm_wz["area_ha"]
)
# Add information about demand per ha to landuse df
landuse = landuse.merge(
demand_nuts3_osm_wz[["nuts3", "demand_per_ha", "wz"]],
how="left",
left_on=["nuts3"],
right_on=["nuts3"],
)
landuse["demand"] = landuse["area_ha"] * landuse["demand_per_ha"]
# Adjust dataframes for export to local database
sites = sites.rename({"id": "industrial_sites_id"}, axis=1)
sites["scenario"] = scn
sites.set_index("industrial_sites_id", inplace=True)
landuse = landuse.rename({"id": "osm_id"}, axis=1)
# Remove duplicates and adjust index
landuse = (
landuse.drop("geom", axis="columns")
.groupby(["osm_id", "wz"])
.sum()
.reset_index()
)
landuse.index.rename("id", inplace=True)
landuse["scenario"] = scn
# Write data to db
sites[["scenario", "wz", "demand"]].to_sql(
targets.get_table_name("sites"),
con=db.engine(),
schema=targets.get_table_schema("sites"),
if_exists="append",
)
landuse[["osm_id", "scenario", "wz", "demand"]].to_sql(
targets.get_table_name("osm"),
con=db.engine(),
schema=targets.get_table_schema("osm"),
if_exists="append",
)
[docs]
class IndustrialDemandCurves(Dataset):
"""
Distribute industrial electricity demands to industrial sites and OSM
landuse areas
Creates different tables to store industrial electricity demand curves on
different aggregation levels. In a first step industrial demands taken from
DemandRegio are distributed to industrial sites and OSM polygons which are
tagged as industrial areas. This method takes information on the different
industrial sectors into account and allocates the annual demand as well as
load curves accordingly.
*Dependencies*
* :py:class:`DemandRegio <egon.data.datasets.demandregio.DemandRegio>`
* :py:class:`MergeIndustrialSites <egon.data.datasets.industrial_sites.MergeIndustrialSites>`
* :py:class:`OsmLanduse <egon.data.datasets.loadarea.OsmLanduse>`
* :py:func:`define_mv_grid_districts <egon.data.datasets.mv_grid_districts.define_mv_grid_districts>`
* :py:class:`OpenStreetMap <egon.data.datasets.osm.OpenStreetMap>`
*Resulting tables*
* :py:class:`demand.egon_demandregio_osm_ind_electricity <egon.data.datasets.industry.EgonDemandRegioOsmIndElectricity>` is created and filled
* :py:class:`demand.egon_demandregio_sites_ind_electricity <egon.data.datasets.industry.EgonDemandRegioSitesIndElectricity>` is created and filled
* :py:class:`demand.egon_osm_ind_load_curves <egon.data.datasets.industry.DemandCurvesOsmIndustry>` is created and filled
* :py:class:`demand.egon_osm_ind_load_curves_individual <egon.data.datasets.industry.DemandCurvesOsmIndustryIndividual>` is created and filled
* :py:class:`demand.egon_sites_ind_load_curves <egon.data.datasets.industry.DemandCurvesSitesIndustry>` is created and filled
* :py:class:`demand.egon_sites_ind_load_curves_individual <egon.data.datasets.industry.DemandCurvesSitesIndustryIndividual>` is created and filled
"""
#:
name: str = "Industrial_demand_curves"
#:
version: str = "0.0.8"
sources = DatasetSources(
tables={
"demandregio": "demand.egon_demandregio_cts_ind",
"wz": "demand.egon_demandregio_wz",
"osm_landuse": "openstreetmap.osm_landuse",
"industrial_sites": "demand.egon_industrial_sites",
"vg250_krs": "boundaries.vg250_krs",
"osm": "demand.egon_demandregio_osm_ind_electricity",
"sites": "demand.egon_demandregio_sites_ind_electricity",
"sites_geom": "demand.egon_industrial_sites",
"demandregio_industry": "demand.egon_demandregio_cts_ind",
"demandregio_wz": "demand.egon_demandregio_wz",
"demandregio_timeseries": "demand.egon_demandregio_timeseries_cts_ind",
"hvmv_substation": "grid.egon_hvmv_substation",
"egon_mv_grid_district": "grid.egon_mv_grid_district",
"egon_ehv_voronoi": "grid.egon_ehv_substation_voronoi",
}
)
targets = DatasetTargets(
tables={
"osm": "demand.egon_demandregio_osm_ind_electricity",
"sites": "demand.egon_demandregio_sites_ind_electricity",
"osm_load": "demand.egon_osm_ind_load_curves",
"osm_load_individual": "demand.egon_osm_ind_load_curves_individual",
"sites_load": "demand.egon_sites_ind_load_curves",
"sites_load_individual": "demand.egon_sites_ind_load_curves_individual",
}
)
def __init__(self, dependencies):
super().__init__(
name=self.name,
version=self.version,
dependencies=dependencies,
tasks=(
create_tables,
industrial_demand_distr,
insert_osm_ind_load,
insert_sites_ind_load,
),
)