"""The central module containing all code dealing with the spatial
distribution of industrial electricity demands.
Industrial demands from DemandRegio are distributed from NUTS-3 level down
to OSM landuse polygons and/or industrial sites. The industrial sites are
identified within this processing step by merging three different input
datasets (Hotmaps, s-EEnergies and the thesis by Danielle Schmidt).
"""
from pathlib import Path
from urllib.request import urlretrieve
import os
from geoalchemy2.types import Geometry
from sqlalchemy import Column, Float, Integer, Sequence, String
from sqlalchemy.ext.declarative import declarative_base
import geopandas as gpd
import pandas as pd
from egon.data import db, subprocess
from egon.data.datasets import Dataset, DatasetSources, DatasetTargets
import egon.data.config
# Declarative base shared by all ORM table classes defined in this module
Base = declarative_base()
[docs]
class HotmapsIndustrialSites(Base):
    """ORM model of ``demand.egon_hotmaps_industrial_sites``.

    Holds the industrial sites from the Hotmaps industrial database as
    written by ``hotmaps_to_postgres``; the column names follow the renaming
    of the csv columns done there.
    """

    __tablename__ = "egon_hotmaps_industrial_sites"
    __table_args__ = {"schema": "demand"}
    # Primary key, read from the csv column "SiteID"
    siteid = Column(Integer, primary_key=True)
    sitename = Column(String)
    companyname = Column(String(200))
    address = Column(String(170))
    citycode = Column(String(50))
    city = Column(String(50))
    country = Column(String(50))
    location = Column(String(130))
    # Hotmaps subsector name, used to derive `wz`
    subsector = Column(String(50))
    datasource = Column(String)
    emissions_ets_2014 = Column(Float)
    emissions_eprtr_2014 = Column(Float)
    production = Column(Float)
    fuel_demand = Column(Float)
    # Read from the csv columns "Excess_Heat_100-200C", "Excess_Heat_200-500C",
    # "Excess_Heat_500C" and "Excess_Heat_Total"
    excess_heat_100_200C = Column(Float)
    excess_heat_200_500C = Column(Float)
    excess_heat_500C = Column(Float)
    excess_heat_total = Column(Float)
    # Site location as point geometry in EPSG:4326
    geom = Column(Geometry("POINT", 4326), index=True)
    # WZ code mapped from `subsector` in hotmaps_to_postgres; subsectors
    # without a mapping yield NaN there (presumably stored as NULL)
    wz = Column(Integer)
[docs]
class SeenergiesIndustrialSites(Base):
    """ORM model of ``demand.egon_seenergies_industrial_sites``.

    Holds the industrial sites from the s-EEnergies dataset as written by
    ``seenergies_to_postgres``; the column names follow the renaming of the
    csv columns done there.
    """

    __tablename__ = "egon_seenergies_industrial_sites"
    __table_args__ = {"schema": "demand"}
    # Primary key, read from the csv column "OBJECTID"
    objectid = Column(Integer, primary_key=True)
    # Read from the csv column "SiteId"
    siteid = Column(Integer)
    companyname = Column(String(100))
    address = Column(String(100))
    country = Column(String(2))
    eu28 = Column(String(3))
    # Read from the csv column "Eurostat_Name", used to derive `wz`
    subsector = Column(String(30))
    # Coordinates the point geometry `geom` is built from
    lat = Column(Float)
    lon = Column(Float)
    nuts1 = Column(String(3))
    nuts3 = Column(String(5))
    excess_heat = Column(String(3))
    # level_* columns are copied from the csv's level_*_Tj / level_*_Pj
    # columns; the names suggest TJ and PJ units - meaning not visible here
    level_1_tj = Column(Float)
    level_2_tj = Column(Float)
    level_3_tj = Column(Float)
    level_1_r_tj = Column(Float)
    level_2_r_tj = Column(Float)
    level_3_r_tj = Column(Float)
    level_1_pj = Column(Float)
    level_2_pj = Column(Float)
    level_3_pj = Column(Float)
    level_1_r_pj = Column(Float)
    level_2_r_pj = Column(Float)
    level_3_r_pj = Column(Float)
    # Read from the csv columns "ElectricityDemand_TJ_a" and "FuelDemand_TJ_a"
    electricitydemand_tj = Column(Float)
    fueldemand_tj = Column(Float)
    globalid = Column(String(50))
    # Site location as point geometry in EPSG:4326 (built from lon/lat)
    geom = Column(Geometry("POINT", 4326), index=True)
    # WZ code mapped from `subsector` in seenergies_to_postgres
    wz = Column(Integer)
[docs]
class SchmidtIndustrialSites(Base):
    """ORM model of ``demand.egon_schmidt_industrial_sites``.

    Holds the industrial sites from the thesis by Danielle Schmidt as written
    by ``schmidt_to_postgres``.
    """

    __tablename__ = "egon_schmidt_industrial_sites"
    __table_args__ = {"schema": "demand"}
    # NOTE(review): the csv column feeding `id` is not renamed in
    # schmidt_to_postgres - confirm where it comes from
    id = Column(Integer, primary_key=True)
    # Application type of the plant, used to derive `wz`
    application = Column(String(50))
    plant = Column(String(100))
    landkreis_number = Column(String(5))
    annual_tonnes = Column(Float)
    capacity_production = Column(String(10))
    # Coordinates the point geometry `geom` is built from
    lat = Column(Float)
    lon = Column(Float)
    # Site location as point geometry in EPSG:4326
    geom = Column(Geometry("POINT", 4326), index=True)
    # WZ code mapped from `application` in schmidt_to_postgres
    wz = Column(Integer)
[docs]
class IndustrialSites(Base):
    """ORM model of ``demand.egon_industrial_sites``, the merged site table.

    Presumably filled by ``merge_inputs`` (not visible in this module) from
    the Hotmaps, s-EEnergies and Schmidt tables; the `nuts3` column is filled
    afterwards by ``map_nuts3``.
    """

    __tablename__ = "egon_industrial_sites"
    __table_args__ = {"schema": "demand"}
    # Surrogate key drawn from the sequence demand.industrial_sites_id_seq,
    # both client side (Sequence) and as server default (nextval)
    id = Column(
        Integer,
        Sequence("industrial_sites_id_seq", schema="demand"),
        server_default=Sequence(
            "industrial_sites_id_seq", schema="demand"
        ).next_value(),
        primary_key=True,
    )
    companyname = Column(String(100))
    address = Column(String(170))
    subsector = Column(String(100))
    # WZ code of the site
    wz = Column(Integer)
    # NUTS code of the district containing the site, set by map_nuts3
    nuts3 = Column(String(10))
    # Site location as point geometry in EPSG:4326
    geom = Column(Geometry("POINT", 4326), index=True)
[docs]
def create_tables():
    """Create tables for industrial sites and distributed industrial demands.

    Drops the Hotmaps, s-EEnergies, Schmidt and merged industrial-site
    tables together with the id sequence of the merged table, then creates
    them again from the ORM classes of this module.

    Returns
    -------
    None.
    """
    # Create target schema
    db.execute_sql("CREATE SCHEMA IF NOT EXISTS demand;")

    for key in ("hotmaps", "seenergies", "schmidt", "sites"):
        db.execute_sql(
            f"DROP TABLE IF EXISTS "
            f"{MergeIndustrialSites.targets.tables[key]} CASCADE;"
        )

    # The id column of IndustrialSites draws from the Sequence attached to
    # it ("demand.industrial_sites_id_seq"), not from "<sites table>_id_seq".
    # That sequence is not owned by the table, so DROP TABLE ... CASCADE
    # leaves it in place; drop it explicitly so ids restart on every run.
    # The "<sites table>_id_seq" name is dropped as well in case it exists.
    id_sequence = IndustrialSites.__table__.c.id.default
    for sequence in (
        f"{id_sequence.schema}.{id_sequence.name}",
        f"{MergeIndustrialSites.targets.tables['sites']}_id_seq",
    ):
        db.execute_sql(f"DROP SEQUENCE IF EXISTS {sequence} CASCADE;")

    engine = db.engine()
    HotmapsIndustrialSites.__table__.create(bind=engine, checkfirst=True)
    SeenergiesIndustrialSites.__table__.create(bind=engine, checkfirst=True)
    SchmidtIndustrialSites.__table__.create(bind=engine, checkfirst=True)
    # Also (re)creates the id sequence attached to IndustrialSites.id
    IndustrialSites.__table__.create(bind=engine, checkfirst=True)
[docs]
def download_hotmaps():
    """Download csv file on Hotmaps' industrial sites.

    The file is only downloaded if it does not exist yet. curl is called
    with an argument list (no shell, so the "?" in the URL is not subject to
    globbing) and with ``--fail``, so HTTP errors make curl exit non-zero
    instead of storing the server's error page. The data is written to a
    temporary ``.part`` file that is only renamed to the target once curl
    succeeded. A failed or interrupted download therefore never leaves a
    file that later runs would mistake for a complete download.
    """
    target_file = Path(MergeIndustrialSites.targets.files["hotmaps_download"])
    url = MergeIndustrialSites.sources.urls["hotmaps"]

    if target_file.is_file():
        return

    # Create the download folder, if it does not exist already
    target_file.parent.mkdir(parents=True, exist_ok=True)

    partial_file = target_file.with_name(target_file.name + ".part")
    # egon.data's subprocess.run wrapper is expected to raise on a non-zero
    # exit status; if it did not, the rename below still fails when curl
    # produced no file
    subprocess.run(
        ["curl", "--fail", "--location", "--output", str(partial_file), url]
    )
    partial_file.replace(target_file)
[docs]
def download_seenergies():
    """Download csv file on s-eenergies' industrial sites.

    The file is only downloaded if it does not exist yet. The data is first
    written to a temporary ``.part`` file and only renamed to the target file
    after ``urlretrieve`` returned. An interrupted download is therefore not
    mistaken for a complete file on the next run.
    """
    target_file = Path(
        MergeIndustrialSites.targets.files["seenergies_download"]
    )
    url = MergeIndustrialSites.sources.urls["seenergies"]

    if target_file.is_file():
        return

    # Create the download folder, if it does not exist already
    target_file.parent.mkdir(parents=True, exist_ok=True)

    partial_file = target_file.with_name(target_file.name + ".part")
    urlretrieve(url, partial_file)
    partial_file.replace(target_file)
[docs]
def _ewkt_to_wkt(ewkt):
    """Return the WKT part of EWKT strings such as ``"SRID=4326;POINT(x y)"``.

    Parameters
    ----------
    ewkt : pandas.Series
        Strings in EWKT notation, i.e. ``"SRID=<srid>;<wkt>"``.

    Returns
    -------
    pandas.Series
        The second ``;``-separated field of every string (the WKT), with the
        index of `ewkt` preserved.
    """
    return ewkt.str.split(";").str[1]


def hotmaps_to_postgres():
    """Import hotmaps data to postgres database.

    Reads the downloaded Hotmaps csv and keeps German sites that have a
    geometry. It then keeps only the sites inside the boundaries in
    ``boundaries.vg250_sta_union``, drops duplicates per subsector and
    location, and maps the subsectors to WZ codes. The target table is
    emptied first and the result is then appended to it.
    """
    input_file = Path(MergeIndustrialSites.targets.files["hotmaps_download"])
    engine = db.engine()

    # Empty the target table; the data is appended again below
    db.execute_sql(
        f"DELETE FROM {MergeIndustrialSites.targets.tables['hotmaps']}"
    )

    # Read csv to dataframe and adjust column names
    df = pd.read_csv(input_file, delimiter=";")
    df = df.rename(
        columns={
            "SiteID": "siteid",
            "CompanyName": "companyname",
            "SiteName": "sitename",
            "Address": "address",
            "CityCode": "citycode",
            "City": "city",
            "Country": "country",
            "geom": "geom",
            "Subsector": "subsector",
            "DataSource": "datasource",
            "Emissions_ETS_2014": "emissions_ets_2014",
            "Emissions_EPRTR_2014": "emissions_eprtr_2014",
            "Production": "production",
            "Fuel_Demand": "fuel_demand",
            "Excess_Heat_100-200C": "excess_heat_100_200C",
            "Excess_Heat_200-500C": "excess_heat_200_500C",
            "Excess_Heat_500C": "excess_heat_500C",
            "Excess_Heat_Total": "excess_heat_total",
        }
    )

    # Keep German sites only and remove entries without geometry
    df = df[df.country == "Germany"]
    df = df[df.geom.notnull()]

    # From EWKT to WKT, vectorised; assign() returns a new frame instead of
    # writing into a filtered slice
    df = df.assign(geom=_ewkt_to_wkt(df["geom"]))

    # Create point geometries with shapely and import as geodataframe
    geom = gpd.GeoSeries.from_wkt(df["geom"])
    gdf = gpd.GeoDataFrame(
        df, geometry=gpd.points_from_xy(geom.x, geom.y), crs="EPSG:4326"
    )

    # Select boundaries
    boundaries = db.select_geodataframe(
        "SELECT * FROM boundaries.vg250_sta_union",
        geom_col="geometry",
        epsg=4326,
    )

    # Choose only sites inside Germany or testmode boundaries. The text
    # column "geom" is dropped because the geometry lives in "geometry".
    gdf = gpd.sjoin(gdf, boundaries).drop(
        ["id", "bez", "area_ha", "index_right", "geom"], axis=1
    )
    gdf = gdf.rename(columns={"geometry": "geom"}).set_geometry("geom")

    # Remove duplicates on columns 'subsector' and 'geom'
    gdf = gdf.drop_duplicates(subset=["subsector", "geom"])

    # Map hotmaps subsectors to WZ codes; unmapped subsectors become NaN
    wz_definition = pd.Series(
        {
            "Paper and printing": 1718,
            "Refineries": 19,
            "Cement": 23,
            "Glass": 23,
            "Iron and steel": 24,
            "Non-ferrous metals": 24,
            "Non-metallic mineral products": 23,
            "Chemical industry": 20,
        }
    )
    gdf["wz"] = gdf["subsector"].map(wz_definition)

    # Append to the db without writing the dataframe index as a column
    gdf.to_postgis(
        MergeIndustrialSites.targets.get_table_name("hotmaps"),
        engine,
        schema=MergeIndustrialSites.targets.get_table_schema("hotmaps"),
        if_exists="append",
        index=False,
    )
[docs]
def seenergies_to_postgres():
    """Import seenergies data to postgres database.

    Reads the downloaded s-EEnergies csv and builds point geometries from
    the longitude/latitude columns. It then keeps only the sites inside the
    boundaries in ``boundaries.vg250_sta_union``, drops duplicates per
    subsector and location, and maps the subsectors to WZ codes. The target
    table is emptied first and the result is then appended to it.
    """
    # Get information from data configuration file
    input_file = Path(
        MergeIndustrialSites.targets.files["seenergies_download"]
    )
    engine = db.engine()

    # Empty the target table; the data is appended again below
    db.execute_sql(
        f"DELETE FROM {MergeIndustrialSites.targets.tables['seenergies']}"
    )

    # Read csv to dataframe; X/Y are dropped, the geometry is built from
    # the Latitude/Longitude columns instead
    df = pd.read_csv(input_file, delimiter=",")
    df = df.drop(["X", "Y"], axis=1)

    # Adjust column names
    df = df.rename(
        columns={
            "SiteName": "sitename",
            "OBJECTID": "objectid",
            "SiteId": "siteid",
            "CompanyName": "companyname",
            "StreetNameAndNumber": "address",
            "Country": "country",
            "EU28": "eu28",
            "Eurostat_Name": "subsector",
            "Latitude": "lat",
            "Longitude": "lon",
            "NUTS1ID": "nuts1",
            "NUTS3ID": "nuts3",
            "Excess_Heat": "excess_heat",
            "level_1_Tj": "level_1_tj",
            "level_2_Tj": "level_2_tj",
            "level_3_Tj": "level_3_tj",
            "level_1_r_Tj": "level_1_r_tj",
            "level_2_r_Tj": "level_2_r_tj",
            "level_3_r_Tj": "level_3_r_tj",
            "level_1_Pj": "level_1_pj",
            "level_2_Pj": "level_2_pj",
            "level_3_Pj": "level_3_pj",
            "level_1_r_Pj": "level_1_r_pj",
            "level_2_r_Pj": "level_2_r_pj",
            "level_3_r_Pj": "level_3_r_pj",
            "ElectricityDemand_TJ_a": "electricitydemand_tj",
            "FuelDemand_TJ_a": "fueldemand_tj",
            "GlobalID": "globalid",
        }
    )

    # Import as geodataframe with point geometries in column 'geom'
    gdf = gpd.GeoDataFrame(
        df, geometry=gpd.points_from_xy(df.lon, df.lat), crs="EPSG:4326"
    )
    gdf = gdf.rename({"geometry": "geom"}, axis=1).set_geometry("geom")

    boundaries = db.select_geodataframe(
        "SELECT * FROM boundaries.vg250_sta_union",
        geom_col="geometry",
        epsg=4326,
    )

    # Choose only sites inside Germany or testmode boundaries
    gdf = gpd.sjoin(gdf, boundaries).drop(
        ["id", "bez", "area_ha", "index_right"], axis=1
    )

    # Remove duplicates on columns 'subsector' and 'geom'
    gdf = gdf.drop_duplicates(subset=["subsector", "geom"])

    # Map seenergies subsectors to WZ codes; unmapped subsectors become NaN
    wz_definition = pd.Series(
        {
            "Paper and printing": 1718,
            "Refineries": 19,
            "Cement": 23,
            "Glass": 23,
            "Iron and steel": 24,
            "Non-ferrous metals": 24,
            "Non-metallic minerals": 23,
            "Chemical industry": 20,
        }
    )
    gdf["wz"] = gdf["subsector"].map(wz_definition)

    # Append to the db without writing the dataframe index as a column
    gdf.to_postgis(
        MergeIndustrialSites.targets.get_table_name("seenergies"),
        engine,
        schema=MergeIndustrialSites.targets.get_table_schema("seenergies"),
        if_exists="append",
        index=False,
    )
[docs]
def schmidt_to_postgres():
    """Import data from Thesis by Danielle Schmidt to postgres database.

    Reads the csv shipped in ``data_bundle_egon_data`` and builds point
    geometries from the longitude/latitude columns. It then keeps only the
    sites inside the boundaries in ``boundaries.vg250_sta_union`` and maps
    the application types to WZ codes. The target table is emptied first
    and the result is then appended to it.
    """
    # Get information from data configuration file
    input_file = (
        Path(".")
        / "data_bundle_egon_data"
        / MergeIndustrialSites.sources.files["schmidt"]
    )
    engine = db.engine()

    # Empty the target table; the data is appended again below
    db.execute_sql(
        f"DELETE FROM {MergeIndustrialSites.targets.tables['schmidt']}"
    )

    # Read csv to dataframe and adjust column names
    df = pd.read_csv(input_file, delimiter=";")
    df = df.rename(
        columns={
            "Application": "application",
            "Plant": "plant",
            "Landkreis Number": "landkreis_number",
            "Annual Tonnes": "annual_tonnes",
            "Capacity or Production": "capacity_production",
            "Latitude": "lat",
            "Longitude": "lon",
        }
    )

    # Import as geodataframe with point geometries in column 'geom'
    gdf = gpd.GeoDataFrame(
        df, geometry=gpd.points_from_xy(df.lon, df.lat), crs="EPSG:4326"
    )
    gdf = gdf.rename({"geometry": "geom"}, axis=1).set_geometry("geom")

    boundaries = db.select_geodataframe(
        "SELECT * FROM boundaries.vg250_sta_union",
        geom_col="geometry",
        epsg=4326,
    )

    # Choose only sites inside Germany or testmode boundaries
    gdf = gpd.sjoin(gdf, boundaries).drop(
        ["id", "bez", "area_ha", "index_right"], axis=1
    )

    # Map Schmidt application types to WZ codes; unmapped ones become NaN
    wz_definition = pd.Series(
        {
            "Mechanical Pulp": 1718,
            "Packing Paper and Board": 1718,
            "Cement Mill": 23,
            "Technical/Special Paper and Board": 1718,
            "Graphic Paper": 1718,
            "Hygiene Paper": 1718,
            "Recycled Paper": 1718,
        }
    )
    gdf["wz"] = gdf["application"].map(wz_definition)

    # Append to the db without writing the dataframe index as a column
    gdf.to_postgis(
        MergeIndustrialSites.targets.get_table_name("schmidt"),
        engine,
        schema=MergeIndustrialSites.targets.get_table_schema("schmidt"),
        if_exists="append",
        index=False,
    )
[docs]
def download_import_industrial_sites():
    """
    Set up the industrial-site tables, fetch the Hotmaps and s-EEnergies csv
    files and load all three site datasets (Hotmaps, s-EEnergies, Schmidt)
    into the local postgresql database, one step after the other.

    Returns
    -------
    None.
    """
    pipeline = (
        create_tables,
        download_hotmaps,
        download_seenergies,
        hotmaps_to_postgres,
        seenergies_to_postgres,
        schmidt_to_postgres,
    )
    for step in pipeline:
        step()
[docs]
def map_nuts3():
    """
    Fill column 'nuts3' of the merged industrial sites table with the 'nuts'
    code of the vg250 district whose (EPSG:4326-transformed) geometry
    contains the site.
    """
    sites_table = MergeIndustrialSites.targets.tables['sites']
    districts_table = MergeIndustrialSites.sources.tables['vg250_krs']
    update_sql = (
        f"UPDATE {sites_table} s\n"
        "SET nuts3 = krs.nuts\n"
        f"FROM {districts_table} krs\n"
        "WHERE ST_WITHIN(s.geom, ST_TRANSFORM(krs.geometry,4326));"
    )
    db.execute_sql(update_sql)
[docs]
class MergeIndustrialSites(Dataset):
    """Dataset importing industrial sites from three sources and merging them.

    The sources are the Hotmaps industrial database and the s-EEnergies
    dataset (both downloaded from the URLs in ``sources.urls``) and a csv
    from the thesis by Danielle Schmidt, read from the data bundle. Each
    input is written to its own table in ``targets.tables``. The merged table
    ``demand.egon_industrial_sites`` is presumably filled by ``merge_inputs``
    and gets its NUTS-3 codes from ``map_nuts3``.
    """

    sources = DatasetSources(
        urls={
            "hotmaps": "https://gitlab.com/hotmaps/industrial_sites/industrial_sites_Industrial_Database/-/raw/388278c6df35889b1447a959fc3759e3d78bf659/data/Industrial_Database.csv?inline=false",
            "seenergies": "https://opendata.arcgis.com/datasets/5e36c0af918040ed936b4e4c101f611d_0.csv",
        },
        # Relative to the "data_bundle_egon_data" directory
        files={
            "schmidt": "industrial_sites/MA_Schmidt_Industriestandorte_georef.csv"
        },
        tables={
            # These tables are targets of earlier steps, but sources for the final merge
            "hotmaps_processed": "demand.egon_hotmaps_industrial_sites",
            "seenergies_processed": "demand.egon_seenergies_industrial_sites",
            "schmidt_processed": "demand.egon_schmidt_industrial_sites",
            # District boundaries used by map_nuts3
            "vg250_krs": "boundaries.vg250_krs",
        },
    )
    targets = DatasetTargets(
        # Local paths the downloaded csv files are stored at
        files={
            "hotmaps_download": "industrial_sites/data_Industrial_Database.csv",
            "seenergies_download": "industrial_sites/D5_1_Industry_Dataset_With_Demand_Data.csv",
        },
        tables={
            "hotmaps": "demand.egon_hotmaps_industrial_sites",
            "seenergies": "demand.egon_seenergies_industrial_sites",
            "schmidt": "demand.egon_schmidt_industrial_sites",
            "sites": "demand.egon_industrial_sites",
        },
    )
    def __init__(self, dependencies):
        # NOTE(review): merge_inputs is not defined in the visible part of
        # this module - confirm it is defined or imported elsewhere.
        # Tasks are given as a tuple, presumably run in this order - confirm
        # against the egon.data Dataset documentation.
        super().__init__(
            name="Merge_industrial_sites",
            version="0.0.4",
            dependencies=dependencies,
            tasks=(download_import_industrial_sites, merge_inputs, map_nuts3),
        )