"""The central module containing all code dealing with individual heat supply.
The following main things are done in this module:
* ??
* Desaggregation of heat pump capacities to individual buildings
* Determination of minimum required heat pump capacity for pypsa-eur-sec
"""
from pathlib import Path
import os
import random
from airflow.operators.python import PythonOperator
from psycopg2.extensions import AsIs, register_adapter
from sqlalchemy import ARRAY, REAL, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
import geopandas as gpd
import numpy as np
import pandas as pd
import saio
from egon.data import config, db, logger
from egon.data.datasets import (
Dataset,
load_sources_and_targets,
wrapped_partial,
)
from egon.data.datasets.district_heating_areas import (
MapZensusDistrictHeatingAreas,
)
from egon.data.datasets.electricity_demand_timeseries.cts_buildings import (
calc_cts_building_profiles,
)
from egon.data.datasets.electricity_demand_timeseries.mapping import (
EgonMapZensusMvgdBuildings,
)
from egon.data.datasets.electricity_demand_timeseries.tools import (
write_table_to_postgres,
)
from egon.data.datasets.emobility.motorized_individual_travel.helpers import (
reduce_mem_usage,
)
from egon.data.datasets.heat_demand import EgonPetaHeat
from egon.data.datasets.heat_demand_timeseries.daily import (
EgonDailyHeatDemandPerClimateZone,
EgonMapZensusClimateZones,
)
from egon.data.datasets.heat_demand_timeseries.idp_pool import (
EgonHeatTimeseries,
)
# get zensus cells with district heating
from egon.data.datasets.zensus_mv_grid_districts import MapZensusGridDistricts
engine = db.engine()
Base = declarative_base()
scenarios = config.settings()["egon-data"]["--scenarios"]
[docs]
class EgonEtragoTimeseriesIndividualHeating(Base):
"""
Class definition of table demand.egon_etrago_timeseries_individual_heating.
This table contains aggregated heat load profiles of all buildings with heat pumps
within an MV grid as well as of all buildings with gas boilers within an MV grid for
the different scenarios. The data is used in eTraGo.
"""
__tablename__ = "egon_etrago_timeseries_individual_heating"
__table_args__ = {"schema": "demand"}
bus_id = Column(Integer, primary_key=True)
scenario = Column(String, primary_key=True)
carrier = Column(String, primary_key=True)
dist_aggregated_mw = Column(ARRAY(REAL))
[docs]
class EgonHpCapacityBuildings(Base):
"""
Class definition of table demand.egon_hp_capacity_buildings.
This table contains the heat pump capacity of all buildings with a heat pump.
"""
__tablename__ = "egon_hp_capacity_buildings"
__table_args__ = {"schema": "demand"}
building_id = Column(Integer, primary_key=True)
scenario = Column(String, primary_key=True)
hp_capacity = Column(REAL)
[docs]
class HeatPumpsPypsaEur(Dataset):
"""
Class to determine minimum heat pump capcacities per building for the PyPSA-EUR run.
The goal is to ensure that the heat pump capacities determined in PyPSA-EUR are
sufficient to serve the heat demand of individual buildings after the
desaggregation from a few nodes in PyPSA-EUR to the individual buildings.
As the heat peak load is not previously determined, it is as well done in this
dataset. Further, as determining heat peak load requires heat load
profiles of the buildings to be set up, this task is also utilised to set up
heat load profiles of all buildings with heat pumps within a grid in the eGon100RE
scenario used in eTraGo.
For more information see data documentation on :ref:`dec-heat-pumps-ref`.
*Dependencies*
* :py:class:`CtsDemandBuildings
<egon.data.datasets.electricity_demand_timeseries.cts_buildings.CtsDemandBuildings>`
* :py:class:`DistrictHeatingAreas
<egon.data.datasets.district_heating_areas.DistrictHeatingAreas>`
* :py:class:`HeatTimeSeries
<egon.data.datasets.heat_demand_timeseries.HeatTimeSeries>`
*Resulting tables*
* `input-pypsa-eur-sec/minimum_hp_capacity_mv_grid_100RE.csv` file is created,
containing the minimum required heat pump capacity per MV grid in MW as
input for PyPSA-EUR (created within :func:`export_min_cap_to_csv`)
* :py:class:`demand.egon_etrago_timeseries_individual_heating
<egon.data.datasets.heat_supply.individual_heating.EgonEtragoTimeseriesIndividualHeating>`
is created and filled
* :py:class:`demand.egon_building_heat_peak_loads
<egon.data.datasets.heat_supply.individual_heating.BuildingHeatPeakLoads>`
is created and filled
**What is the challenge?**
The main challenge lies in the set up of heat demand profiles per building in
:func:`aggregate_residential_and_cts_profiles()` as it takes alot of time and
in grids with a high number of buildings requires alot of RAM. Both runtime and RAM
usage needed to be improved several times. To speed up the process, tasks are set
up to run in parallel. This currently leads to alot of connections being opened and
at a certain point to a runtime error due to too many open connections.
**What are central assumptions during the data processing?**
Central assumption for determining the minimum required heat pump capacity
is that heat pumps can be dimensioned using an approach from the network development
plan that uses the building's peak heat demand and a fixed COP (see
data documentation on :ref:`dec-heat-pumps-ref`).
**Drawbacks and limitations of the data**
The heat demand profiles used here to determine the heat peak load have very few
very high peaks that lead to large heat pump capacities. This should be solved
somehow. Cutting off the peak is not possible, as the time series of each building
is not saved but generated on the fly. Also, just using smaller heat pumps would
lead to infeasibilities in eDisGo.
"""
#:
name: str = "HeatPumpsPypsaEurSec"
#:
version: str = "0.0.4"
def __init__(self, dependencies):
def dyn_parallel_tasks_pypsa_eur():
"""Dynamically generate tasks
The goal is to speed up tasks by parallelising bulks of mvgds.
The number of parallel tasks is defined via parameter
`parallel_tasks` in the dataset config `datasets.yml`.
Returns
-------
set of airflow.PythonOperators
The tasks. Each element is of
:func:`egon.data.datasets.heat_supply.individual_heating.
determine_hp_cap_peak_load_mvgd_ts_pypsa_eur`
"""
parallel_tasks = config.datasets()["demand_timeseries_mvgd"].get(
"parallel_tasks", 1
)
tasks = set()
for i in range(parallel_tasks):
tasks.add(
PythonOperator(
task_id=(
f"individual_heating."
f"determine-hp-capacity-pypsa-eur-"
f"mvgd-bulk{i}"
),
python_callable=split_mvgds_into_bulks,
op_kwargs={
"n": i,
"max_n": parallel_tasks,
"func": determine_hp_cap_peak_load_mvgd_ts_pypsa_eur, # noqa: E501
},
)
)
return tasks
tasks_HeatPumpsPypsaEur = set()
if "eGon100RE" in scenarios:
tasks_HeatPumpsPypsaEur = (
delete_pypsa_eur_sec_csv_file,
delete_mvgd_ts_100RE,
delete_heat_peak_loads_100RE,
{*dyn_parallel_tasks_pypsa_eur()},
)
else:
tasks_HeatPumpsPypsaEur = (
PythonOperator(
task_id="HeatPumpsPypsaEur_skipped",
python_callable=skip_task,
op_kwargs={
"scn": "eGon100RE",
"task": "HeatPumpsPypsaEur",
},
),
)
super().__init__(
name=self.name,
version=self.version,
dependencies=dependencies,
tasks=tasks_HeatPumpsPypsaEur,
)
[docs]
class HeatPumpsStatusQuo(Dataset):
def __init__(self, dependencies):
def dyn_parallel_tasks_status_quo(scenario):
"""Dynamically generate tasks
The goal is to speed up tasks by parallelising bulks of mvgds.
The number of parallel tasks is defined via parameter
`parallel_tasks` in the dataset config `datasets.yml`.
Returns
-------
set of airflow.PythonOperators
The tasks. Each element is of
:func:`egon.data.datasets.heat_supply.individual_heating.
determine_hp_cap_peak_load_mvgd_ts_status_quo`
"""
parallel_tasks = config.datasets()["demand_timeseries_mvgd"].get(
"parallel_tasks", 1
)
tasks = set()
for i in range(parallel_tasks):
tasks.add(
PythonOperator(
task_id=(
"individual_heating."
f"determine-hp-capacity-{scenario}-"
f"mvgd-bulk{i}"
),
python_callable=split_mvgds_into_bulks,
op_kwargs={
"n": i,
"max_n": parallel_tasks,
"scenario": scenario,
"func": determine_hp_cap_peak_load_mvgd_ts_status_quo,
},
)
)
return tasks
if any(
"status" in scenario
for scenario in config.settings()["egon-data"]["--scenarios"]
):
tasks = ()
for scenario in config.settings()["egon-data"]["--scenarios"]:
if "status" in scenario:
postfix = f"_{scenario[-4:]}"
tasks += (
wrapped_partial(
delete_heat_peak_loads_status_quo,
scenario=scenario,
postfix=postfix,
),
wrapped_partial(
delete_hp_capacity_status_quo,
scenario=scenario,
postfix=postfix,
),
wrapped_partial(
delete_mvgd_ts_status_quo,
scenario=scenario,
postfix=postfix,
),
)
tasks += ({*dyn_parallel_tasks_status_quo(scenario)},)
else:
tasks = (
PythonOperator(
task_id="HeatPumpsSQ_skipped",
python_callable=skip_task,
op_kwargs={"scn": "sq", "task": "HeatPumpsStatusQuo"},
),
)
super().__init__(
name="HeatPumpsStatusQuo",
version="0.0.5",
dependencies=dependencies,
tasks=tasks,
)
[docs]
class HeatPumps2035(Dataset):
"""
Class for desaggregation of heat pump capcacities per MV grid district to individual
buildings for eGon2035 scenario.
The heat pump capacity per MV grid district is disaggregated to buildings
with individual heating based on the buildings heat peak demand. The buildings are
chosen randomly until the target capacity per MV grid district is reached. Buildings
with PV rooftop have a higher probability to be assigned a heat pump. As the
building's heat peak load is not previously determined, it is as well done in this
dataset. Further, as determining heat peak load requires heat load
profiles of the buildings to be set up, this task is also utilised to set up
aggregated heat load profiles of all buildings with heat pumps within a grid as
well as for all buildings with a gas boiler (i.e. all buildings with decentral
heating system minus buildings with heat pump) needed in eTraGo.
For more information see data documentation on :ref:`dec-heat-pumps-ref`.
Heat pump capacity per building in the eGon100RE scenario is set up in a separate
dataset, :py:class:`HeatPumps2050 <HeatPumps2050>`, as for one reason in case of the
eGon100RE scenario the minimum required heat pump capacity per building can directly
be determined using the peak heat demand per building determined in the dataset
:py:class:`HeatPumpsPypsaEurSec <HeatPumpsPypsaEurSec>`, whereas peak heat
demand data does not yet exist for the eGon2035 scenario. Another reason is,
that in case of the eGon100RE scenario all buildings with individual heating have a
heat pump whereas in the eGon2035 scenario buildings are randomly selected until the
installed heat pump capacity per MV grid is met. All other buildings with individual
heating but no heat pump are assigned a gas boiler.
*Dependencies*
* :py:class:`CtsDemandBuildings
<egon.data.datasets.electricity_demand_timeseries.cts_buildings.CtsDemandBuildings>`
* :py:class:`DistrictHeatingAreas
<egon.data.datasets.district_heating_areas.DistrictHeatingAreas>`
* :py:class:`HeatSupply <egon.data.datasets.heat_supply.HeatSupply>`
* :py:class:`HeatTimeSeries
<egon.data.datasets.heat_demand_timeseries.HeatTimeSeries>`
* :py:class:`HeatPumpsPypsaEurSec
<egon.data.datasets.heat_supply.individual_heating.HeatPumpsPypsaEurSec>`
* :py:func:`pv_rooftop_to_buildings
<egon.data.datasets.power_plants.pv_rooftop_buildings.pv_rooftop_to_buildings>`
*Resulting tables*
* :py:class:`demand.egon_hp_capacity_buildings
<egon.data.datasets.heat_supply.individual_heating.EgonHpCapacityBuildings>`
is created (if it doesn't yet exist) and filled
* :py:class:`demand.egon_etrago_timeseries_individual_heating
<egon.data.datasets.heat_supply.individual_heating.EgonEtragoTimeseriesIndividualHeating>`
is created (if it doesn't yet exist) and filled
* :py:class:`demand.egon_building_heat_peak_loads
<egon.data.datasets.heat_supply.individual_heating.BuildingHeatPeakLoads>`
is created (if it doesn't yet exist) and filled
**What is the challenge?**
The main challenge lies in the set up of heat demand profiles per building in
:func:`aggregate_residential_and_cts_profiles()` as it takes alot of time and
in grids with a high number of buildings requires alot of RAM. Both runtime and RAM
usage needed to be improved several times. To speed up the process, tasks are set
up to run in parallel. This currently leads to alot of connections being opened and
at a certain point to a runtime error due to too many open connections.
**What are central assumptions during the data processing?**
Central assumption for desaggregating the heat pump capacity to individual buildings
is that heat pumps can be dimensioned using an approach from the network development
plan that uses the building's peak heat demand and a fixed COP (see
data documentation on :ref:`dec-heat-pumps-ref`).
Another central assumption is, that buildings with PV rooftop plants are more likely
to have a heat pump than other buildings (see
:func:`determine_buildings_with_hp_in_mv_grid()` for details).
**Drawbacks and limitations of the data**
The heat demand profiles used here to determine the heat peak load have very few
very high peaks that lead to large heat pump capacities. This should be solved
somehow. Cutting off the peak is not possible, as the time series of each building
is not saved but generated on the fly. Also, just using smaller heat pumps would
lead to infeasibilities in eDisGo.
"""
#:
name: str = "HeatPumps2035"
#:
version: str = "0.0.3"
def __init__(self, dependencies):
def dyn_parallel_tasks_2035():
"""Dynamically generate tasks
The goal is to speed up tasks by parallelising bulks of mvgds.
The number of parallel tasks is defined via parameter
`parallel_tasks` in the dataset config `datasets.yml`.
Returns
-------
set of airflow.PythonOperators
The tasks. Each element is of
:func:`egon.data.datasets.heat_supply.individual_heating.
determine_hp_cap_peak_load_mvgd_ts_2035`
"""
parallel_tasks = config.datasets()["demand_timeseries_mvgd"].get(
"parallel_tasks", 1
)
tasks = set()
for i in range(parallel_tasks):
tasks.add(
PythonOperator(
task_id=(
"individual_heating."
f"determine-hp-capacity-2035-"
f"mvgd-bulk{i}"
),
python_callable=split_mvgds_into_bulks,
op_kwargs={
"n": i,
"max_n": parallel_tasks,
"func": determine_hp_cap_peak_load_mvgd_ts_2035,
},
)
)
return tasks
if "eGon2035" in scenarios:
tasks_HeatPumps2035 = (
delete_heat_peak_loads_2035,
delete_hp_capacity_2035,
delete_mvgd_ts_2035,
{*dyn_parallel_tasks_2035()},
)
else:
tasks_HeatPumps2035 = (
PythonOperator(
task_id="HeatPumps2035_skipped",
python_callable=skip_task,
op_kwargs={"scn": "eGon2035", "task": "HeatPumps2035"},
),
)
super().__init__(
name=self.version,
version="0.0.4",
dependencies=dependencies,
tasks=tasks_HeatPumps2035,
)
[docs]
class HeatPumps2050(Dataset):
"""
Class for desaggregation of heat pump capcacities per MV grid district to individual
buildings for eGon100RE scenario.
Optimised heat pump capacity from PyPSA-EUR run is disaggregated to all buildings
with individual heating (as heat pumps are the only option for individual heating
in the eGon100RE scenario) based on buildings heat peak demand. The heat peak demand
per building does in this dataset, in contrast to the
:py:class:`HeatPumps2035 <egon.data.datasets.pypsaeursec.HeatPumps2035>` dataset,
not need to be determined, as it was already determined in the
:py:class:`PypsaEurSec <egon.data.datasets.pypsaeursec.PypsaEurSec>` dataset.
For more information see data documentation on :ref:`dec-heat-pumps-ref`.
Heat pump capacity per building for the eGon2035 scenario is set up in a separate
dataset, :py:class:`HeatPumps2035 <HeatPumps2035>`. See there for further
information as to why.
*Dependencies*
* :py:class:`PypsaEurSec <egon.data.datasets.pypsaeursec.PypsaEurSec>`
* :py:class:`HeatPumpsPypsaEurSec
<egon.data.datasets.heat_supply.individual_heating.HeatPumpsPypsaEurSec>`
* :py:class:`HeatSupply <egon.data.datasets.heat_supply.HeatSupply>`
*Resulting tables*
* :py:class:`demand.egon_hp_capacity_buildings
<egon.data.datasets.heat_supply.individual_heating.EgonHpCapacityBuildings>`
is created (if it doesn't yet exist) and filled
**What are central assumptions during the data processing?**
Central assumption for desaggregating the heat pump capacity to individual buildings
is that heat pumps can be dimensioned using an approach from the network development
plan that uses the building's peak heat demand and a fixed COP (see
data documentation on :ref:`dec-heat-pumps-ref`).
**Drawbacks and limitations of the data**
The heat demand profiles used here to determine the heat peak load have very few
very high peaks that lead to large heat pump capacities. This should be solved
somehow. Cutting off the peak is not possible, as the time series of each building
is not saved but generated on the fly. Also, just using smaller heat pumps would
lead to infeasibilities in eDisGo.
"""
#:
name: str = "HeatPumps2050"
#:
version: str = "0.0.4"
def __init__(self, dependencies):
tasks_HeatPumps2050 = set()
if "eGon100RE" in scenarios:
tasks_HeatPumps2050 = (
delete_hp_capacity_100RE,
determine_hp_cap_buildings_eGon100RE,
)
else:
tasks_HeatPumps2050 = (
PythonOperator(
task_id="HeatPumps2050_skipped",
python_callable=skip_task,
op_kwargs={"scn": "eGon100RE", "task": "HeatPumps2050"},
),
)
super().__init__(
name=self.name,
version=self.version,
dependencies=dependencies,
tasks=tasks_HeatPumps2050,
)
[docs]
class BuildingHeatPeakLoads(Base):
"""
Class definition of table demand.egon_building_heat_peak_loads.
Table with peak heat demand of residential and CTS heat demand combined for
each building.
"""
__tablename__ = "egon_building_heat_peak_loads"
__table_args__ = {"schema": "demand"}
building_id = Column(Integer, primary_key=True)
scenario = Column(String, primary_key=True)
sector = Column(String, primary_key=True)
peak_load_in_w = Column(REAL)
[docs]
def skip_task(scn=str, task=str):
logger.info(
f"{scn} is not in the list of scenarios. {task} dataset is skipped."
)
return
[docs]
def adapt_numpy_float64(numpy_float64):
return AsIs(numpy_float64)
[docs]
def adapt_numpy_int64(numpy_int64):
return AsIs(numpy_int64)
[docs]
def cascade_per_technology(
heat_per_mv,
technologies,
scenario,
distribution_level,
max_size_individual_chp=0.05,
):
"""Add plants for individual heat.
Currently only on mv grid district level.
Parameters
----------
mv_grid_districts : geopandas.geodataframe.GeoDataFrame
MV grid districts including the heat demand
technologies : pandas.DataFrame
List of supply technologies and their parameters
scenario : str
Name of the scenario
max_size_individual_chp : float
Maximum capacity of an individual chp in MW
Returns
-------
mv_grid_districts : geopandas.geodataframe.GeoDataFrame
MV grid district which need additional individual heat supply
technologies : pandas.DataFrame
List of supply technologies and their parameters
append_df : pandas.DataFrame
List of plants per mv grid for the selected technology
"""
sources, targets = load_sources_and_targets("HeatSupply")
tech = technologies[technologies.priority == technologies.priority.max()]
# Distribute heat pumps linear to remaining demand.
if tech.index == "heat_pump":
if distribution_level == "federal_states":
# Select target values per federal state
target = db.select_dataframe(
f"""
SELECT DISTINCT ON (gen) gen as state, capacity
FROM {sources.tables['scenario_capacities']} a
JOIN {sources.tables['federal_states']} b
ON a.nuts = b.nuts
WHERE scenario_name = '{scenario}'
AND carrier = 'residential_rural_heat_pump'
""",
index_col="state",
)
heat_per_mv["share"] = heat_per_mv.groupby(
"state",
group_keys=False,
).remaining_demand.apply(lambda grp: grp / grp.sum())
append_df = (
heat_per_mv["share"]
.mul(target.capacity[heat_per_mv["state"]].values)
.reset_index()
)
else:
# Select target value for Germany
target = db.select_dataframe(f"""
SELECT SUM(capacity) AS capacity
FROM {sources.tables['scenario_capacities']} a
WHERE scenario_name = '{scenario}'
AND carrier = 'rural_heat_pump'
""")
if not target.capacity[0]:
target.capacity[0] = 0
if (
config.settings()["egon-data"]["--dataset-boundary"]
== "Schleswig-Holstein"
):
target.capacity[0] /= 16
heat_per_mv["share"] = (
heat_per_mv.remaining_demand
/ heat_per_mv.remaining_demand.sum()
)
append_df = (
heat_per_mv["share"].mul(target.capacity[0]).reset_index()
)
append_df.rename(
{"bus_id": "mv_grid_id", "share": "capacity"}, axis=1, inplace=True
)
elif (tech.index == "gas_boiler") & (scenario == "eGon2035"):
append_df = pd.DataFrame(
data={
"capacity": heat_per_mv.remaining_demand.div(
tech.estimated_flh.values[0]
),
"carrier": f"residential_rural_{tech.index}",
"mv_grid_id": heat_per_mv.index,
"scenario": scenario,
}
)
elif tech.index in ("gas_boiler", "resistive_heater", "solar_thermal"):
# Select target value for Germany
target = db.select_dataframe(f"""
SELECT SUM(capacity) AS capacity
FROM {sources.tables['scenario_capacities']} a
WHERE scenario_name = '{scenario}'
AND carrier = 'rural_{tech.index[0]}'
""")
if (
config.settings()["egon-data"]["--dataset-boundary"]
== "Schleswig-Holstein"
):
target.capacity[0] /= 16
heat_per_mv["share"] = (
heat_per_mv.remaining_demand / heat_per_mv.remaining_demand.sum()
)
append_df = heat_per_mv["share"].mul(target.capacity[0]).reset_index()
append_df.rename(
{"bus_id": "mv_grid_id", "share": "capacity"}, axis=1, inplace=True
)
else:
append_df = pd.DataFrame(
data={
"capacity": heat_per_mv.remaining_demand.div(
tech.estimated_flh.values[0]
),
"carrier": f"residential_rural_{tech.index}",
"mv_grid_id": heat_per_mv.index,
"scenario": scenario,
}
)
if append_df.size > 0:
append_df["carrier"] = tech.index[0]
heat_per_mv.loc[
append_df.mv_grid_id, "remaining_demand"
] -= append_df.set_index("mv_grid_id").capacity.mul(
tech.estimated_flh.values[0]
)
heat_per_mv = heat_per_mv[heat_per_mv.remaining_demand >= 0]
technologies = technologies.drop(tech.index)
return heat_per_mv, technologies, append_df
[docs]
def cascade_heat_supply_indiv(scenario, distribution_level, plotting=True):
"""Assigns supply strategy for individual heating in four steps.
1. all small scale CHP are connected.
2. If the supply can not meet the heat demand, solar thermal collectors
are attached. This is not implemented yet, since individual
solar thermal plants are not considered in eGon2035 scenario.
3. If this is not suitable, the mv grid is also supplied by heat pumps.
4. The last option are individual gas boilers.
Parameters
----------
scenario : str
Name of scenario
plotting : bool, optional
Choose if individual heating supply is plotted. The default is True.
Returns
-------
resulting_capacities : pandas.DataFrame
List of plants per mv grid
"""
sources, targets = load_sources_and_targets("HeatSupply")
# Select residential heat demand per mv grid district and federal state
heat_per_mv = db.select_geodataframe(
f"""
SELECT d.bus_id as bus_id, SUM(demand) as demand,
c.vg250_lan as state, d.geom
FROM {sources.tables['heat_demand']} a
JOIN {sources.tables['map_zensus_grid']} b
ON a.zensus_population_id = b.zensus_population_id
JOIN {sources.tables['map_vg250_grid']} c
ON b.bus_id = c.bus_id
JOIN {sources.tables['mv_grids']} d
ON d.bus_id = c.bus_id
WHERE scenario = '{scenario}'
AND a.zensus_population_id NOT IN (
SELECT zensus_population_id
FROM {sources.tables['map_dh']}
WHERE scenario = '{scenario}')
GROUP BY d.bus_id, vg250_lan, geom
""",
index_col="bus_id",
)
# Store geometry of mv grid
geom_mv = heat_per_mv.geom.centroid.copy()
# Initalize Dataframe for results
resulting_capacities = pd.DataFrame(
columns=["mv_grid_id", "carrier", "capacity"]
)
# Set technology data according to
# http://www.wbzu.de/seminare/infopool/infopool-bhkw
if scenario == "eGon2035":
technologies = pd.DataFrame(
index=["heat_pump", "gas_boiler"],
columns=["estimated_flh", "priority"],
data={"estimated_flh": [4000, 8000], "priority": [2, 1]},
)
elif scenario == "eGon100RE":
technologies = pd.DataFrame(
index=[
"heat_pump",
"resistive_heater",
"solar_thermal",
"gas_boiler",
"oil_boiler",
],
columns=["estimated_flh", "priority"],
data={
"estimated_flh": [4000, 2000, 2000, 8000, 8000],
"priority": [5, 4, 3, 2, 1],
},
)
elif "status" in scenario:
technologies = pd.DataFrame(
index=["heat_pump"],
columns=["estimated_flh", "priority"],
data={"estimated_flh": [4000], "priority": [1]},
)
else:
raise ValueError(f"{scenario=} is not valid.")
# In the beginning, the remaining demand equals demand
heat_per_mv["remaining_demand"] = heat_per_mv["demand"]
# Connect new technologies, if there is still heat demand left
while (len(technologies) > 0) and (len(heat_per_mv) > 0):
# Attach new supply technology
heat_per_mv, technologies, append_df = cascade_per_technology(
heat_per_mv, technologies, scenario, distribution_level
)
# Collect resulting capacities
resulting_capacities = pd.concat(
[resulting_capacities, append_df], ignore_index=True
)
if plotting:
plot_heat_supply(resulting_capacities)
return gpd.GeoDataFrame(
resulting_capacities,
geometry=geom_mv[resulting_capacities.mv_grid_id].values,
)
[docs]
def get_peta_demand(mvgd, scenario):
"""
Retrieve annual peta heat demand for residential buildings for either
eGon2035 or eGon100RE scenario.
Parameters
----------
mvgd : int
MV grid ID.
scenario : str
Possible options are eGon2035 or eGon100RE
Returns
-------
df_peta_demand : pd.DataFrame
Annual residential heat demand per building and scenario. Columns of
the dataframe are zensus_population_id and demand.
"""
with db.session_scope() as session:
query = (
session.query(
MapZensusGridDistricts.zensus_population_id,
EgonPetaHeat.demand,
)
.filter(MapZensusGridDistricts.bus_id == mvgd)
.filter(
MapZensusGridDistricts.zensus_population_id
== EgonPetaHeat.zensus_population_id
)
.filter(
EgonPetaHeat.sector == "residential",
EgonPetaHeat.scenario == scenario,
)
)
df_peta_demand = pd.read_sql(
query.statement, query.session.bind, index_col=None
)
return df_peta_demand
[docs]
def get_residential_heat_profile_ids(mvgd):
"""
Retrieve 365 daily heat profiles ids per residential building and selected
mvgd.
Parameters
----------
mvgd : int
ID of MVGD
Returns
-------
df_profiles_ids : pd.DataFrame
Residential daily heat profile ID's per building. Columns of the
dataframe are zensus_population_id, building_id,
selected_idp_profiles, buildings and day_of_year.
"""
with db.session_scope() as session:
query = (
session.query(
MapZensusGridDistricts.zensus_population_id,
EgonHeatTimeseries.building_id,
EgonHeatTimeseries.selected_idp_profiles,
)
.filter(MapZensusGridDistricts.bus_id == mvgd)
.filter(
MapZensusGridDistricts.zensus_population_id
== EgonHeatTimeseries.zensus_population_id
)
)
df_profiles_ids = pd.read_sql(
query.statement, query.session.bind, index_col=None
)
# Add building count per cell
df_profiles_ids = pd.merge(
left=df_profiles_ids,
right=df_profiles_ids.groupby("zensus_population_id")["building_id"]
.count()
.rename("buildings"),
left_on="zensus_population_id",
right_index=True,
)
# unnest array of ids per building
df_profiles_ids = df_profiles_ids.explode("selected_idp_profiles")
# add day of year column by order of list
df_profiles_ids["day_of_year"] = (
df_profiles_ids.groupby("building_id").cumcount() + 1
)
return df_profiles_ids
[docs]
def get_daily_profiles(profile_ids):
"""
Parameters
----------
profile_ids : list(int)
daily heat profile ID's
Returns
-------
df_profiles : pd.DataFrame
Residential daily heat profiles. Columns of the dataframe are idp,
house, temperature_class and hour.
"""
saio.register_schema("demand", db.engine())
from saio.demand import egon_heat_idp_pool
with db.session_scope() as session:
query = session.query(egon_heat_idp_pool).filter(
egon_heat_idp_pool.index.in_(profile_ids)
)
df_profiles = pd.read_sql(
query.statement, query.session.bind, index_col="index"
)
# unnest array of profile values per id
df_profiles = df_profiles.explode("idp")
# Add column for hour of day
df_profiles["hour"] = df_profiles.groupby(axis=0, level=0).cumcount() + 1
return df_profiles
[docs]
def get_daily_demand_share(mvgd):
"""per census cell
Parameters
----------
mvgd : int
MVGD id
Returns
-------
df_daily_demand_share : pd.DataFrame
Daily annual demand share per cencus cell. Columns of the dataframe
are zensus_population_id, day_of_year and daily_demand_share.
"""
with db.session_scope() as session:
query = session.query(
MapZensusGridDistricts.zensus_population_id,
EgonDailyHeatDemandPerClimateZone.day_of_year,
EgonDailyHeatDemandPerClimateZone.daily_demand_share,
).filter(
EgonMapZensusClimateZones.climate_zone
== EgonDailyHeatDemandPerClimateZone.climate_zone,
MapZensusGridDistricts.zensus_population_id
== EgonMapZensusClimateZones.zensus_population_id,
MapZensusGridDistricts.bus_id == mvgd,
)
df_daily_demand_share = pd.read_sql(
query.statement, query.session.bind, index_col=None
)
return df_daily_demand_share
[docs]
def calc_residential_heat_profiles_per_mvgd(mvgd, scenario):
"""
Gets residential heat profiles per building in MV grid for either eGon2035
or eGon100RE scenario.
Parameters
----------
mvgd : int
MV grid ID.
scenario : str
Possible options are eGon2035 or eGon100RE.
Returns
--------
pd.DataFrame
Heat demand profiles of buildings. Columns are:
* zensus_population_id : int
Zensus cell ID building is in.
* building_id : int
ID of building.
* day_of_year : int
Day of the year (1 - 365).
* hour : int
Hour of the day (1 - 24).
* demand_ts : float
Building's residential heat demand in MW, for specified hour
of the year (specified through columns `day_of_year` and
`hour`).
"""
columns = [
"zensus_population_id",
"building_id",
"day_of_year",
"hour",
"demand_ts",
]
df_peta_demand = get_peta_demand(mvgd, scenario)
df_peta_demand = reduce_mem_usage(df_peta_demand)
# TODO maybe return empty dataframe
if df_peta_demand.empty:
logger.info(f"No demand for MVGD: {mvgd}")
return pd.DataFrame(columns=columns)
df_profiles_ids = get_residential_heat_profile_ids(mvgd)
if df_profiles_ids.empty:
logger.info(f"No profiles for MVGD: {mvgd}")
return pd.DataFrame(columns=columns)
df_profiles = get_daily_profiles(
df_profiles_ids["selected_idp_profiles"].unique()
)
df_daily_demand_share = get_daily_demand_share(mvgd)
# Merge profile ids to peta demand by zensus_population_id
df_profile_merge = pd.merge(
left=df_peta_demand, right=df_profiles_ids, on="zensus_population_id"
)
df_profile_merge.demand = df_profile_merge.demand.div(
df_profile_merge.buildings
)
df_profile_merge.drop("buildings", axis="columns", inplace=True)
# Merge daily demand to daily profile ids by zensus_population_id and day
df_profile_merge = pd.merge(
left=df_profile_merge,
right=df_daily_demand_share,
on=["zensus_population_id", "day_of_year"],
)
df_profile_merge.demand = df_profile_merge.demand.mul(
df_profile_merge.daily_demand_share
)
df_profile_merge.drop("daily_demand_share", axis="columns", inplace=True)
df_profile_merge = reduce_mem_usage(df_profile_merge)
# Merge daily profiles by profile id
df_profile_merge = pd.merge(
left=df_profile_merge,
right=df_profiles[["idp", "hour"]],
left_on="selected_idp_profiles",
right_index=True,
)
df_profile_merge = reduce_mem_usage(df_profile_merge)
df_profile_merge.demand = df_profile_merge.demand.mul(
df_profile_merge.idp.astype(float)
)
df_profile_merge.drop("idp", axis="columns", inplace=True)
df_profile_merge.rename(
{"demand": "demand_ts"}, axis="columns", inplace=True
)
df_profile_merge = reduce_mem_usage(df_profile_merge)
return df_profile_merge.loc[:, columns]
[docs]
def plot_heat_supply(resulting_capacities):
from matplotlib import pyplot as plt
mv_grids = db.select_geodataframe(
"""
SELECT * FROM grid.egon_mv_grid_district
""",
index_col="bus_id",
)
for c in ["CHP", "heat_pump"]:
mv_grids[c] = (
resulting_capacities[resulting_capacities.carrier == c]
.set_index("mv_grid_id")
.capacity
)
fig, ax = plt.subplots(1, 1)
mv_grids.boundary.plot(linewidth=0.2, ax=ax, color="black")
mv_grids.plot(
ax=ax,
column=c,
cmap="magma_r",
legend=True,
legend_kwds={
"label": f"Installed {c} in MW",
"orientation": "vertical",
},
)
plt.savefig(f"plots/individual_heat_supply_{c}.png", dpi=300)
[docs]
def get_zensus_cells_with_decentral_heat_demand_in_mv_grid(
scenario, mv_grid_id
):
"""
Returns zensus cell IDs with decentral heating systems in given MV grid.
As cells with district heating differ between scenarios, this is also
depending on the scenario.
Parameters
-----------
scenario : str
Name of scenario. Can be either "eGon2035" or "eGon100RE".
mv_grid_id : int
ID of MV grid.
Returns
--------
pd.Index(int)
Zensus cell IDs (as int) of buildings with decentral heating systems in
given MV grid. Type is pandas Index to avoid errors later on when it is
used in a query.
"""
# get zensus cells in grid
zensus_population_ids = db.select_dataframe(
f"""
SELECT zensus_population_id
FROM boundaries.egon_map_zensus_grid_districts
WHERE bus_id = {mv_grid_id}
""",
index_col=None,
).zensus_population_id.values
# maybe use adapter
# convert to pd.Index (otherwise type is np.int64, which will for some
# reason throw an error when used in a query)
zensus_population_ids = pd.Index(zensus_population_ids)
# get zensus cells with district heating
with db.session_scope() as session:
query = session.query(
MapZensusDistrictHeatingAreas.zensus_population_id,
).filter(
MapZensusDistrictHeatingAreas.scenario == scenario,
MapZensusDistrictHeatingAreas.zensus_population_id.in_(
zensus_population_ids
),
)
cells_with_dh = pd.read_sql(
query.statement, query.session.bind, index_col=None
).zensus_population_id.values
# remove zensus cells with district heating
zensus_population_ids = zensus_population_ids.drop(
cells_with_dh, errors="ignore"
)
return pd.Index(zensus_population_ids)
[docs]
def get_residential_buildings_with_decentral_heat_demand_in_mv_grid(
scenario, mv_grid_id
):
"""
Returns building IDs of buildings with decentral residential heat demand in
given MV grid.
As cells with district heating differ between scenarios, this is also
depending on the scenario.
Parameters
-----------
scenario : str
Name of scenario. Can be either "eGon2035" or "eGon100RE".
mv_grid_id : int
ID of MV grid.
Returns
--------
pd.Index(int)
Building IDs (as int) of buildings with decentral heating system in
given MV grid. Type is pandas Index to avoid errors later on when it is
used in a query.
"""
# get zensus cells with decentral heating
zensus_population_ids = (
get_zensus_cells_with_decentral_heat_demand_in_mv_grid(
scenario, mv_grid_id
)
)
# get buildings with decentral heat demand
saio.register_schema("demand", engine)
from saio.demand import egon_heat_timeseries_selected_profiles
with db.session_scope() as session:
query = session.query(
egon_heat_timeseries_selected_profiles.building_id,
).filter(
egon_heat_timeseries_selected_profiles.zensus_population_id.in_(
zensus_population_ids
)
)
buildings_with_heat_demand = pd.read_sql(
query.statement, query.session.bind, index_col=None
).building_id.values
return pd.Index(buildings_with_heat_demand)
[docs]
def get_cts_buildings_with_decentral_heat_demand_in_mv_grid(
scenario, mv_grid_id
):
"""
Returns building IDs of buildings with decentral CTS heat demand in
given MV grid.
As cells with district heating differ between scenarios, this is also
depending on the scenario.
Parameters
-----------
scenario : str
Name of scenario. Can be either "eGon2035" or "eGon100RE".
mv_grid_id : int
ID of MV grid.
Returns
--------
pd.Index(int)
Building IDs (as int) of buildings with decentral heating system in
given MV grid. Type is pandas Index to avoid errors later on when it is
used in a query.
"""
# get zensus cells with decentral heating
zensus_population_ids = (
get_zensus_cells_with_decentral_heat_demand_in_mv_grid(
scenario, mv_grid_id
)
)
# get buildings with decentral heat demand
with db.session_scope() as session:
query = session.query(EgonMapZensusMvgdBuildings.building_id).filter(
EgonMapZensusMvgdBuildings.sector == "cts",
EgonMapZensusMvgdBuildings.zensus_population_id.in_(
zensus_population_ids
),
)
buildings_with_heat_demand = pd.read_sql(
query.statement, query.session.bind, index_col=None
).building_id.values
return pd.Index(buildings_with_heat_demand)
[docs]
def get_buildings_with_decentral_heat_demand_in_mv_grid(mvgd, scenario):
"""
Returns building IDs of buildings with decentral heat demand in
given MV grid.
As cells with district heating differ between scenarios, this is also
depending on the scenario. CTS and residential have to be retrieved
seperatly as some residential buildings only have electricity but no
heat demand. This does not occure in CTS.
Parameters
-----------
mvgd : int
ID of MV grid.
scenario : str
Name of scenario. Can be either "eGon2035" or "eGon100RE".
Returns
--------
pd.Index(int)
Building IDs (as int) of buildings with decentral heating system in
given MV grid. Type is pandas Index to avoid errors later on when it is
used in a query.
"""
# get residential buildings with decentral heating systems
buildings_decentral_heating_res = (
get_residential_buildings_with_decentral_heat_demand_in_mv_grid(
scenario, mvgd
)
)
# get CTS buildings with decentral heating systems
buildings_decentral_heating_cts = (
get_cts_buildings_with_decentral_heat_demand_in_mv_grid(scenario, mvgd)
)
# merge residential and CTS buildings
buildings_decentral_heating = buildings_decentral_heating_res.union(
buildings_decentral_heating_cts
).unique()
return buildings_decentral_heating
[docs]
def get_total_heat_pump_capacity_of_mv_grid(scenario, mv_grid_id):
"""
Returns total heat pump capacity per grid that was previously defined
(by NEP or pypsa-eur-sec).
Parameters
-----------
scenario : str
Name of scenario. Can be either "eGon2035" or "eGon100RE".
mv_grid_id : int
ID of MV grid.
Returns
--------
float
Total heat pump capacity in MW in given MV grid.
"""
from egon.data.datasets.heat_supply import EgonIndividualHeatingSupply
with db.session_scope() as session:
query = (
session.query(
EgonIndividualHeatingSupply.mv_grid_id,
EgonIndividualHeatingSupply.capacity,
)
.filter(EgonIndividualHeatingSupply.scenario == scenario)
.filter(EgonIndividualHeatingSupply.carrier == "heat_pump")
.filter(EgonIndividualHeatingSupply.mv_grid_id == mv_grid_id)
)
hp_cap_mv_grid = pd.read_sql(
query.statement, query.session.bind, index_col="mv_grid_id"
)
if hp_cap_mv_grid.empty:
return 0.0
else:
return hp_cap_mv_grid.capacity.values[0]
[docs]
def get_heat_peak_demand_per_building(scenario, building_ids):
""""""
with db.session_scope() as session:
query = (
session.query(
BuildingHeatPeakLoads.building_id,
BuildingHeatPeakLoads.peak_load_in_w,
)
.filter(BuildingHeatPeakLoads.scenario == scenario)
.filter(BuildingHeatPeakLoads.building_id.in_(building_ids))
)
df_heat_peak_demand = pd.read_sql(
query.statement, query.session.bind, index_col=None
)
# TODO remove check
if df_heat_peak_demand.duplicated("building_id").any():
raise ValueError("Duplicate building_id")
# convert to series and from W to MW
df_heat_peak_demand = (
df_heat_peak_demand.set_index("building_id").loc[:, "peak_load_in_w"]
* 1e6
)
return df_heat_peak_demand
[docs]
def determine_minimum_hp_capacity_per_building(
peak_heat_demand, flexibility_factor=24 / 18, cop=1.7
):
"""
Determines minimum required heat pump capacity.
Parameters
----------
peak_heat_demand : pd.Series
Series with peak heat demand per building in MW. Index contains the
building ID.
flexibility_factor : float
Factor to overdimension the heat pump to allow for some flexible
dispatch in times of high heat demand. Per default, a factor of 24/18
is used, to take into account
Returns
-------
pd.Series
Pandas series with minimum required heat pump capacity per building in
MW.
"""
return peak_heat_demand * flexibility_factor / cop
[docs]
def determine_buildings_with_hp_in_mv_grid(
hp_cap_mv_grid, min_hp_cap_per_building
):
"""
Distributes given total heat pump capacity to buildings based on their peak
heat demand.
Parameters
-----------
hp_cap_mv_grid : float
Total heat pump capacity in MW in given MV grid.
min_hp_cap_per_building : pd.Series
Pandas series with minimum required heat pump capacity per building
in MW.
Returns
-------
pd.Index(int)
Building IDs (as int) of buildings to get heat demand time series for.
"""
building_ids = min_hp_cap_per_building.index
# get buildings with PV to give them a higher priority when selecting
# buildings a heat pump will be allocated to
saio.register_schema("supply", engine)
from saio.supply import egon_power_plants_pv_roof_building
with db.session_scope() as session:
query = session.query(
egon_power_plants_pv_roof_building.building_id
).filter(
egon_power_plants_pv_roof_building.building_id.in_(building_ids),
egon_power_plants_pv_roof_building.scenario == "eGon2035",
)
buildings_with_pv = pd.read_sql(
query.statement, query.session.bind, index_col=None
).building_id.values
# set different weights for buildings with PV and without PV
weight_with_pv = 1.5
weight_without_pv = 1.0
weights = pd.concat(
[
pd.DataFrame(
{"weight": weight_without_pv},
index=building_ids.drop(buildings_with_pv, errors="ignore"),
),
pd.DataFrame({"weight": weight_with_pv}, index=buildings_with_pv),
]
)
# normalise weights (probability needs to add up to 1)
weights.weight = weights.weight / weights.weight.sum()
# get random order at which buildings are chosen
np.random.seed(db.credentials()["--random-seed"])
buildings_with_hp_order = np.random.choice(
weights.index,
size=len(weights),
replace=False,
p=weights.weight.values,
)
# select buildings until HP capacity in MV grid is reached (some rest
# capacity will remain)
hp_cumsum = min_hp_cap_per_building.loc[buildings_with_hp_order].cumsum()
buildings_with_hp = hp_cumsum[hp_cumsum <= hp_cap_mv_grid].index
# choose random heat pumps until remaining heat pumps are larger than
# remaining heat pump capacity
remaining_hp_cap = (
hp_cap_mv_grid - min_hp_cap_per_building.loc[buildings_with_hp].sum()
)
min_cap_buildings_wo_hp = min_hp_cap_per_building.loc[
building_ids.drop(buildings_with_hp)
]
possible_buildings = min_cap_buildings_wo_hp[
min_cap_buildings_wo_hp <= remaining_hp_cap
].index
while len(possible_buildings) > 0:
random.seed(db.credentials()["--random-seed"])
new_hp_building = random.choice(possible_buildings)
# add new building to building with HP
buildings_with_hp = buildings_with_hp.union(
pd.Index([new_hp_building])
)
# determine if there are still possible buildings
remaining_hp_cap = (
hp_cap_mv_grid
- min_hp_cap_per_building.loc[buildings_with_hp].sum()
)
min_cap_buildings_wo_hp = min_hp_cap_per_building.loc[
building_ids.drop(buildings_with_hp)
]
possible_buildings = min_cap_buildings_wo_hp[
min_cap_buildings_wo_hp <= remaining_hp_cap
].index
return buildings_with_hp
[docs]
def desaggregate_hp_capacity(min_hp_cap_per_building, hp_cap_mv_grid):
"""
Desaggregates the required total heat pump capacity to buildings.
All buildings are previously assigned a minimum required heat pump
capacity. If the total heat pump capacity exceeds this, larger heat pumps
are assigned.
Parameters
------------
min_hp_cap_per_building : pd.Series
Pandas series with minimum required heat pump capacity per building
in MW.
hp_cap_mv_grid : float
Total heat pump capacity in MW in given MV grid.
Returns
--------
pd.Series
Pandas series with heat pump capacity per building in MW.
"""
# distribute remaining capacity to all buildings with HP depending on
# installed HP capacity
allocated_cap = min_hp_cap_per_building.sum()
remaining_cap = hp_cap_mv_grid - allocated_cap
fac = remaining_cap / allocated_cap
hp_cap_per_building = (
min_hp_cap_per_building * fac + min_hp_cap_per_building
)
hp_cap_per_building.index.name = "building_id"
return hp_cap_per_building
[docs]
def determine_min_hp_cap_buildings_pypsa_eur_sec(
peak_heat_demand, building_ids
):
"""
Determines minimum required HP capacity in MV grid in MW as input for
pypsa-eur-sec.
Parameters
----------
peak_heat_demand : pd.Series
Series with peak heat demand per building in MW. Index contains the
building ID.
building_ids : pd.Index(int)
Building IDs (as int) of buildings with decentral heating system in
given MV grid.
Returns
--------
float
Minimum required HP capacity in MV grid in MW.
"""
if len(building_ids) > 0:
peak_heat_demand = peak_heat_demand.loc[building_ids]
# determine minimum required heat pump capacity per building
min_hp_cap_buildings = determine_minimum_hp_capacity_per_building(
peak_heat_demand
)
return min_hp_cap_buildings.sum()
else:
return 0.0
[docs]
def determine_hp_cap_buildings_pvbased_per_mvgd(
scenario, mv_grid_id, peak_heat_demand, building_ids
):
"""
Determines which buildings in the MV grid will have a HP (buildings with PV
rooftop are more likely to be assigned) in the eGon2035 scenario, as well
as their respective HP capacity in MW.
Parameters
-----------
mv_grid_id : int
ID of MV grid.
peak_heat_demand : pd.Series
Series with peak heat demand per building in MW. Index contains the
building ID.
building_ids : pd.Index(int)
Building IDs (as int) of buildings with decentral heating system in
given MV grid.
"""
hp_cap_grid = get_total_heat_pump_capacity_of_mv_grid(scenario, mv_grid_id)
if len(building_ids) > 0 and hp_cap_grid > 0.0:
peak_heat_demand = peak_heat_demand.loc[building_ids]
# determine minimum required heat pump capacity per building
min_hp_cap_buildings = determine_minimum_hp_capacity_per_building(
peak_heat_demand
)
# select buildings that will have a heat pump
buildings_with_hp = determine_buildings_with_hp_in_mv_grid(
hp_cap_grid, min_hp_cap_buildings
)
# distribute total heat pump capacity to all buildings with HP
hp_cap_per_building = desaggregate_hp_capacity(
min_hp_cap_buildings.loc[buildings_with_hp], hp_cap_grid
)
return hp_cap_per_building.rename("hp_capacity")
else:
return pd.Series(dtype="float64").rename("hp_capacity")
[docs]
def determine_hp_cap_buildings_eGon100RE_per_mvgd(mv_grid_id):
"""
Determines HP capacity per building in eGon100RE scenario.
In eGon100RE scenario all buildings without district heating get a heat
pump.
Returns
--------
pd.Series
Pandas series with heat pump capacity per building in MW.
"""
hp_cap_grid = get_total_heat_pump_capacity_of_mv_grid(
"eGon100RE", mv_grid_id
)
if hp_cap_grid > 0.0:
# get buildings with decentral heating systems
building_ids = get_buildings_with_decentral_heat_demand_in_mv_grid(
mv_grid_id, scenario="eGon100RE"
)
logger.info(f"MVGD={mv_grid_id} | Get peak loads from DB")
df_peak_heat_demand = get_heat_peak_demand_per_building(
"eGon100RE", building_ids
)
logger.info(f"MVGD={mv_grid_id} | Determine HP capacities.")
# determine minimum required heat pump capacity per building
min_hp_cap_buildings = determine_minimum_hp_capacity_per_building(
df_peak_heat_demand, flexibility_factor=24 / 18, cop=1.7
)
logger.info(f"MVGD={mv_grid_id} | Desaggregate HP capacities.")
# distribute total heat pump capacity to all buildings with HP
hp_cap_per_building = desaggregate_hp_capacity(
min_hp_cap_buildings, hp_cap_grid
)
return hp_cap_per_building.rename("hp_capacity")
else:
return pd.Series(dtype="float64").rename("hp_capacity")
[docs]
def determine_hp_cap_buildings_eGon100RE():
"""
Main function to determine HP capacity per building in eGon100RE scenario.
"""
# ========== Register np datatypes with SQLA ==========
register_adapter(np.float64, adapt_numpy_float64)
register_adapter(np.int64, adapt_numpy_int64)
# =====================================================
with db.session_scope() as session:
query = (
session.query(
MapZensusGridDistricts.bus_id,
)
.filter(
MapZensusGridDistricts.zensus_population_id
== EgonPetaHeat.zensus_population_id
)
.distinct(MapZensusGridDistricts.bus_id)
)
mvgd_ids = pd.read_sql(
query.statement, query.session.bind, index_col=None
)
mvgd_ids = mvgd_ids.sort_values("bus_id")
mvgd_ids = mvgd_ids["bus_id"].values
df_hp_cap_per_building_100RE_db = pd.DataFrame(
columns=["building_id", "hp_capacity"]
)
for mvgd_id in mvgd_ids:
logger.info(f"MVGD={mvgd_id} | Start")
hp_cap_per_building_100RE = (
determine_hp_cap_buildings_eGon100RE_per_mvgd(mvgd_id)
)
if not hp_cap_per_building_100RE.empty:
df_hp_cap_per_building_100RE_db = pd.concat(
[
df_hp_cap_per_building_100RE_db,
hp_cap_per_building_100RE.reset_index(),
],
axis=0,
)
logger.info(f"MVGD={min(mvgd_ids)} : {max(mvgd_ids)} | Write data to db.")
df_hp_cap_per_building_100RE_db["scenario"] = "eGon100RE"
EgonHpCapacityBuildings.__table__.create(bind=engine, checkfirst=True)
write_table_to_postgres(
df_hp_cap_per_building_100RE_db,
EgonHpCapacityBuildings,
drop=False,
)
[docs]
def aggregate_residential_and_cts_profiles(mvgd, scenario):
"""
Gets residential and CTS heat demand profiles per building and aggregates
them.
Parameters
----------
mvgd : int
MV grid ID.
scenario : str
Possible options are eGon2035 or eGon100RE.
Returns
--------
pd.DataFrame
Table of demand profile per building. Column names are building IDs and
index is hour of the year as int (0-8759).
"""
# ############### get residential heat demand profiles ###############
df_heat_ts = calc_residential_heat_profiles_per_mvgd(
mvgd=mvgd, scenario=scenario
)
# pivot to allow aggregation with CTS profiles
df_heat_ts = df_heat_ts.pivot(
index=["day_of_year", "hour"],
columns="building_id",
values="demand_ts",
)
df_heat_ts = df_heat_ts.sort_index().reset_index(drop=True)
# ############### get CTS heat demand profiles ###############
heat_demand_cts_ts = calc_cts_building_profiles(
bus_ids=[mvgd],
scenario=scenario,
sector="heat",
)
# ############# aggregate residential and CTS demand profiles #############
df_heat_ts = pd.concat([df_heat_ts, heat_demand_cts_ts], axis=1)
df_heat_ts = df_heat_ts.groupby(axis=1, level=0).sum()
return df_heat_ts
[docs]
def export_to_db(df_peak_loads_db, df_heat_mvgd_ts_db, drop=False):
"""
Function to export the collected results of all MVGDs per bulk to DB.
Parameters
----------
df_peak_loads_db : pd.DataFrame
Table of building peak loads of all MVGDs per bulk
df_heat_mvgd_ts_db : pd.DataFrame
Table of all aggregated MVGD profiles per bulk
drop : boolean
Drop and recreate table if True
"""
df_peak_loads_db = df_peak_loads_db.melt(
id_vars="building_id",
var_name="scenario",
value_name="peak_load_in_w",
)
df_peak_loads_db["building_id"] = df_peak_loads_db["building_id"].astype(
int
)
df_peak_loads_db["sector"] = "residential+cts"
# From MW to W
df_peak_loads_db["peak_load_in_w"] = (
df_peak_loads_db["peak_load_in_w"] * 1e6
)
write_table_to_postgres(df_peak_loads_db, BuildingHeatPeakLoads, drop=drop)
dtypes = {
column.key: column.type
for column in EgonEtragoTimeseriesIndividualHeating.__table__.columns
}
df_heat_mvgd_ts_db = df_heat_mvgd_ts_db.loc[:, dtypes.keys()]
if drop:
logger.info(
f"Drop and recreate table "
f"{EgonEtragoTimeseriesIndividualHeating.__table__.name}."
)
EgonEtragoTimeseriesIndividualHeating.__table__.drop(
bind=engine, checkfirst=True
)
EgonEtragoTimeseriesIndividualHeating.__table__.create(
bind=engine, checkfirst=True
)
with db.session_scope() as session:
df_heat_mvgd_ts_db.to_sql(
name=EgonEtragoTimeseriesIndividualHeating.__table__.name,
schema=EgonEtragoTimeseriesIndividualHeating.__table__.schema,
con=session.connection(),
if_exists="append",
method="multi",
index=False,
dtype=dtypes,
)
[docs]
def export_min_cap_to_csv(df_hp_min_cap_mv_grid_pypsa_eur_sec):
"""Export minimum capacity of heat pumps for pypsa eur sec to csv"""
df_hp_min_cap_mv_grid_pypsa_eur_sec.index.name = "mvgd_id"
df_hp_min_cap_mv_grid_pypsa_eur_sec = (
df_hp_min_cap_mv_grid_pypsa_eur_sec.to_frame(
name="min_hp_capacity"
).reset_index()
)
folder = Path(".") / "input-pypsa-eur-sec"
file = folder / "minimum_hp_capacity_mv_grid_100RE.csv"
# Create the folder, if it does not exist already
if not os.path.exists(folder):
os.mkdir(folder)
if not file.is_file():
logger.info(f"Create {file}")
df_hp_min_cap_mv_grid_pypsa_eur_sec.to_csv(file, mode="w", header=True)
else:
df_hp_min_cap_mv_grid_pypsa_eur_sec.to_csv(
file, mode="a", header=False
)
[docs]
def delete_pypsa_eur_sec_csv_file():
"""Delete pypsa eur sec minimum heat pump capacity csv before new run"""
folder = Path(".") / "input-pypsa-eur-sec"
file = folder / "minimum_hp_capacity_mv_grid_100RE.csv"
if file.is_file():
logger.info(f"Delete {file}")
os.remove(file)
[docs]
def catch_missing_buidings(buildings_decentral_heating, peak_load):
"""
Check for missing buildings and reduce the list of buildings with
decentral heating if no peak loads available. This should only happen
in case of cutout SH
Parameters
-----------
buildings_decentral_heating : list(int)
Array or list of buildings with decentral heating
peak_load : pd.Series
Peak loads of all building within the mvgd
"""
# Catch missing buildings key error
# should only happen within cutout SH
if not all(buildings_decentral_heating.isin(peak_load.index)):
diff = buildings_decentral_heating.difference(peak_load.index)
logger.warning(
f"Dropped {len(diff)} building ids due to missing peak "
f"loads. {len(buildings_decentral_heating)} left."
)
logger.info(f"Dropped buildings: {diff.values}")
buildings_decentral_heating = buildings_decentral_heating.drop(diff)
return buildings_decentral_heating
[docs]
def determine_hp_cap_peak_load_mvgd_ts_2035(mvgd_ids):
"""
Main function to determine HP capacity per building in eGon2035 scenario.
Further, creates heat demand time series for all buildings with heat pumps
in MV grid, as well as for all buildings with gas boilers, used in eTraGo.
Parameters
-----------
mvgd_ids : list(int)
List of MV grid IDs to determine data for.
"""
# ========== Register np datatypes with SQLA ==========
register_adapter(np.float64, adapt_numpy_float64)
register_adapter(np.int64, adapt_numpy_int64)
# =====================================================
df_peak_loads_db = pd.DataFrame()
df_hp_cap_per_building_2035_db = pd.DataFrame()
df_heat_mvgd_ts_db = pd.DataFrame()
for mvgd in mvgd_ids:
logger.info(f"MVGD={mvgd} | Start")
# ############# aggregate residential and CTS demand profiles #####
df_heat_ts = aggregate_residential_and_cts_profiles(
mvgd, scenario="eGon2035"
)
# ##################### determine peak loads ###################
logger.info(f"MVGD={mvgd} | Determine peak loads.")
peak_load_2035 = df_heat_ts.max().rename("eGon2035")
# ######## determine HP capacity per building #########
logger.info(f"MVGD={mvgd} | Determine HP capacities.")
buildings_decentral_heating = (
get_buildings_with_decentral_heat_demand_in_mv_grid(
mvgd, scenario="eGon2035"
)
)
# Reduce list of decentral heating if no Peak load available
# TODO maybe remove after succesfull DE run
# Might be fixed in #990
buildings_decentral_heating = catch_missing_buidings(
buildings_decentral_heating, peak_load_2035
)
hp_cap_per_building_2035 = determine_hp_cap_buildings_pvbased_per_mvgd(
"eGon2035",
mvgd,
peak_load_2035,
buildings_decentral_heating,
)
buildings_gas_2035 = pd.Index(buildings_decentral_heating).drop(
hp_cap_per_building_2035.index
)
# ################ aggregated heat profiles ###################
logger.info(f"MVGD={mvgd} | Aggregate heat profiles.")
df_mvgd_ts_2035_hp = df_heat_ts.loc[
:,
hp_cap_per_building_2035.index,
].sum(axis=1)
# heat demand time series for buildings with gas boiler
df_mvgd_ts_2035_gas = df_heat_ts.loc[:, buildings_gas_2035].sum(axis=1)
df_heat_mvgd_ts = pd.DataFrame(
data={
"carrier": ["heat_pump", "CH4"],
"bus_id": mvgd,
"scenario": ["eGon2035", "eGon2035"],
"dist_aggregated_mw": [
df_mvgd_ts_2035_hp.to_list(),
df_mvgd_ts_2035_gas.to_list(),
],
}
)
# ################ collect results ##################
logger.info(f"MVGD={mvgd} | Collect results.")
df_peak_loads_db = pd.concat(
[df_peak_loads_db, peak_load_2035.reset_index()],
axis=0,
ignore_index=True,
)
df_heat_mvgd_ts_db = pd.concat(
[df_heat_mvgd_ts_db, df_heat_mvgd_ts], axis=0, ignore_index=True
)
df_hp_cap_per_building_2035_db = pd.concat(
[
df_hp_cap_per_building_2035_db,
hp_cap_per_building_2035.reset_index(),
],
axis=0,
)
# ################ export to db #######################
logger.info(f"MVGD={min(mvgd_ids)} : {max(mvgd_ids)} | Write data to db.")
export_to_db(df_peak_loads_db, df_heat_mvgd_ts_db, drop=False)
df_hp_cap_per_building_2035_db["scenario"] = "eGon2035"
# TODO debug duplicated building_ids
duplicates = df_hp_cap_per_building_2035_db.loc[
df_hp_cap_per_building_2035_db.duplicated("building_id", keep=False)
]
if not duplicates.empty:
logger.info(
f"Dropped duplicated buildings: "
f"{duplicates.loc[:,['building_id', 'hp_capacity']]}"
)
df_hp_cap_per_building_2035_db.drop_duplicates("building_id", inplace=True)
df_hp_cap_per_building_2035_db.building_id = (
df_hp_cap_per_building_2035_db.building_id.astype(int)
)
write_table_to_postgres(
df_hp_cap_per_building_2035_db,
EgonHpCapacityBuildings,
drop=False,
)
[docs]
def determine_hp_cap_peak_load_mvgd_ts_status_quo(mvgd_ids, scenario):
"""
Main function to determine HP capacity per building in status quo scenario.
Further, creates heat demand time series for all buildings with heat pumps
in MV grid, as well as for all buildings with gas boilers, used in eTraGo.
Parameters
-----------
mvgd_ids : list(int)
List of MV grid IDs to determine data for.
"""
# ========== Register np datatypes with SQLA ==========
register_adapter(np.float64, adapt_numpy_float64)
register_adapter(np.int64, adapt_numpy_int64)
# =====================================================
df_peak_loads_db = pd.DataFrame()
df_hp_cap_per_building_status_quo_db = pd.DataFrame()
df_heat_mvgd_ts_db = pd.DataFrame()
for mvgd in mvgd_ids:
logger.info(f"MVGD={mvgd} | Start")
# ############# aggregate residential and CTS demand profiles #####
df_heat_ts = aggregate_residential_and_cts_profiles(
mvgd, scenario=scenario
)
# ##################### determine peak loads ###################
logger.info(f"MVGD={mvgd} | Determine peak loads.")
peak_load_status_quo = df_heat_ts.max().rename(scenario)
# ######## determine HP capacity per building #########
logger.info(f"MVGD={mvgd} | Determine HP capacities.")
buildings_decentral_heating = (
get_buildings_with_decentral_heat_demand_in_mv_grid(
mvgd, scenario=scenario
)
)
# Reduce list of decentral heating if no Peak load available
# TODO maybe remove after succesfull DE run
# Might be fixed in #990
buildings_decentral_heating = catch_missing_buidings(
buildings_decentral_heating, peak_load_status_quo
)
hp_cap_per_building_status_quo = (
determine_hp_cap_buildings_pvbased_per_mvgd(
scenario,
mvgd,
peak_load_status_quo,
buildings_decentral_heating,
)
)
# ################ aggregated heat profiles ###################
logger.info(f"MVGD={mvgd} | Aggregate heat profiles.")
df_mvgd_ts_status_quo_hp = df_heat_ts.loc[
:,
hp_cap_per_building_status_quo.index,
].sum(axis=1)
df_heat_mvgd_ts = pd.DataFrame(
data={
"carrier": "heat_pump",
"bus_id": mvgd,
"scenario": scenario,
"dist_aggregated_mw": [df_mvgd_ts_status_quo_hp.to_list()],
}
)
# ################ collect results ##################
logger.info(f"MVGD={mvgd} | Collect results.")
df_peak_loads_db = pd.concat(
[df_peak_loads_db, peak_load_status_quo.reset_index()],
axis=0,
ignore_index=True,
)
df_heat_mvgd_ts_db = pd.concat(
[df_heat_mvgd_ts_db, df_heat_mvgd_ts], axis=0, ignore_index=True
)
df_hp_cap_per_building_status_quo_db = pd.concat(
[
df_hp_cap_per_building_status_quo_db,
hp_cap_per_building_status_quo.reset_index(),
],
axis=0,
)
# ################ export to db #######################
logger.info(f"MVGD={min(mvgd_ids)} : {max(mvgd_ids)} | Write data to db.")
export_to_db(df_peak_loads_db, df_heat_mvgd_ts_db, drop=False)
df_hp_cap_per_building_status_quo_db["scenario"] = scenario
# TODO debug duplicated building_ids
duplicates = df_hp_cap_per_building_status_quo_db.loc[
df_hp_cap_per_building_status_quo_db.duplicated(
"building_id", keep=False
)
]
if not duplicates.empty:
logger.info(
f"Dropped duplicated buildings: "
f"{duplicates.loc[:,['building_id', 'hp_capacity']]}"
)
df_hp_cap_per_building_status_quo_db.drop_duplicates(
"building_id", inplace=True
)
df_hp_cap_per_building_status_quo_db.building_id = (
df_hp_cap_per_building_status_quo_db.building_id.astype(int)
)
write_table_to_postgres(
df_hp_cap_per_building_status_quo_db,
EgonHpCapacityBuildings,
drop=False,
)
[docs]
def determine_hp_cap_peak_load_mvgd_ts_pypsa_eur(mvgd_ids):
"""
Main function to determine minimum required HP capacity in MV for
pypsa-eur-sec. Further, creates heat demand time series for all buildings
with heat pumps in MV grid in eGon100RE scenario, used in eTraGo.
Parameters
-----------
mvgd_ids : list(int)
List of MV grid IDs to determine data for.
"""
# ========== Register np datatypes with SQLA ==========
register_adapter(np.float64, adapt_numpy_float64)
register_adapter(np.int64, adapt_numpy_int64)
# =====================================================
df_peak_loads_db = pd.DataFrame()
df_heat_mvgd_ts_db = pd.DataFrame()
df_hp_min_cap_mv_grid_pypsa_eur_sec = pd.Series(dtype="float64")
for mvgd in mvgd_ids:
logger.info(f"MVGD={mvgd} | Start")
# ############# aggregate residential and CTS demand profiles #####
df_heat_ts = aggregate_residential_and_cts_profiles(
mvgd, scenario="eGon100RE"
)
# ##################### determine peak loads ###################
logger.info(f"MVGD={mvgd} | Determine peak loads.")
peak_load_100RE = df_heat_ts.max().rename("eGon100RE")
# ######## determine minimum HP capacity pypsa-eur-sec ###########
logger.info(f"MVGD={mvgd} | Determine minimum HP capacity.")
buildings_decentral_heating = (
get_buildings_with_decentral_heat_demand_in_mv_grid(
mvgd, scenario="eGon100RE"
)
)
# Reduce list of decentral heating if no Peak load available
# TODO maybe remove after succesfull DE run
buildings_decentral_heating = catch_missing_buidings(
buildings_decentral_heating, peak_load_100RE
)
hp_min_cap_mv_grid_pypsa_eur_sec = (
determine_min_hp_cap_buildings_pypsa_eur_sec(
peak_load_100RE,
buildings_decentral_heating,
)
)
# ################ aggregated heat profiles ###################
logger.info(f"MVGD={mvgd} | Aggregate heat profiles.")
df_mvgd_ts_hp = df_heat_ts.loc[
:,
buildings_decentral_heating,
].sum(axis=1)
df_heat_mvgd_ts = pd.DataFrame(
data={
"carrier": "heat_pump",
"bus_id": mvgd,
"scenario": "eGon100RE",
"dist_aggregated_mw": [df_mvgd_ts_hp.to_list()],
}
)
# ################ collect results ##################
logger.info(f"MVGD={mvgd} | Collect results.")
df_peak_loads_db = pd.concat(
[df_peak_loads_db, peak_load_100RE.reset_index()],
axis=0,
ignore_index=True,
)
df_heat_mvgd_ts_db = pd.concat(
[df_heat_mvgd_ts_db, df_heat_mvgd_ts], axis=0, ignore_index=True
)
df_hp_min_cap_mv_grid_pypsa_eur_sec.loc[mvgd] = (
hp_min_cap_mv_grid_pypsa_eur_sec
)
# ################ export to db and csv ######################
logger.info(f"MVGD={min(mvgd_ids)} : {max(mvgd_ids)} | Write data to db.")
export_to_db(df_peak_loads_db, df_heat_mvgd_ts_db, drop=False)
logger.info(
f"MVGD={min(mvgd_ids)} : {max(mvgd_ids)} | Write "
f"pypsa-eur-sec min "
f"HP capacities to csv."
)
export_min_cap_to_csv(df_hp_min_cap_mv_grid_pypsa_eur_sec)
[docs]
def split_mvgds_into_bulks(n, max_n, func, scenario=None):
"""
Generic function to split task into multiple parallel tasks,
dividing the number of MVGDs into even bulks.
Parameters
-----------
n : int
Number of bulk
max_n: int
Maximum number of bulks
func : function
The funnction which is then called with the list of MVGD as
parameter.
"""
with db.session_scope() as session:
query = (
session.query(
MapZensusGridDistricts.bus_id,
)
.filter(
MapZensusGridDistricts.zensus_population_id
== EgonPetaHeat.zensus_population_id
)
.distinct(MapZensusGridDistricts.bus_id)
)
mvgd_ids = pd.read_sql(
query.statement, query.session.bind, index_col=None
)
mvgd_ids = mvgd_ids.sort_values("bus_id").reset_index(drop=True)
mvgd_ids = np.array_split(mvgd_ids["bus_id"].values, max_n)
# Only take split n
mvgd_ids = mvgd_ids[n]
logger.info(f"Bulk takes care of MVGD: {min(mvgd_ids)} : {max(mvgd_ids)}")
if scenario is not None:
func(mvgd_ids, scenario=scenario)
else:
func(mvgd_ids)
[docs]
def delete_hp_capacity(scenario):
"""Remove all hp capacities for the selected scenario
Parameters
-----------
scenario : string
Either eGon2035 or eGon100RE
"""
with db.session_scope() as session:
# Buses
session.query(EgonHpCapacityBuildings).filter(
EgonHpCapacityBuildings.scenario == scenario
).delete(synchronize_session=False)
[docs]
def delete_mvgd_ts(scenario):
"""Remove all hp capacities for the selected scenario
Parameters
-----------
scenario : string
Either eGon2035 or eGon100RE
"""
with db.session_scope() as session:
# Buses
session.query(EgonEtragoTimeseriesIndividualHeating).filter(
EgonEtragoTimeseriesIndividualHeating.scenario == scenario
).delete(synchronize_session=False)
[docs]
def delete_hp_capacity_100RE():
"""Remove all hp capacities for the selected eGon100RE"""
EgonHpCapacityBuildings.__table__.create(bind=engine, checkfirst=True)
delete_hp_capacity(scenario="eGon100RE")
[docs]
def delete_hp_capacity_status_quo(scenario):
"""Remove all hp capacities for the selected status quo"""
EgonHpCapacityBuildings.__table__.create(bind=engine, checkfirst=True)
delete_hp_capacity(scenario=scenario)
[docs]
def delete_hp_capacity_2035():
"""Remove all hp capacities for the selected eGon2035"""
EgonHpCapacityBuildings.__table__.create(bind=engine, checkfirst=True)
delete_hp_capacity(scenario="eGon2035")
[docs]
def delete_mvgd_ts_status_quo(scenario):
"""Remove all mvgd ts for the selected status quo"""
EgonEtragoTimeseriesIndividualHeating.__table__.create(
bind=engine, checkfirst=True
)
delete_mvgd_ts(scenario=scenario)
[docs]
def delete_mvgd_ts_2035():
"""Remove all mvgd ts for the selected eGon2035"""
EgonEtragoTimeseriesIndividualHeating.__table__.create(
bind=engine, checkfirst=True
)
delete_mvgd_ts(scenario="eGon2035")
[docs]
def delete_mvgd_ts_100RE():
"""Remove all mvgd ts for the selected eGon100RE"""
EgonEtragoTimeseriesIndividualHeating.__table__.create(
bind=engine, checkfirst=True
)
delete_mvgd_ts(scenario="eGon100RE")
[docs]
def delete_heat_peak_loads_status_quo(scenario):
"""Remove all heat peak loads for status quo."""
BuildingHeatPeakLoads.__table__.create(bind=engine, checkfirst=True)
with db.session_scope() as session:
# Buses
session.query(BuildingHeatPeakLoads).filter(
BuildingHeatPeakLoads.scenario == scenario
).delete(synchronize_session=False)
[docs]
def delete_heat_peak_loads_2035():
"""Remove all heat peak loads for eGon2035."""
BuildingHeatPeakLoads.__table__.create(bind=engine, checkfirst=True)
with db.session_scope() as session:
# Buses
session.query(BuildingHeatPeakLoads).filter(
BuildingHeatPeakLoads.scenario == "eGon2035"
).delete(synchronize_session=False)
[docs]
def delete_heat_peak_loads_100RE():
"""Remove all heat peak loads for eGon100RE."""
BuildingHeatPeakLoads.__table__.create(bind=engine, checkfirst=True)
with db.session_scope() as session:
# Buses
session.query(BuildingHeatPeakLoads).filter(
BuildingHeatPeakLoads.scenario == "eGon100RE"
).delete(synchronize_session=False)