Source code for egon.data.datasets.heat_demand

# -*- coding: utf-8 -*-

# This script is part of eGon-data.

# license text - to be added.

"""
Central module containing all code dealing with the future heat demand import.

This module obtains the residential and service-sector heat demand data for
2015 from Peta5.0.1, calculates future heat demands and saves them in the
database with assigned census cell IDs.

"""

from pathlib import Path  # for database import
from urllib.request import urlretrieve

# for metadata creation
import json
import os
import time
import zipfile

from jinja2 import Template
from rasterio.mask import mask

# packages for ORM class definition
from sqlalchemy import Column, Float, Integer, Sequence, String
from sqlalchemy.ext.declarative import declarative_base
import geopandas as gpd

# for raster operations
import rasterio

from egon.data import db, subprocess
from egon.data.datasets import Dataset, DatasetSources, DatasetTargets
from egon.data.datasets.scenario_parameters import get_sector_parameters
from egon.data.metadata import context, license_ccby, meta_metadata, sources
import egon.data.config


[docs] class HeatDemandImport(Dataset): """ Insert the annual heat demand per census cell for each scenario This dataset downloads the heat demand raster data for private households and CTS from Peta 5.0.1 (https://s-eenergies-open-data-euf.hub.arcgis.com/maps/d7d18b63250240a49eb81db972aa573e/about) and stores it into files in the working directory. The data from Peta 5.0.1 represents the status quo of the year 2015. To model future heat demands, the data is scaled to meet target values from external sources. These target values are defined for each scenario in :py:class:`ScenarioParameters <egon.data.datasets.scenario_parameters.ScenarioParameters>`. *Dependencies* * :py:class:`ScenarioParameters <egon.data.datasets.scenario_parameters.ScenarioParameters>` * :py:class:`Vg250 <egon.data.datasets.vg250.Vg250>` * :py:class:`ZensusVg250 <egon.data.datasets.zensus_vg250.ZensusVg250>` *Resulting tables* * :py:class:`demand.egon_peta_heat <egon.data.datasets.heat_demand.EgonPetaHeat>` is created and filled """ #: name: str = "heat-demands" #: version: str = "0.0.7" sources = DatasetSources( tables={ "boundaries": "boundaries.vg250_sta_union", "zensus_population": "society.destatis_zensus_population_per_ha", }, urls={ "peta_res_zip": "https://arcgis.com/sharing/rest/content/items/d7d18b63250240a49eb81db972aa573e/data", "peta_ser_zip": "https://arcgis.com/sharing/rest/content/items/52ff5e02111142459ed5c2fe3d80b3a0/data", }, files={ "peta_res_zip": "Peta5_0_1_HD_res.zip", "peta_ser_zip": "Peta5_0_1_HD_ser.zip", "res_cutout_tif": "Peta_5_0_1/res_hd_2015_GER.tif", "ser_cutout_tif": "Peta_5_0_1/ser_hd_2015_GER.tif", "scenario_res_glob": "heat_scenario_raster/res_HD_*.tif", "scenario_ser_glob": "heat_scenario_raster/ser_HD_*.tif", }, ) targets = DatasetTargets( tables={ "heat_demand": "demand.egon_peta_heat", }, files={ "scenario_dir": "heat_scenario_raster", }, ) def __init__(self, dependencies): super().__init__( name=self.name, # version=self.target_files + "_0.0", version=self.version, # maybe rethink the naming dependencies=dependencies, tasks=(scenario_data_import), )
Base = declarative_base() # class for the final dataset in the database
[docs] class EgonPetaHeat(Base): __tablename__ = "egon_peta_heat" __table_args__ = {"schema": "demand"} id = Column( Integer, Sequence("egon_peta_heat_seq", schema="demand"), server_default=Sequence( "egon_peta_heat_seq", schema="demand" ).next_value(), primary_key=True, ) demand = Column(Float) sector = Column(String) scenario = Column(String) zensus_population_id = Column(Integer)
[docs] def download_peta5_0_1_heat_demands(): """ Download Peta5.0.1 tiff files. The downloaded data contain residential and service-sector heat demands per hectar grid cell for 2015. Parameters ---------- None Returns ------- None Notes ----- The heat demand data in the Peta5.0.1 dataset are assumed not change. An upgrade to a higher Peta version is currently not foreseen. Therefore, for the version management we can assume that the dataset will not change, unless the code is changed. """ target_file_res = HeatDemandImport.sources.files["peta_res_zip"] if not os.path.isfile(target_file_res): urlretrieve( HeatDemandImport.sources.urls["peta_res_zip"], target_file_res ) # service-sector heat demands 2015 target_file_ser = HeatDemandImport.sources.files["peta_ser_zip"] if not os.path.isfile(target_file_ser): urlretrieve( HeatDemandImport.sources.urls["peta_ser_zip"], target_file_ser ) return None
[docs] def unzip_peta5_0_1_heat_demands(): """ Unzip the downloaded Peta5.0.1 tiff files. Parameters ---------- None Returns ------- None Notes ----- It is assumed that the Peta5.0.1 dataset does not change and that the version number does not need to be checked. """ filepath_zip_res = HeatDemandImport.sources.files["peta_res_zip"] filepath_zip_ser = HeatDemandImport.sources.files["peta_ser_zip"] directory_to_extract_to = os.path.dirname( HeatDemandImport.sources.files["res_cutout_tif"] ) # Create the folder, if it does not exists already if not os.path.exists(directory_to_extract_to): os.mkdir(directory_to_extract_to) # Unzip the tiffs, if they do not exist if not os.path.isfile( directory_to_extract_to + "/HD_2015_res_Peta5_0_1_GJ.tif" ): with zipfile.ZipFile(filepath_zip_res, "r") as zf: zf.extractall(directory_to_extract_to) if not os.path.isfile( directory_to_extract_to + "/HD_2015_ser_Peta5_0_1_GJ.tif" ): with zipfile.ZipFile(filepath_zip_ser, "r") as zf: zf.extractall(directory_to_extract_to) return None
[docs] def cutout_heat_demand_germany(): """ Save cutouts of Germany's 2015 heat demand densities from Europe-wide tifs. 1. Get the German state boundaries 2. Load the unzip 2015 heat demand data (Peta5_0_1) and 3. Cutout Germany's residential and service-sector heat demand densities 4. Save the cutouts as tiffs Parameters ---------- None Returns ------- None Notes ---- The alternative of cutting out Germany from the pan-European raster based on German census cells, instead of using state boundaries with low resolution (to avoid inaccuracies), was not implemented in order to achieve consistency with other datasets (e.g. egon_mv_grid_district). Besides, all attempts to read, (union) and load cells from the local database failed, but were documented as commented code within this function and afterwards removed. If you want to have a look at the comments, please check out commit ec3391e182215b32cd8b741557a747118ab61664, which is the last commit still containing them. Also the usage of a buffer around the boundaries and the subsequent selection of German cells was not implemented. could be used, but then it must be ensured that later only heat demands of cells belonging to Germany are used. """ # Load the German boundaries from the local database using a dissolved # dataset which provides one multipolygon local_engine = db.engine() # Recommened way: gpd.read_postgis() # https://geopandas.readthedocs.io/en/latest/docs/reference/api/geopandas.GeoDataFrame.from_postgis.html?highlight=postgis#geopandas.GeoDataFrame.from_postgis # multipolygon can be converted into polygons for outcut function # using ST_Dump: https://postgis.net/docs/ST_Dump.html gdf_boundaries = gpd.read_postgis( f"SELECT (ST_Dump(geometry)).geom AS geometry FROM {HeatDemandImport.sources.tables['boundaries']}", local_engine, geom_col="geometry", ) # rasterio wants the mask to be a GeoJSON-like dict or an object that # implements the Python geo interface protocol (such as a Shapely Polygon) # look at the data, to understanding it # gdf_boundaries.head # len(gdf_boundaries) # type(gdf_boundaries) # gdf_boundaries.crs # gdf_boundaries.iloc[:,0] # gdf_boundaries.iloc[0,:] # gdf_boundaries.plot() # Load the unzipped heat demand data and cutout Germany # path to the downloaded and unzipped rediential heat demand 2015 data res_hd_2015 = "Peta_5_0_1/HD_2015_res_Peta5_0_1_GJ.tif" # path to the downloaded and unzipped service-sector heat demand 2015 data ser_hd_2015 = "Peta_5_0_1/HD_2015_ser_Peta5_0_1_GJ.tif" with rasterio.open(res_hd_2015) as dataset: # https://rasterio.readthedocs.io/en/latest/topics/masking-by-shapefile.html out_image, out_transform = mask( dataset, gdf_boundaries.iloc[:, 0], crop=True ) out_meta = dataset.meta # Understanding the outputs # show(out_image) # out_transform out_meta.update( { "driver": "GTiff", "height": out_image.shape[1], "width": out_image.shape[2], "transform": out_transform, } ) with rasterio.open( HeatDemandImport.sources.files["res_cutout_tif"], "w", **out_meta ) as dest: dest.write(out_image) # Do the same for the service-sector with rasterio.open(ser_hd_2015) as dataset: # https://rasterio.readthedocs.io/en/latest/topics/masking-by-shapefile.html out_image, out_transform = mask( dataset, gdf_boundaries.iloc[:, 0], crop=True ) out_meta = dataset.meta # Understanding the outputs # show(out_image) # out_transform out_meta.update( { "driver": "GTiff", "height": out_image.shape[1], "width": out_image.shape[2], "transform": out_transform, } ) with rasterio.open( HeatDemandImport.sources.files["ser_cutout_tif"], "w", **out_meta ) as dest: dest.write(out_image) return None
[docs] def future_heat_demand_germany(scenario_name): """ Calculate the future residential and service-sector heat demand per ha. The calculation is based on Peta5_0_1 heat demand densities, cutout for Germany, for the year 2015. The given scenario name is used to read the adjustment factors for the heat demand rasters from the scenario table. Parameters ---------- scenario_name: str Selected scenario name for which assumptions will be loaded. Returns ------- None Notes ----- None TODO ---- Specify the crs of the created heat demand tiffs: EPSG 3035 Check if the raster calculations are correct. Check, if there are populated cells without heat demand. """ # Load the values if scenario_name == "eGon2015": res_hd_reduction = 1 ser_hd_reduction = 1 elif scenario_name == "status2019": heat_parameters = get_sector_parameters("heat", scenario=scenario_name) # Calculate reduction share based on final energy demand and overall demand from Peta for 2015 res_hd_reduction = ( heat_parameters["DE_demand_residential_TJ"] / 3600 / 443.788483 ) ser_hd_reduction = ( heat_parameters["DE_demand_service_TJ"] / 3600 / 226.588158 ) elif scenario_name == "status2023": heat_parameters = get_sector_parameters( "heat", scenario=scenario_name ) # currently data for 2019 is used # see scenario_paramters/__init__ for this. # Calculate reduction share based on final energy demand and overall demand from Peta for 2015 res_hd_reduction = ( heat_parameters["DE_demand_residential_TJ"] / 3600 / 443.788483 # TODO status2023 can values stay same? ) ser_hd_reduction = ( heat_parameters["DE_demand_service_TJ"] / 3600 / 226.588158 # TODO status2023 can values stay same? ) elif scenario_name == "eGon100RE": heat_parameters = get_sector_parameters("heat", scenario=scenario_name) # Calculate reduction share based on final energy demand and overall demand from Peta for 2015 res_hd_reduction = heat_parameters["DE_demand_residential_MWh"] / ( 443.788483 * 1e6 ) ser_hd_reduction = heat_parameters["DE_demand_service_MWh"] / ( 226.588158 * 1e6 ) else: heat_parameters = get_sector_parameters("heat", scenario=scenario_name) res_hd_reduction = heat_parameters["DE_demand_reduction_residential"] ser_hd_reduction = heat_parameters["DE_demand_reduction_service"] # Define the directory where the created rasters will be saved scenario_raster_directory = HeatDemandImport.targets.files["scenario_dir"] if not os.path.exists(scenario_raster_directory): os.mkdir(scenario_raster_directory) # Open, read and adjust the cutout heat demand distributions for Germany # https://rasterio.readthedocs.io/en/latest/topics/writing.html # https://gis.stackexchange.com/questions/338282/applying-equation-to-a-numpy-array-while-preserving-tiff-metadata-coordinates # Write an array as a raster band to a new 16-bit file. For # the new file's profile, the profile of the source is adjusted. # Residential heat demands first res_cutout = HeatDemandImport.sources.files["res_cutout_tif"] with rasterio.open(res_cutout) as src: # open raster dataset res_hd_2015 = src.read(1) # read as numpy array; band 1; masked=True?? res_profile = src.profile # adjusting and connversion to MWh res_scenario_raster = res_hd_reduction * res_hd_2015 / 3.6 res_profile.update( dtype=rasterio.float32, # set the dtype to float32 count=1, # change the band count to 1 compress="lzw", # specify LZW compression ) # Save the scenario's residential heat demands as tif file # Define the filename for export res_result_filename = os.path.join( scenario_raster_directory, f"res_HD_{scenario_name}.tif" ) # Open raster dataset in 'w' write mode using the adjusted meta data with rasterio.open(res_result_filename, "w", **res_profile) as dst: dst.write(res_scenario_raster.astype(rasterio.float32), 1) # Do the same for the service-sector ser_cutout = HeatDemandImport.sources.files["ser_cutout_tif"] with rasterio.open(ser_cutout) as src: # open raster dataset ser_hd_2015 = src.read(1) # read as numpy array; band 1; masked=True?? ser_profile = src.profile # adjusting and connversion to MWh ser_scenario_raster = ser_hd_reduction * ser_hd_2015 / 3.6 ser_profile.update(dtype=rasterio.float32, count=1, compress="lzw") # Save the scenario's service-sector heat demands as tif file # Define the filename for export ser_result_filename = os.path.join( scenario_raster_directory, f"ser_HD_{scenario_name}.tif" ) # Open raster dataset in 'w' write mode using the adjusted meta data with rasterio.open(ser_result_filename, "w", **ser_profile) as dst: dst.write(ser_scenario_raster.astype(rasterio.float32), 1) return None
[docs] def heat_demand_to_db_table(): """ Import heat demand rasters and convert them to vector data. Specify the rasters to import as raster file patterns (file type and directory containing raster files, which all will be imported). The rasters are stored in a temporary table called "heat_demand_rasters". The final demand data, having the census IDs as foreign key (from the census population table), are genetated by the provided sql script (raster2cells-and-centroids.sql) and are stored in the table "demand.egon_peta_heat". Parameters ---------- None Returns ------- None Notes ----- Please note that the data from "demand.egon_peta_heat" is deleted prior to the import, so make sure you're not loosing valuable data. TODO ---- Check if data already exists in database or if the function needs to be executed: data version management. Define version number correctly """ sources = [ path for pattern in ( HeatDemandImport.sources.files["scenario_res_glob"], HeatDemandImport.sources.files["scenario_ser_glob"], ) for path in Path(".").glob(pattern) ] # Create the schema for the final table, if needed engine = db.engine() db.execute_sql("CREATE SCHEMA IF NOT EXISTS demand;") sql_script = os.path.join( os.path.dirname(__file__), "raster2cells-and-centroids.sql" ) db.execute_sql( f"DELETE FROM {HeatDemandImport.targets.tables['heat_demand']};" ) for source in sources: if not "2015" in source.stem: # Create a temporary table and fill the final table using the sql script rasters = f"heat_demand_rasters_{source.stem.lower()}" import_rasters = subprocess.run( ["raster2pgsql", "-e", "-s", "3035", "-I", "-C", "-F", "-a"] + [source] + [f"{rasters}"], text=True, ).stdout with engine.begin() as connection: print( f'CREATE TEMPORARY TABLE "{rasters}"' ' ("rid" serial PRIMARY KEY,"rast" raster,"filename" text);' ) connection.execute( f'CREATE TEMPORARY TABLE "{rasters}"' ' ("rid" serial PRIMARY KEY,"rast" raster,"filename" text);' ) connection.execute(import_rasters) connection.execute(f'ANALYZE "{rasters}"') with open(sql_script) as convert: connection.execute( Template(convert.read()).render(source=rasters) ) return None
[docs] def adjust_residential_heat_to_zensus(scenario): """ Adjust residential heat demands to fit to zensus population. In some cases, Peta assigns residential heat demand to unpopulated cells. This can be caused by the different population data used in Peta or buildings in zenus cells without a population (see :func:`egon.data.importing.zensus.adjust_zensus_misc`) Residential heat demand in cells without zensus population is dropped. Residential heat demand in cells with zensus population is scaled to meet the overall residential heat demands. Parameters ---------- scenario : str Name of the scenario. Returns ------- None """ # Select overall residential heat demand overall_demand = db.select_dataframe( f"""SELECT SUM(demand) as overall_demand FROM {HeatDemandImport.targets.tables['heat_demand']} WHERE scenario = {'scenario'} and sector = 'residential' """ ).overall_demand[0] # Select heat demand in populated cells df = db.select_dataframe( f"""SELECT * FROM {HeatDemandImport.targets.tables['heat_demand']} WHERE scenario = {'scenario'} and sector = 'residential' AND zensus_population_id IN ( SELECT id FROM {HeatDemandImport.sources.tables['zensus_population']} )""", index_col="id", ) # Scale heat demands in populated cells df.loc[:, "demand"] *= overall_demand / df.loc[:, "demand"].sum() # Drop residential heat demands db.execute_sql( f"""DELETE FROM {HeatDemandImport.targets.tables['heat_demand']} WHERE scenario = {'scenario'} and sector = 'residential'""" ) # Insert adjusted heat demands in populated cells df.to_sql( "egon_peta_heat", schema="demand", con=db.engine(), if_exists="append" ) return None
[docs] def add_metadata(): """ Writes metadata JSON string into table comment. """ # Metadata creation meta = { "name": "egon_peta_heat_metadata", "title": "eGo_n scenario-specific future heat demand data", "id": "WILL_BE_SET_AT_PUBLICATION", "description": "Future heat demands per hectare grid cell of " "the residential and service sector", "language": ["EN"], "context": context(), "spatial": { "location": None, "extent": "Germany", "resolution": "100x100m", }, "sources": [ sources()["egon-data"], sources()["peta"], sources()["vg250"], sources()["zensus"], ], "resources": [ { "profile": "tabular-data-resource", "name": "egon_peta_heat", "path": "", "format": "PostgreSQL", "encoding": "UTF-8", "schema": { "fields": [ { "name": "id", "description": "Unique identifier", "type": "serial", "unit": "none", }, { "name": "demand", "description": "annual heat demand", "type": "double precision", "unit": "MWh", }, { "name": "sector", "description": "sector e.g. residential", "type": "text", "unit": "none", }, { "name": "scenario", "description": "scenario name", "type": "text", "unit": "none", }, { "name": "zensus_population_id", "description": "census cell id", "type": "integer", "unit": "none", }, ], "primaryKey": ["id"], "foreignKeys": [ { "fields": ["zensus_population_id"], "reference": { "resource": "society.destatis_zensus_population_per_ha", "fields": ["id"], }, }, { "fields": ["scenario"], "reference": { "resource": "scenario.egon_scenario_parameters", "fields": ["name"], }, }, ], }, "dialect": {"delimiter": "none", "decimalSeparator": "."}, } ], "licenses": [license_ccby("© Europa-Universität Flensburg")], "contributors": [ { "title": "EvaWie", "email": "http://github.com/EvaWie", "date": time.strftime("%Y-%m-%d"), "object": None, "comment": "Imported data", }, { "title": "Clara Büttner", "email": "http://github.com/ClaraBuettner", "date": time.strftime("%Y-%m-%d"), "object": None, "comment": "Updated metadata", }, ], "metaMetadata": meta_metadata(), } meta_json = "'" + json.dumps(meta) + "'" db.submit_comment(meta_json, "demand", "egon_peta_heat")
[docs] def scenario_data_import(): """ Call all heat demand import related functions. This function executes the functions that download, unzip and adjust the heat demand distributions from Peta5.0.1 and that save the future heat demand distributions for Germany as tiffs as well as with census grid IDs as foreign key in the database. Parameters ---------- None Returns ------- None Notes ----- None """ # create schema if not exists db.execute_sql("CREATE SCHEMA IF NOT EXISTS demand;") # drop table if exists # can be removed when table structure doesn't change anymore db.execute_sql( f"DROP TABLE IF EXISTS {HeatDemandImport.targets.tables['heat_demand']} CASCADE" ) db.execute_sql( f"DROP SEQUENCE IF EXISTS {HeatDemandImport.targets.get_table_schema('heat_demand')}." f"{HeatDemandImport.targets.get_table_name('heat_demand')}_seq CASCADE" ) # create table EgonPetaHeat.__table__.create(bind=db.engine(), checkfirst=True) download_peta5_0_1_heat_demands() unzip_peta5_0_1_heat_demands() cutout_heat_demand_germany() # Specifiy the scenario names for loading factors from csv file for scenario in egon.data.config.settings()["egon-data"]["--scenarios"]: future_heat_demand_germany(scenario) # future_heat_demand_germany("eGon2015") heat_demand_to_db_table() for scenario in egon.data.config.settings()["egon-data"]["--scenarios"]: adjust_residential_heat_to_zensus(scenario) # future_heat_demand_germany("eGon2015") add_metadata() return None