Source code for egon.data.datasets.industrial_sites

"""The central module containing all code dealing with the spatial
distribution of industrial electricity demands.
Industrial demands from DemandRegio are distributed from NUTS-3 level down
to OSM landuse polygons and/or industrial sites. These industrial sites are
also identified within this processing step by bringing three different
inputs together.

"""

from pathlib import Path
from urllib.request import urlretrieve
import os

from geoalchemy2.types import Geometry
from sqlalchemy import Column, Float, Integer, Sequence, String
from sqlalchemy.ext.declarative import declarative_base
import geopandas as gpd
import pandas as pd

from egon.data import db, subprocess
from egon.data.datasets import Dataset, DatasetSources, DatasetTargets
import egon.data.config

# Declarative base shared by the ORM table classes defined in this module.
Base = declarative_base()


class HotmapsIndustrialSites(Base):
    """ORM mapping of the table ``demand.egon_hotmaps_industrial_sites``.

    The table is filled by :func:`hotmaps_to_postgres`.
    """

    __tablename__ = "egon_hotmaps_industrial_sites"
    __table_args__ = {"schema": "demand"}
    siteid = Column(Integer, primary_key=True)
    sitename = Column(String)
    companyname = Column(String(200))
    address = Column(String(170))
    citycode = Column(String(50))
    city = Column(String(50))
    country = Column(String(50))
    location = Column(String(130))
    subsector = Column(String(50))
    datasource = Column(String)
    emissions_ets_2014 = Column(Float)
    emissions_eprtr_2014 = Column(Float)
    production = Column(Float)
    fuel_demand = Column(Float)
    excess_heat_100_200C = Column(Float)
    excess_heat_200_500C = Column(Float)
    excess_heat_500C = Column(Float)
    excess_heat_total = Column(Float)
    # Point geometry with SRID 4326, indexed
    geom = Column(Geometry("POINT", 4326), index=True)
    # Sector id that hotmaps_to_postgres derives from ``subsector``
    wz = Column(Integer)
class SeenergiesIndustrialSites(Base):
    """ORM mapping of the table ``demand.egon_seenergies_industrial_sites``.

    The table is filled by :func:`seenergies_to_postgres`.
    """

    __tablename__ = "egon_seenergies_industrial_sites"
    __table_args__ = {"schema": "demand"}
    objectid = Column(Integer, primary_key=True)
    siteid = Column(Integer)
    companyname = Column(String(100))
    address = Column(String(100))
    country = Column(String(2))
    eu28 = Column(String(3))
    subsector = Column(String(30))
    lat = Column(Float)
    lon = Column(Float)
    nuts1 = Column(String(3))
    nuts3 = Column(String(5))
    excess_heat = Column(String(3))
    level_1_tj = Column(Float)
    level_2_tj = Column(Float)
    level_3_tj = Column(Float)
    level_1_r_tj = Column(Float)
    level_2_r_tj = Column(Float)
    level_3_r_tj = Column(Float)
    level_1_pj = Column(Float)
    level_2_pj = Column(Float)
    level_3_pj = Column(Float)
    level_1_r_pj = Column(Float)
    level_2_r_pj = Column(Float)
    level_3_r_pj = Column(Float)
    electricitydemand_tj = Column(Float)
    fueldemand_tj = Column(Float)
    globalid = Column(String(50))
    # Point geometry with SRID 4326, indexed; built from ``lon``/``lat``
    # in seenergies_to_postgres
    geom = Column(Geometry("POINT", 4326), index=True)
    # Sector id that seenergies_to_postgres derives from ``subsector``
    wz = Column(Integer)
class SchmidtIndustrialSites(Base):
    """ORM mapping of the table ``demand.egon_schmidt_industrial_sites``.

    The table is filled by :func:`schmidt_to_postgres`.
    """

    __tablename__ = "egon_schmidt_industrial_sites"
    __table_args__ = {"schema": "demand"}
    id = Column(Integer, primary_key=True)
    application = Column(String(50))
    plant = Column(String(100))
    landkreis_number = Column(String(5))
    annual_tonnes = Column(Float)
    capacity_production = Column(String(10))
    lat = Column(Float)
    lon = Column(Float)
    # Point geometry with SRID 4326, indexed; built from ``lon``/``lat``
    # in schmidt_to_postgres
    geom = Column(Geometry("POINT", 4326), index=True)
    # Sector id that schmidt_to_postgres derives from ``application``
    wz = Column(Integer)
class IndustrialSites(Base):
    """ORM mapping of the table ``demand.egon_industrial_sites``.

    Rows are inserted by :func:`merge_inputs`; the column ``nuts3`` is
    filled afterwards by :func:`map_nuts3`.
    """

    __tablename__ = "egon_industrial_sites"
    __table_args__ = {"schema": "demand"}
    # Primary key taken from the sequence ``demand.industrial_sites_id_seq``,
    # which is configured both as column default and as server default.
    id = Column(
        Integer,
        Sequence("industrial_sites_id_seq", schema="demand"),
        server_default=Sequence(
            "industrial_sites_id_seq", schema="demand"
        ).next_value(),
        primary_key=True,
    )
    companyname = Column(String(100))
    address = Column(String(170))
    subsector = Column(String(100))
    wz = Column(Integer)
    nuts3 = Column(String(10))
    # Point geometry with SRID 4326, indexed
    geom = Column(Geometry("POINT", 4326), index=True)
def create_tables():
    """Create tables for industrial sites and distributed industrial demands

    Tables left over from a previous run are dropped first, together with
    the id sequence of the merged sites table, and are then created again
    from the ORM classes of this module.

    Returns
    -------
    None.

    """
    tables = MergeIndustrialSites.targets.tables

    # Create target schema
    db.execute_sql("CREATE SCHEMA IF NOT EXISTS demand;")

    # Drop the tables of a previous run
    for key in ("hotmaps", "seenergies", "schmidt", "sites"):
        db.execute_sql(f"""DROP TABLE IF EXISTS {tables[key]} CASCADE;""")

    # Drop sequence (legacy name derived from the table name)
    db.execute_sql(
        f"""DROP SEQUENCE IF EXISTS {tables['sites']}_id_seq CASCADE;"""
    )
    # The sequence declared on IndustrialSites.id is named
    # "industrial_sites_id_seq" (without the "egon_" prefix), so the
    # statement above never matched it. Drop it explicitly so that ids
    # start again from the beginning when the table is recreated. The name
    # must be kept in sync with the Sequence in IndustrialSites.
    db.execute_sql(
        "DROP SEQUENCE IF EXISTS demand.industrial_sites_id_seq CASCADE;"
    )

    engine = db.engine()
    for model in (
        HotmapsIndustrialSites,
        SeenergiesIndustrialSites,
        SchmidtIndustrialSites,
        IndustrialSites,
    ):
        model.__table__.create(bind=engine, checkfirst=True)
def download_hotmaps():
    """Download csv file on hotmaps' industrial sites.

    The file is fetched with ``curl`` from
    ``MergeIndustrialSites.sources.urls["hotmaps"]`` and written to
    ``MergeIndustrialSites.targets.files["hotmaps_download"]``. Nothing is
    downloaded if the target file already exists.
    """
    import shlex

    download_directory = "industrial_sites"
    # Create the folder, if it does not exist already
    os.makedirs(download_directory, exist_ok=True)

    target_file = Path(MergeIndustrialSites.targets.files["hotmaps_download"])
    url = MergeIndustrialSites.sources.urls["hotmaps"]

    if not os.path.isfile(target_file):
        # The command is interpreted by a shell, so both arguments are
        # quoted: the URL contains "?", which a shell treats as a glob
        # character, and neither value may be split or expanded.
        subprocess.run(
            f"curl {shlex.quote(url)} > {shlex.quote(str(target_file))}",
            shell=True,
        )
def download_seenergies():
    """Fetch the csv file on s-eenergies' industrial sites if it is missing.

    The source URL and the target path are read from the ``sources`` and
    ``targets`` attributes of ``MergeIndustrialSites``.
    """
    folder = Path("industrial_sites")

    # Make sure the download folder is there
    if not folder.exists():
        folder.mkdir()

    destination = Path(
        MergeIndustrialSites.targets.files["seenergies_download"]
    )
    source_url = MergeIndustrialSites.sources.urls["seenergies"]

    # An already downloaded file is kept as it is
    if destination.is_file():
        return

    urlretrieve(source_url, destination)
def hotmaps_to_postgres():
    """Import hotmaps data to postgres database

    Reads the downloaded csv file, keeps the German sites that have a
    geometry and lie within ``boundaries.vg250_sta_union``, removes
    duplicates, maps the subsector to a WZ id and appends the result to the
    hotmaps table, which is emptied beforehand.
    """
    input_file = Path(MergeIndustrialSites.targets.files["hotmaps_download"])
    engine = db.engine()

    # Empty the target table before new rows are appended
    db.execute_sql(
        f"DELETE FROM {MergeIndustrialSites.targets.tables['hotmaps']}"
    )

    # Read csv to dataframe
    df = pd.read_csv(input_file, delimiter=";")

    # Adjust column names
    df = df.rename(
        columns={
            "SiteID": "siteid",
            "CompanyName": "companyname",
            "SiteName": "sitename",
            "Address": "address",
            "CityCode": "citycode",
            "City": "city",
            "Country": "country",
            "geom": "geom",
            "Subsector": "subsector",
            "DataSource": "datasource",
            "Emissions_ETS_2014": "emissions_ets_2014",
            "Emissions_EPRTR_2014": "emissions_eprtr_2014",
            "Production": "production",
            "Fuel_Demand": "fuel_demand",
            "Excess_Heat_100-200C": "excess_heat_100_200C",
            "Excess_Heat_200-500C": "excess_heat_200_500C",
            "Excess_Heat_500C": "excess_heat_500C",
            "Excess_Heat_Total": "excess_heat_total",
        }
    )

    # Keep German sites only and remove entries without geometry. The copy
    # makes the column assignment below act on an independent dataframe.
    df = df[(df.country == "Germany") & df.geom.notnull()].copy()

    # From EWKT to WKT: keep the second ";"-separated part of each entry
    df["geom"] = df["geom"].map(lambda ewkt: ewkt.split(";")[1])

    # Create geometry with shapely
    geom = gpd.GeoSeries.from_wkt(df["geom"])

    # Import as geodataframe
    gdf = gpd.GeoDataFrame(
        df, geometry=gpd.points_from_xy(geom.x, geom.y), crs="EPSG:4326"
    )

    # Select boundaries
    boundaries = db.select_geodataframe(
        "SELECT * FROM boundaries.vg250_sta_union",
        geom_col="geometry",
        epsg=4326,
    )

    # Choose only sites inside Germany or testmode boundaries
    gdf = gpd.sjoin(gdf, boundaries).drop(
        ["id", "bez", "area_ha", "index_right", "geom"], axis=1
    )

    # Rename geometry column
    gdf = gdf.rename(columns={"geometry": "geom"}).set_geometry("geom")

    # Remove duplicates on columns 'subsector' and 'geom'
    gdf = gdf.drop_duplicates(subset=["subsector", "geom"])

    # Map subsector information and WZ definition for hotmaps data
    wz_definition = pd.Series(
        {
            "Paper and printing": 1718,
            "Refineries": 19,
            "Cement": 23,
            "Glass": 23,
            "Iron and steel": 24,
            "Non-ferrous metals": 24,
            "Non-metallic mineral products": 23,
            "Chemical industry": 20,
        }
    )

    # Add column for sector information (wz) mapped from the subsector
    gdf["wz"] = gdf["subsector"].map(wz_definition)

    # Write data to db. ``index`` is a boolean switch; the dataframe index
    # is not a column of the target table and must not be written.
    gdf.to_postgis(
        MergeIndustrialSites.targets.get_table_name("hotmaps"),
        engine,
        schema=MergeIndustrialSites.targets.get_table_schema("hotmaps"),
        if_exists="append",
        index=False,
    )
def seenergies_to_postgres():
    """Import seenergies data to postgres database

    Reads the downloaded csv file, builds point geometries from longitude
    and latitude, keeps the sites within ``boundaries.vg250_sta_union``,
    removes duplicates, maps the subsector to a WZ id and appends the result
    to the seenergies table, which is emptied beforehand.
    """
    input_file = Path(
        MergeIndustrialSites.targets.files["seenergies_download"]
    )
    engine = db.engine()

    # Empty the target table before new rows are appended
    db.execute_sql(
        f"DELETE FROM {MergeIndustrialSites.targets.tables['seenergies']}"
    )

    # Read csv to dataframe
    df = pd.read_csv(input_file, delimiter=",")
    df = df.drop(["X", "Y"], axis=1)

    # Adjust column names
    df = df.rename(
        columns={
            "SiteName": "sitename",
            "OBJECTID": "objectid",
            "SiteId": "siteid",
            "CompanyName": "companyname",
            "StreetNameAndNumber": "address",
            "Country": "country",
            "EU28": "eu28",
            "Eurostat_Name": "subsector",
            "Latitude": "lat",
            "Longitude": "lon",
            "NUTS1ID": "nuts1",
            "NUTS3ID": "nuts3",
            "Excess_Heat": "excess_heat",
            "level_1_Tj": "level_1_tj",
            "level_2_Tj": "level_2_tj",
            "level_3_Tj": "level_3_tj",
            "level_1_r_Tj": "level_1_r_tj",
            "level_2_r_Tj": "level_2_r_tj",
            "level_3_r_Tj": "level_3_r_tj",
            "level_1_Pj": "level_1_pj",
            "level_2_Pj": "level_2_pj",
            "level_3_Pj": "level_3_pj",
            "level_1_r_Pj": "level_1_r_pj",
            "level_2_r_Pj": "level_2_r_pj",
            "level_3_r_Pj": "level_3_r_pj",
            "ElectricityDemand_TJ_a": "electricitydemand_tj",
            "FuelDemand_TJ_a": "fueldemand_tj",
            "GlobalID": "globalid",
        }
    )

    # Build point geometries from longitude and latitude
    gdf = gpd.GeoDataFrame(
        df, geometry=gpd.points_from_xy(df.lon, df.lat), crs="EPSG:4326"
    )
    gdf = gdf.rename({"geometry": "geom"}, axis=1).set_geometry("geom")

    # Select boundaries
    boundaries = db.select_geodataframe(
        "SELECT * FROM boundaries.vg250_sta_union",
        geom_col="geometry",
        epsg=4326,
    )

    # Choose only sites inside Germany or testmode boundaries
    gdf = gpd.sjoin(gdf, boundaries).drop(
        ["id", "bez", "area_ha", "index_right"], axis=1
    )

    # Remove duplicates on columns 'subsector' and 'geom'
    gdf = gdf.drop_duplicates(subset=["subsector", "geom"])

    # Map subsector information and WZ definition for seenergies data
    wz_definition = pd.Series(
        {
            "Paper and printing": 1718,
            "Refineries": 19,
            "Cement": 23,
            "Glass": 23,
            "Iron and steel": 24,
            "Non-ferrous metals": 24,
            "Non-metallic minerals": 23,
            "Chemical industry": 20,
        }
    )

    # Add column for sector information (wz) mapped from the subsector
    gdf["wz"] = gdf["subsector"].map(wz_definition)

    # Write data to db. ``index`` is a boolean switch; the dataframe index
    # is not a column of the target table and must not be written.
    gdf.to_postgis(
        MergeIndustrialSites.targets.get_table_name("seenergies"),
        engine,
        schema=MergeIndustrialSites.targets.get_table_schema("seenergies"),
        if_exists="append",
        index=False,
    )
def schmidt_to_postgres():
    """Import data from Thesis by Danielle Schmidt to postgres database

    Reads the csv file from the data bundle, builds point geometries from
    longitude and latitude, keeps the sites within
    ``boundaries.vg250_sta_union``, maps the application to a WZ id and
    appends the result to the Schmidt table, which is emptied beforehand.
    """
    # The file is part of the data bundle and is not downloaded here
    input_file = (
        Path(".")
        / "data_bundle_egon_data"
        / MergeIndustrialSites.sources.files["schmidt"]
    )
    engine = db.engine()

    # Empty the target table before new rows are appended
    db.execute_sql(
        f"DELETE FROM {MergeIndustrialSites.targets.tables['schmidt']}"
    )

    # Read csv to dataframe
    df = pd.read_csv(input_file, delimiter=";")

    # Adjust column names
    df = df.rename(
        columns={
            "Application": "application",
            "Plant": "plant",
            "Landkreis Number": "landkreis_number",
            "Annual Tonnes": "annual_tonnes",
            "Capacity or Production": "capacity_production",
            "Latitude": "lat",
            "Longitude": "lon",
        }
    )

    # Build point geometries from longitude and latitude
    gdf = gpd.GeoDataFrame(
        df, geometry=gpd.points_from_xy(df.lon, df.lat), crs="EPSG:4326"
    )
    gdf = gdf.rename({"geometry": "geom"}, axis=1).set_geometry("geom")

    # Select boundaries
    boundaries = db.select_geodataframe(
        "SELECT * FROM boundaries.vg250_sta_union",
        geom_col="geometry",
        epsg=4326,
    )

    # Choose only sites inside Germany or testmode boundaries
    gdf = gpd.sjoin(gdf, boundaries).drop(
        ["id", "bez", "area_ha", "index_right"], axis=1
    )

    # Map application information and WZ definition for Schmidt's data
    wz_definition = pd.Series(
        {
            "Mechanical Pulp": 1718,
            "Packing Paper and Board": 1718,
            "Cement Mill": 23,
            "Technical/Special Paper and Board": 1718,
            "Graphic Paper": 1718,
            "Hygiene Paper": 1718,
            "Recycled Paper": 1718,
        }
    )

    # Add column for sector information (wz) mapped from the application
    gdf["wz"] = gdf["application"].map(wz_definition)

    # Write data to db. ``index`` is a boolean switch; the dataframe index
    # is not a column of the target table and must not be written.
    gdf.to_postgis(
        MergeIndustrialSites.targets.get_table_name("schmidt"),
        engine,
        schema=MergeIndustrialSites.targets.get_table_schema("schmidt"),
        if_exists="append",
        index=False,
    )
def download_import_industrial_sites():
    """
    Run table creation, the csv downloads and the database imports for the
    industrial sites in Germany, one step after the other

    Returns
    -------
    None.

    """
    steps = (
        create_tables,
        download_hotmaps,
        download_seenergies,
        hotmaps_to_postgres,
        seenergies_to_postgres,
        schmidt_to_postgres,
    )
    for step in steps:
        step()
def merge_inputs():
    """Fill the merged sites table from the three processed source tables

    Sites are inserted in the order Schmidt's thesis, s-EEnergies, hotmaps.
    Afterwards the geometry of merged sites is replaced by the geometry of
    a matching site from Schmidt's thesis.
    """
    sites = MergeIndustrialSites.targets.tables["sites"]
    schmidt = MergeIndustrialSites.sources.tables["schmidt_processed"]
    seenergies = MergeIndustrialSites.sources.tables["seenergies_processed"]
    hotmaps = MergeIndustrialSites.sources.tables["hotmaps_processed"]

    # Step 1: all sites from Schmidt's Master thesis that have a geometry
    db.execute_sql(
        f"INSERT INTO {sites} "
        "(companyname, subsector, wz, geom) "
        "SELECT h.plant, h.application, h.wz, h.geom "
        f"FROM {schmidt} h "
        "WHERE geom IS NOT NULL;"
    )

    # Step 2: German s-EEnergies sites whose lower-cased three-letter
    # company prefix is not returned by the sub-select.
    # NOTE(review): the sub-select declares the alias "s" again, so it does
    # not refer to the row of the outer query - confirm this is intended.
    db.execute_sql(
        f"INSERT INTO {sites} "
        "(companyname, address, subsector, wz, geom) "
        "SELECT s.companyname, s.address, s.subsector, s.wz, s.geom "
        f"FROM {seenergies} s "
        "WHERE s.country = 'DE' "
        "AND geom IS NOT NULL "
        "AND LOWER(SUBSTRING(s.companyname, 1, 3)) NOT IN "
        "(SELECT LOWER(SUBSTRING(h.companyname, 1, 3)) "
        f"FROM {sites} h, "
        f"{seenergies} s "
        "WHERE ST_DWithin (h.geom, s.geom, 0.01) "
        "AND (h.wz = s.wz) "
        "AND (LOWER (SUBSTRING(h.companyname, 1, 3)) = "
        "LOWER (SUBSTRING(s.companyname, 1, 3))));"
    )

    # Step 3: German hotmaps sites whose site id and geometry do not occur
    # among the German s-EEnergies sites and whose lower-cased three-letter
    # company prefix is not returned by the last sub-select.
    # NOTE(review): the last sub-select declares the alias "h" again, so it
    # does not refer to the row of the outer query - confirm this is
    # intended.
    db.execute_sql(
        f"INSERT INTO {sites} "
        "(companyname, address, subsector, wz, geom) "
        "SELECT h.companyname, h.address, h.subsector, h.wz, h.geom "
        f"FROM {hotmaps} h "
        "WHERE h.country = 'Germany' "
        "AND h.geom IS NOT NULL "
        "AND h.siteid NOT IN "
        "(SELECT a.siteid "
        f"FROM {seenergies} a "
        "WHERE a.country = 'DE' "
        "AND a.geom IS NOT NULL) "
        "AND h.geom NOT IN "
        "(SELECT a.geom "
        f"FROM {seenergies} a "
        "WHERE a.country = 'DE' "
        "AND a.geom IS NOT NULL) "
        "AND LOWER(SUBSTRING(h.companyname, 1, 3)) NOT IN "
        "(SELECT LOWER(SUBSTRING(s.companyname, 1, 3)) "
        f"FROM {sites} s, "
        f"{hotmaps} h "
        "WHERE ST_DWithin (s.geom, h.geom, 0.01) "
        "AND (h.wz = s.wz) "
        "AND (LOWER (SUBSTRING(h.companyname, 1, 3)) = "
        "LOWER (SUBSTRING(s.companyname, 1, 3))))"
    )

    # Step 4: take over the geometry from Schmidt's thesis for every merged
    # site that has a Schmidt site within a distance of 0.01, with the same
    # wz and the same lower-cased three-letter name prefix
    db.execute_sql(
        f"UPDATE {sites} s "
        "SET geom = g.geom "
        f"FROM {schmidt} g "
        "WHERE ST_DWithin (g.geom, s.geom, 0.01) "
        "AND (g.wz = s.wz) "
        "AND (LOWER (SUBSTRING(g.plant, 1, 3)) = "
        "LOWER (SUBSTRING(s.companyname, 1, 3)));"
    )
def map_nuts3():
    """
    Fill column 'nuts3' of the merged industrial sites with the nuts code of
    the vg250_krs geometry that contains the site
    """
    sites = MergeIndustrialSites.targets.tables["sites"]
    districts = MergeIndustrialSites.sources.tables["vg250_krs"]

    # The district geometries are transformed to EPSG 4326 for the test
    db.execute_sql(
        f"UPDATE {sites} s "
        "SET nuts3 = krs.nuts "
        f"FROM {districts} krs "
        "WHERE ST_WITHIN(s.geom, ST_TRANSFORM(krs.geometry,4326));"
    )
class MergeIndustrialSites(Dataset):
    """Dataset that imports and merges industrial sites.

    ``sources`` and ``targets`` hold the URLs, file paths and table names
    that the functions of this module read. The tasks registered in
    ``__init__`` are :func:`download_import_industrial_sites`,
    :func:`merge_inputs` and :func:`map_nuts3`.
    """

    sources = DatasetSources(
        # Download locations of the hotmaps and s-EEnergies csv files
        urls={
            "hotmaps": "https://gitlab.com/hotmaps/industrial_sites/industrial_sites_Industrial_Database/-/raw/388278c6df35889b1447a959fc3759e3d78bf659/data/Industrial_Database.csv?inline=false",
            "seenergies": "https://opendata.arcgis.com/datasets/5e36c0af918040ed936b4e4c101f611d_0.csv",
        },
        # Path that schmidt_to_postgres resolves below "data_bundle_egon_data"
        files={
            "schmidt": "industrial_sites/MA_Schmidt_Industriestandorte_georef.csv"
        },
        tables={
            # These tables are targets of earlier steps, but sources for the final merge
            "hotmaps_processed": "demand.egon_hotmaps_industrial_sites",
            "seenergies_processed": "demand.egon_seenergies_industrial_sites",
            "schmidt_processed": "demand.egon_schmidt_industrial_sites",
            "vg250_krs": "boundaries.vg250_krs",
        },
    )
    targets = DatasetTargets(
        # Local paths the downloaded csv files are written to
        files={
            "hotmaps_download": "industrial_sites/data_Industrial_Database.csv",
            "seenergies_download": "industrial_sites/D5_1_Industry_Dataset_With_Demand_Data.csv",
        },
        # Schema-qualified names of the tables written by this module
        tables={
            "hotmaps": "demand.egon_hotmaps_industrial_sites",
            "seenergies": "demand.egon_seenergies_industrial_sites",
            "schmidt": "demand.egon_schmidt_industrial_sites",
            "sites": "demand.egon_industrial_sites",
        },
    )

    def __init__(self, dependencies):
        # Register name, version and tasks with the Dataset base class.
        # NOTE(review): the tasks are presumably executed in the given
        # order - confirm against the Dataset implementation.
        super().__init__(
            name="Merge_industrial_sites",
            version="0.0.4",
            dependencies=dependencies,
            tasks=(download_import_industrial_sites, merge_inputs, map_nuts3),
        )