Source code for egon.data.datasets.zensus

"""The central module containing all code dealing with importing Zensus data.
"""

from pathlib import Path
from urllib.request import urlretrieve
import csv
import json
import os
import zipfile

from shapely.geometry import Point, shape
from shapely.prepared import prep
import pandas as pd

from egon.data import db, subprocess
from egon.data.config import settings
from egon.data.datasets import Dataset
import egon.data.config


class ZensusPopulation(Dataset):
    def __init__(self, dependencies):
        super().__init__(
            name="ZensusPopulation",
            version="0.0.1",
            dependencies=dependencies,
            tasks=(
                download_zensus_pop,
                create_zensus_pop_table,
                population_to_postgres,
            ),
        )

class ZensusMiscellaneous(Dataset):
    def __init__(self, dependencies):
        super().__init__(
            name="ZensusMiscellaneous",
            version="0.0.1",
            dependencies=dependencies,
            tasks=(
                download_zensus_misc,
                create_zensus_misc_tables,
                zensus_misc_to_postgres,
            ),
        )

def download_and_check(url, target_file, max_iteration=5):
    """Download a file from a URL (http) if it doesn't exist and check it
    afterwards.

    If the zip file is bad, remove it and re-download it. Repeat until the
    file is fine or the maximum number of iterations is reached.
    """
    bad_file = True
    count = 0
    while bad_file:
        # Download the file if it doesn't exist yet
        if not os.path.isfile(target_file):
            # Only accept http(s) URLs
            if url.lower().startswith("http"):
                urlretrieve(url, target_file)
            else:
                raise ValueError("No http url")
        # Check the integrity of the zip file
        try:
            with zipfile.ZipFile(target_file):
                print(f"Zip file {target_file} is good.")
                bad_file = False
        except zipfile.BadZipFile as ex:
            os.remove(target_file)
            count += 1
            if count > max_iteration:
                raise StopIteration(
                    f"Max iteration of {max_iteration} is exceeded"
                ) from ex

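# A minimal usage sketch for ``download_and_check`` (not part of the original
# module): the URL and file name below are illustrative assumptions, not
# values taken from the project configuration.
def _example_download_and_check():
    download_directory = Path(".") / "zensus_population"
    download_directory.mkdir(exist_ok=True)
    download_and_check(
        "https://example.com/csv_Bevoelkerung_100m_Gitter.zip",  # hypothetical
        download_directory / "csv_Bevoelkerung_100m_Gitter.zip",
        max_iteration=5,
    )
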
def download_zensus_pop():
    """Download Zensus csv file on population per hectare grid cell."""
    data_config = egon.data.config.datasets()
    zensus_population_config = data_config["zensus_population"][
        "original_data"
    ]
    download_directory = Path(".") / "zensus_population"
    # Create the folder, if it does not exist already
    if not os.path.exists(download_directory):
        os.mkdir(download_directory)

    target_file = (
        download_directory / zensus_population_config["target"]["file"]
    )

    url = zensus_population_config["source"]["url"]
    download_and_check(url, target_file, max_iteration=5)

def download_zensus_misc():
    """Download Zensus csv files on data per hectare grid cell."""
    # Get data config
    data_config = egon.data.config.datasets()
    download_directory = Path(".") / "zensus_population"
    # Create the folder, if it does not exist already
    if not os.path.exists(download_directory):
        os.mkdir(download_directory)
    # Download the remaining Zensus data sets on households, buildings and
    # apartments
    zensus_config = data_config["zensus_misc"]["original_data"]
    zensus_misc_processed = data_config["zensus_misc"]["processed"]
    zensus_url = zensus_config["source"]["url"]
    zensus_files = zensus_misc_processed["file_table_map"].keys()
    url_path_map = list(zip(zensus_url, zensus_files))

    for url, path in url_path_map:
        target_file_misc = download_directory / path
        download_and_check(url, target_file_misc, max_iteration=5)

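# Sketch of the positional URL-to-file pairing built above (all values are
# hypothetical): the n-th source URL is downloaded to the n-th file named in
# ``file_table_map``, so both lists in the configuration must share one order.
def _example_url_path_map():
    zensus_url = [
        "https://example.com/csv_Haushalte_100m_Gitter.zip",
        "https://example.com/csv_Gebaeude_100m_Gitter.zip",
    ]
    zensus_files = [
        "csv_Haushalte_100m_Gitter.zip",
        "csv_Gebaeude_100m_Gitter.zip",
    ]
    # [(url, file), ...] pairs, as consumed by the download loop above
    return list(zip(zensus_url, zensus_files))
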
def create_zensus_pop_table():
    """Create the table for Zensus population data in the postgres database."""
    # Get information from the data configuration file
    data_config = egon.data.config.datasets()
    zensus_population_processed = data_config["zensus_population"][
        "processed"
    ]

    # Create the target schema
    db.execute_sql(
        f"CREATE SCHEMA IF NOT EXISTS"
        f" {zensus_population_processed['schema']};"
    )

    # Create the table for population data
    population_table = (
        f"{zensus_population_processed['schema']}"
        f".{zensus_population_processed['table']}"
    )
    db.execute_sql(f"DROP TABLE IF EXISTS {population_table} CASCADE;")
    db.execute_sql(
        f"CREATE TABLE {population_table}"
        f""" (id SERIAL NOT NULL,
        grid_id character varying(254) NOT NULL,
        x_mp int,
        y_mp int,
        population smallint,
        geom_point geometry(Point, 3035),
        geom geometry(Polygon, 3035),
        CONSTRAINT {zensus_population_processed['table']}_pkey
            PRIMARY KEY (id)
        );
        """
    )

def create_zensus_misc_tables():
    """Create tables for zensus data in the postgres database."""
    # Get information from the data configuration file
    data_config = egon.data.config.datasets()
    zensus_misc_processed = data_config["zensus_misc"]["processed"]

    # Create the target schema
    db.execute_sql(
        f"CREATE SCHEMA IF NOT EXISTS {zensus_misc_processed['schema']};"
    )

    # Create tables for household, apartment and building data
    for table in zensus_misc_processed["file_table_map"].values():
        misc_table = f"{zensus_misc_processed['schema']}.{table}"

        db.execute_sql(f"DROP TABLE IF EXISTS {misc_table} CASCADE;")
        db.execute_sql(
            f"CREATE TABLE {misc_table}"
            f""" (id SERIAL,
            grid_id VARCHAR(50),
            grid_id_new VARCHAR(50),
            attribute VARCHAR(50),
            characteristics_code smallint,
            characteristics_text text,
            quantity smallint,
            quantity_q smallint,
            zensus_population_id int,
            CONSTRAINT {table}_pkey PRIMARY KEY (id)
            );
            """
        )

def target(source, dataset):
    """Generate the target path corresponding to a source path.

    Parameters
    ----------
    source : Path
        Path to the original csv-file
    dataset : str
        Toggles between production (`dataset='Everything'`) and test mode
        (e.g. `dataset='Schleswig-Holstein'`). In production mode, data
        covering entire Germany is used. In test mode, a subset of this
        data is used for testing the workflow.

    Returns
    -------
    Path
        Path to the target csv-file
    """
    return Path(
        os.path.join(Path("."), "zensus_population", source.stem)
        + "."
        + dataset
        + source.suffix
    )

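# Example of the path mapping performed by ``target`` (a sketch; the file
# name is hypothetical). On a POSIX system the call below returns
# Path("zensus_population/Zensus_Bevoelkerung_100m-Gitter"
#      ".Schleswig-Holstein.csv").
def _example_target():
    return target(
        Path("Zensus_Bevoelkerung_100m-Gitter.csv"), "Schleswig-Holstein"
    )
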
def select_geom():
    """Select the union of the geometries of Schleswig-Holstein from the
    database, convert their projection to the one used in the CSV file,
    output the result to stdout as a GeoJSON string and read it into a
    prepared shape for filtering.
    """
    docker_db_config = db.credentials()
    geojson = subprocess.run(
        ["ogr2ogr"]
        + ["-s_srs", "epsg:4326"]
        + ["-t_srs", "epsg:3035"]
        + ["-f", "GeoJSON"]
        + ["/vsistdout/"]
        + [
            f"PG:host={docker_db_config['HOST']}"
            f" user='{docker_db_config['POSTGRES_USER']}'"
            f" password='{docker_db_config['POSTGRES_PASSWORD']}'"
            f" port={docker_db_config['PORT']}"
            f" dbname='{docker_db_config['POSTGRES_DB']}'"
        ]
        + ["-sql", "SELECT ST_Union(geometry) FROM boundaries.vg250_lan"],
        text=True,
    )
    features = json.loads(geojson.stdout)["features"]
    assert (
        len(features) == 1
    ), f"Found {len(features)} geometry features, expected exactly one."
    return prep(shape(features[0]["geometry"]))

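# Sketch (an assumption, not part of the original module) of how the prepared
# geometry returned by ``select_geom`` is used: ``prep`` speeds up the many
# repeated ``intersects`` tests performed while filtering CSV rows.
def _example_prepared_filter():
    region = select_geom()
    # Hypothetical cell centres in EPSG:3035 coordinates
    points = [Point(4352000.0, 3309000.0), Point(4400000.0, 2700000.0)]
    return [point for point in points if region.intersects(point)]
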
def filter_zensus_population(filename, dataset):
    """Filter lines in the source CSV file and copy the appropriate ones to
    the destination based on geometry.

    Parameters
    ----------
    filename : str
        Path to the input csv-file
    dataset : str
        Toggles between production (`dataset='Everything'`) and test mode
        (e.g. `dataset='Schleswig-Holstein'`). In production mode, data
        covering entire Germany is used. In test mode, a subset of this
        data is used for testing the workflow.

    Returns
    -------
    Path
        Path to the output csv-file
    """
    csv_file = Path(filename).resolve(strict=True)
    schleswig_holstein = select_geom()
    if not os.path.isfile(target(csv_file, dataset)):
        with open(csv_file, mode="r", newline="") as input_lines:
            rows = csv.DictReader(input_lines, delimiter=";")
            gitter_ids = set()
            with open(
                target(csv_file, dataset), mode="w", newline=""
            ) as destination:
                output = csv.DictWriter(
                    destination, delimiter=";", fieldnames=rows.fieldnames
                )
                output.writeheader()
                # Keep only rows whose cell centre lies in the selected
                # region. Since `set.add` returns None, the `... or row`
                # idiom records each kept "Gitter_ID_100m" as a side effect
                # while yielding the row itself.
                output.writerows(
                    gitter_ids.add(row["Gitter_ID_100m"]) or row
                    for row in rows
                    if schleswig_holstein.intersects(
                        Point(
                            float(row["x_mp_100m"]),
                            float(row["y_mp_100m"]),
                        )
                    )
                )
    return target(csv_file, dataset)

def filter_zensus_misc(filename, dataset):
    """Filter lines in the source CSV file and copy the appropriate ones to
    the destination based on grid_id values.

    Parameters
    ----------
    filename : str
        Path to the input csv-file
    dataset : str
        Toggles between production (`dataset='Everything'`) and test mode
        (e.g. `dataset='Schleswig-Holstein'`). In production mode, data
        covering entire Germany is used. In test mode, a subset of this
        data is used for testing the workflow.

    Returns
    -------
    Path
        Path to the output csv-file
    """
    csv_file = Path(filename).resolve(strict=True)
    gitter_ids = set(
        pd.read_sql(
            "SELECT grid_id from society.destatis_zensus_population_per_ha",
            con=db.engine(),
        ).grid_id.values
    )
    if not os.path.isfile(target(csv_file, dataset)):
        with open(
            csv_file, mode="r", newline="", encoding="iso-8859-1"
        ) as inputs:
            rows = csv.DictReader(inputs, delimiter=",")
            with open(
                target(csv_file, dataset),
                mode="w",
                newline="",
                encoding="iso-8859-1",
            ) as destination:
                output = csv.DictWriter(
                    destination, delimiter=",", fieldnames=rows.fieldnames
                )
                output.writeheader()
                # Keep only rows belonging to populated grid cells
                output.writerows(
                    row
                    for row in rows
                    if row["Gitter_ID_100m"] in gitter_ids
                )
    return target(csv_file, dataset)

def population_to_postgres():
    """Import Zensus population data to the postgres database."""
    # Get information from the data configuration file
    data_config = egon.data.config.datasets()
    zensus_population_orig = data_config["zensus_population"][
        "original_data"
    ]
    zensus_population_processed = data_config["zensus_population"][
        "processed"
    ]
    input_file = (
        Path(".")
        / "zensus_population"
        / zensus_population_orig["target"]["file"]
    )
    dataset = settings()["egon-data"]["--dataset-boundary"]

    # Read database configuration from docker-compose.yml
    docker_db_config = db.credentials()

    population_table = (
        f"{zensus_population_processed['schema']}"
        f".{zensus_population_processed['table']}"
    )

    with zipfile.ZipFile(input_file) as zf:
        for filename in zf.namelist():
            zf.extract(filename)
            if dataset == "Everything":
                filename_insert = filename
            else:
                filename_insert = filter_zensus_population(filename, dataset)
            host = ["-h", f"{docker_db_config['HOST']}"]
            port = ["-p", f"{docker_db_config['PORT']}"]
            pgdb = ["-d", f"{docker_db_config['POSTGRES_DB']}"]
            user = ["-U", f"{docker_db_config['POSTGRES_USER']}"]
            command = [
                "-c",
                rf"\copy {population_table} (grid_id, x_mp, y_mp, population)"
                rf" FROM '{filename_insert}' DELIMITER ';' CSV HEADER;",
            ]
            subprocess.run(
                ["psql"] + host + port + pgdb + user + command,
                env={"PGPASSWORD": docker_db_config["POSTGRES_PASSWORD"]},
            )
            os.remove(filename)

    db.execute_sql(
        f"UPDATE {population_table} zs"
        " SET geom_point=ST_SetSRID(ST_MakePoint(zs.x_mp, zs.y_mp), 3035);"
    )

    db.execute_sql(
        f"UPDATE {population_table} zs"
        """ SET geom=ST_SetSRID(
            (ST_MakeEnvelope(zs.x_mp-50,zs.y_mp-50,zs.x_mp+50,zs.y_mp+50)),
            3035
        );
        """
    )

    db.execute_sql(
        f"CREATE INDEX {zensus_population_processed['table']}_geom_idx ON"
        f" {population_table} USING gist (geom);"
    )

    db.execute_sql(
        f"CREATE INDEX"
        f" {zensus_population_processed['table']}_geom_point_idx"
        f" ON {population_table} USING gist (geom_point);"
    )

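# For reference, a sketch of the shell command that the ``subprocess.run``
# call above effectively executes (host, port, user, database and file name
# are placeholders; the table name matches the default configuration used
# elsewhere in this module):
#
#   PGPASSWORD=... psql -h localhost -p 5432 -d egon-data -U egon-data \
#       -c "\copy society.destatis_zensus_population_per_ha \
#           (grid_id, x_mp, y_mp, population) \
#           FROM 'some_population_file.csv' DELIMITER ';' CSV HEADER;"
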
def zensus_misc_to_postgres():
    """Import data on buildings, households and apartments to postgres db."""
    # Get information from the data configuration file
    data_config = egon.data.config.datasets()
    zensus_misc_processed = data_config["zensus_misc"]["processed"]
    zensus_population_processed = data_config["zensus_population"][
        "processed"
    ]
    file_path = Path(".") / "zensus_population"
    dataset = settings()["egon-data"]["--dataset-boundary"]

    population_table = (
        f"{zensus_population_processed['schema']}"
        f".{zensus_population_processed['table']}"
    )

    # Read database configuration from docker-compose.yml
    docker_db_config = db.credentials()

    for input_file, table in zensus_misc_processed["file_table_map"].items():
        with zipfile.ZipFile(file_path / input_file) as zf:
            csvfiles = [n for n in zf.namelist() if n.lower()[-3:] == "csv"]
            for filename in csvfiles:
                zf.extract(filename)
                if dataset == "Everything":
                    filename_insert = filename
                else:
                    filename_insert = filter_zensus_misc(filename, dataset)
                host = ["-h", f"{docker_db_config['HOST']}"]
                port = ["-p", f"{docker_db_config['PORT']}"]
                pgdb = ["-d", f"{docker_db_config['POSTGRES_DB']}"]
                user = ["-U", f"{docker_db_config['POSTGRES_USER']}"]
                command = [
                    "-c",
                    rf"\copy {zensus_population_processed['schema']}.{table}"
                    f"""(grid_id, grid_id_new, attribute,
                    characteristics_code, characteristics_text, quantity,
                    quantity_q)
                    FROM '{filename_insert}' DELIMITER ','
                    CSV HEADER
                    ENCODING 'iso-8859-1';""",
                ]
                subprocess.run(
                    ["psql"] + host + port + pgdb + user + command,
                    env={"PGPASSWORD": docker_db_config["POSTGRES_PASSWORD"]},
                )
                os.remove(filename)

        db.execute_sql(
            f"""UPDATE {zensus_population_processed['schema']}.{table} as b
            SET zensus_population_id = zs.id
            FROM {population_table} zs
            WHERE b.grid_id = zs.grid_id;"""
        )

        db.execute_sql(
            f"""ALTER TABLE {zensus_population_processed['schema']}.{table}
            ADD CONSTRAINT {table}_fkey
            FOREIGN KEY (zensus_population_id)
            REFERENCES {population_table}(id);"""
        )

    # Create combined table
    create_combined_zensus_table()

    # Delete entries for unpopulated cells
    adjust_zensus_misc()

def create_combined_zensus_table():
    """Create a combined table with buildings, apartments and population per
    cell.

    Only apartment and building data with acceptable data quality
    (quantity_q < 2) is used; all other data is dropped. For more details on
    data quality, see the Zensus docs:
    https://www.zensus2011.de/DE/Home/Aktuelles/DemografischeGrunddaten.html

    If there's no data on buildings or apartments for a certain cell, the
    value for building_count resp. apartment_count contains NULL.
    """
    sql_script = os.path.join(
        os.path.dirname(__file__), "create_combined_zensus_table.sql"
    )
    db.execute_sql_script(sql_script)

def adjust_zensus_misc():
    """Delete unpopulated cells in zensus-households, -buildings and
    -apartments.

    Some unpopulated zensus cells are listed in:

    - egon_destatis_zensus_household_per_ha
    - egon_destatis_zensus_building_per_ha
    - egon_destatis_zensus_apartment_per_ha

    This can be caused by missing population information due to privacy or
    other special cases (e.g. holiday homes are listed as buildings but are
    not permanently populated). In the following tasks of egon-data, only
    data of populated cells is used.

    Returns
    -------
    None
    """
    # Get information from the data configuration file
    data_config = egon.data.config.datasets()
    zensus_population_processed = data_config["zensus_population"][
        "processed"
    ]
    zensus_misc_processed = data_config["zensus_misc"]["processed"]

    population_table = (
        f"{zensus_population_processed['schema']}"
        f".{zensus_population_processed['table']}"
    )

    for input_file, table in zensus_misc_processed["file_table_map"].items():
        db.execute_sql(
            f"""
            DELETE FROM {zensus_population_processed['schema']}.{table} as b
            WHERE b.zensus_population_id IN (
                SELECT id FROM {population_table}
                WHERE population < 0);"""
        )