"""The central module containing all code dealing with importing Zensus data."""
from pathlib import Path
import csv
import json
import os
import zipfile
from shapely.geometry import Point, shape
from shapely.prepared import prep
import pandas as pd
import requests
from egon.data import db, subprocess
from egon.data.config import settings
from egon.data.datasets import Dataset, DatasetSources, DatasetTargets
[docs]
class ZensusPopulation(Dataset):
    """Dataset importing the Zensus 2011 population per 100 m grid cell.

    Tasks, executed in this order:

    1. :func:`create_zensus_pop_table` -- recreate the empty target table.
    2. :func:`population_to_postgres` -- load the CSV contained in the zip
       archive from ``sources.files`` into
       ``targets.tables["zensus_population"]``.
    """

    # Local input archive, plus the state-boundary table that select_geom()
    # reads when the data is clipped to a dataset boundary.
    sources = DatasetSources(
        files={
            "zensus_population": (
                "data_bundle_egon_data/zensus_population/"
                "csv_Bevoelkerung_100m_Gitter.zip"
            )
        },
        tables={"boundaries_vg250_lan": "boundaries.vg250_lan"},
    )
    # Destination table for the per-cell population values.
    targets = DatasetTargets(
        tables={
            "zensus_population": "society.destatis_zensus_population_per_ha"
        },
    )

    def __init__(self, dependencies):
        pipeline = (create_zensus_pop_table, population_to_postgres)
        super().__init__(
            name="ZensusPopulation",
            version="0.0.4",
            dependencies=dependencies,
            tasks=pipeline,
        )
[docs]
class ZensusMiscellaneous(Dataset):
    """Dataset importing Zensus 2011 household, building and apartment data.

    The three themes share one key each across ``sources.urls``,
    ``targets.files`` and ``targets.tables``. The import reads the zip
    archives listed in ``targets.files``. ``sources.urls`` presumably
    records the upstream download locations of those archives; nothing in
    this module downloads them.

    Tasks, executed in this order:

    1. :func:`create_zensus_misc_tables` -- recreate the empty target tables.
    2. :func:`zensus_misc_to_postgres` -- load the CSVs, link them to the
       population grid and build the combined table.
    """

    sources = DatasetSources(
        urls={
            "zensus_households": (
                "https://www.zensus2011.de/SharedDocs/Downloads/DE/"
                "Pressemitteilung/DemografischeGrunddaten/"
                "csv_Haushalte_100m_Gitter.zip?__blob=publicationFile&v=2"
            ),
            "zensus_buildings": (
                "https://www.zensus2011.de/SharedDocs/Downloads/DE/"
                "Pressemitteilung/DemografischeGrunddaten/"
                "csv_Gebaeude_100m_Gitter.zip?__blob=publicationFile&v=2"
            ),
            "zensus_apartments": (
                "https://www.zensus2011.de/SharedDocs/Downloads/DE/"
                "Pressemitteilung/DemografischeGrunddaten/"
                "csv_Wohnungen_100m_Gitter.zip?__blob=publicationFile&v=5"
            ),
        }
    )
    # Local zip archives that are imported, and their destination tables.
    targets = DatasetTargets(
        files={
            "zensus_households": (
                "data_bundle_egon_data/zensus_population/"
                "csv_Haushalte_100m_Gitter.zip"
            ),
            "zensus_buildings": (
                "data_bundle_egon_data/zensus_population/"
                "csv_Gebaeude_100m_Gitter.zip"
            ),
            "zensus_apartments": (
                "data_bundle_egon_data/zensus_population/"
                "csv_Wohnungen_100m_Gitter.zip"
            ),
        },
        tables={
            "zensus_households": "society.egon_destatis_zensus_household_per_ha",
            "zensus_buildings": "society.egon_destatis_zensus_building_per_ha",
            "zensus_apartments": "society.egon_destatis_zensus_apartment_per_ha",
        },
    )

    def __init__(self, dependencies):
        pipeline = (create_zensus_misc_tables, zensus_misc_to_postgres)
        super().__init__(
            name="ZensusMiscellaneous",
            version="0.0.2",
            dependencies=dependencies,
            tasks=pipeline,
        )
[docs]
def create_zensus_pop_table():
    """(Re)create the empty Zensus population table.

    Ensures the target schema exists and drops any previous version of the
    table with ``CASCADE``. It then creates the table again with these
    columns:

    - the grid id;
    - the cell-centre coordinates ``x_mp`` / ``y_mp``;
    - the population;
    - two EPSG:3035 geometry columns.

    The geometries are filled in later by :func:`population_to_postgres`.
    """
    qualified = ZensusPopulation.targets.tables["zensus_population"]
    schema = ZensusPopulation.targets.get_table_schema("zensus_population")

    db.execute_sql(f"\nCREATE SCHEMA IF NOT EXISTS\n{schema};\n")
    db.execute_sql(f"DROP TABLE IF EXISTS {qualified} CASCADE;")

    unqualified = qualified.split(".")[1]
    column_block = "\n".join(
        (
            " (id SERIAL NOT NULL,",
            "grid_id character varying(254) NOT NULL,",
            "x_mp int,",
            "y_mp int,",
            "population smallint,",
            "geom_point geometry(Point,3035),",
            "geom geometry (Polygon, 3035),",
            f"CONSTRAINT {unqualified}_pkey",
            "PRIMARY KEY (id)",
            ");",
            "",  # keeps the trailing newline of the statement
        )
    )
    db.execute_sql(f"CREATE TABLE {qualified}{column_block}")
[docs]
def create_zensus_misc_tables():
    """(Re)create the empty household, building and apartment tables.

    For every target table of :class:`ZensusMiscellaneous`:

    - the schema is created if missing;
    - an existing table is dropped with ``CASCADE``;
    - a new table is created with the shared Zensus attribute layout.

    ``zensus_population_id`` stays empty here. It is filled in by
    :func:`zensus_misc_to_postgres`.
    """
    for qualified in ZensusMiscellaneous.targets.tables.values():
        name_parts = qualified.split(".")
        db.execute_sql(f"CREATE SCHEMA IF NOT EXISTS {name_parts[0]};")
        db.execute_sql(f"DROP TABLE IF EXISTS {qualified} CASCADE;")
        column_block = "\n".join(
            (
                " (id SERIAL,",
                "grid_id VARCHAR(50),",
                "grid_id_new VARCHAR (50),",
                "attribute VARCHAR(50),",
                "characteristics_code smallint,",
                "characteristics_text text,",
                "quantity smallint,",
                "quantity_q smallint,",
                "zensus_population_id int,",
                f"CONSTRAINT {name_parts[1]}_pkey PRIMARY KEY (id)",
                ");",
                "",  # keeps the trailing newline of the statement
            )
        )
        db.execute_sql(f"CREATE TABLE {qualified}{column_block}")
[docs]
def select_geom():
    """Return the union of the boundary geometries as a prepared shape.

    ``ogr2ogr`` is run against the project database. It unions all
    geometries of the ``boundaries_vg250_lan`` source table, treats them as
    EPSG:4326 (``-s_srs``) and reprojects them to EPSG:3035 (``-t_srs``),
    the projection of the Zensus CSV coordinates. The result is written as
    GeoJSON to ``/vsistdout/`` and read back from the process's stdout.

    Returns
    -------
    shapely.prepared.PreparedGeometry
        Prepared geometry, suited for many repeated ``intersects`` tests.
    """
    creds = db.credentials()
    connection = (
        f"PG:host={creds['HOST']}"
        f" user='{creds['POSTGRES_USER']}'"
        f" password='{creds['POSTGRES_PASSWORD']}'"
        f" port={creds['PORT']}"
        f" dbname='{creds['POSTGRES_DB']}'"
    )
    query = (
        "SELECT ST_Union(geometry) FROM "
        f"{ZensusPopulation.sources.tables['boundaries_vg250_lan']}"
    )
    command = [
        "ogr2ogr",
        "-s_srs", "epsg:4326",
        "-t_srs", "epsg:3035",
        "-f", "GeoJSON",
        "/vsistdout/",
        connection,
        "-sql", query,
    ]
    completed = subprocess.run(command, text=True)

    features = json.loads(completed.stdout)["features"]
    assert (
        len(features) == 1
    ), f"Found {len(features)} geometry features, expected exactly one."
    return prep(shape(features[0]["geometry"]))
[docs]
def filter_zensus_population(filename, dataset):
    """Copy the population CSV rows that lie inside the dataset boundary.

    A row is kept when the point (``x_mp_100m``, ``y_mp_100m``) intersects
    the geometry returned by :func:`select_geom`. The result is written next
    to the input file as ``<stem>.<dataset><suffix>``. If that file already
    exists, it is returned unchanged without querying the boundary again.

    Parameters
    ----------
    filename : str or pathlib.Path
        Path to the ``;``-delimited input CSV file. Must exist.
    dataset : str
        Name of the dataset boundary, e.g. ``'Schleswig-Holstein'``. It is
        only used to name the output file. In production mode
        (``dataset='Everything'``) the data of all of Germany is used and
        :func:`population_to_postgres` does not call this filter.

    Returns
    -------
    pathlib.Path
        Path to the filtered CSV file.
    """
    csv_file = Path(filename).resolve(strict=True)
    filtered_target = (
        csv_file.parent / f"{csv_file.stem}.{dataset}{csv_file.suffix}"
    )
    if filtered_target.exists():
        # Reuse the result of an earlier run; this also skips the ogr2ogr
        # database round trip in select_geom().
        return filtered_target

    boundary = select_geom()
    # Write to a sibling temp file and rename it only when complete. An
    # interrupted run then cannot leave a truncated file that later calls
    # would take for a finished result.
    partial = filtered_target.with_name(filtered_target.name + ".part")
    with open(csv_file, mode="r", newline="") as source, open(
        partial, mode="w", newline=""
    ) as destination:
        rows = csv.DictReader(source, delimiter=";")
        writer = csv.DictWriter(
            destination, delimiter=";", fieldnames=rows.fieldnames
        )
        writer.writeheader()
        writer.writerows(
            row
            for row in rows
            if boundary.intersects(
                Point(float(row["x_mp_100m"]), float(row["y_mp_100m"]))
            )
        )
    partial.replace(filtered_target)
    return filtered_target
[docs]
def filter_zensus_misc(filename, dataset):
    """Copy the CSV rows whose grid id exists in the population table.

    A row is kept when its ``Gitter_ID_100m`` value appears in the
    ``grid_id`` column of the Zensus population table. The result is
    written next to the input file as ``<stem>.<dataset><suffix>``. If that
    file already exists, it is returned unchanged without querying the
    database.

    Parameters
    ----------
    filename : str or pathlib.Path
        Path to the ``,``-delimited, ISO-8859-1 encoded input CSV file.
        Must exist.
    dataset : str
        Name of the dataset boundary, e.g. ``'Schleswig-Holstein'``. It is
        only used to name the output file. In production mode
        (``dataset='Everything'``) the data of all of Germany is used and
        :func:`zensus_misc_to_postgres` does not call this filter.

    Returns
    -------
    pathlib.Path
        Path to the filtered CSV file.
    """
    csv_file = Path(filename).resolve(strict=True)
    filtered_target = (
        csv_file.parent / f"{csv_file.stem}.{dataset}{csv_file.suffix}"
    )
    if filtered_target.exists():
        # Reuse the result of an earlier run without reading the grid ids.
        return filtered_target

    population_table = ZensusPopulation.targets.tables["zensus_population"]
    gitter_ids = set(
        pd.read_sql(
            f"SELECT grid_id FROM {population_table}",
            con=db.engine(),
        ).grid_id.values
    )

    # Write to a sibling temp file and rename it only when complete. An
    # interrupted run then cannot leave a truncated file that later calls
    # would take for a finished result.
    partial = filtered_target.with_name(filtered_target.name + ".part")
    with open(
        csv_file, mode="r", newline="", encoding="iso-8859-1"
    ) as source, open(
        partial, mode="w", newline="", encoding="iso-8859-1"
    ) as destination:
        rows = csv.DictReader(source, delimiter=",")
        writer = csv.DictWriter(
            destination, delimiter=",", fieldnames=rows.fieldnames
        )
        writer.writeheader()
        writer.writerows(
            row for row in rows if row["Gitter_ID_100m"] in gitter_ids
        )
    partial.replace(filtered_target)
    return filtered_target
[docs]
def population_to_postgres():
    r"""Import Zensus population data to postgres database.

    Each CSV member of the zip archive in
    ``ZensusPopulation.sources.files["zensus_population"]`` is extracted to
    the current working directory and loaded into the population table with
    ``psql``'s client-side ``\copy``. Unless the ``--dataset-boundary``
    setting is ``"Everything"``, the rows are first clipped by
    :func:`filter_zensus_population`.

    Afterwards two geometries are derived from ``x_mp`` / ``y_mp`` in
    EPSG:3035, and both get a GiST index:

    - ``geom_point``, the cell centre;
    - ``geom``, the 100 x 100 cell envelope.
    """
    input_file = Path(
        ZensusPopulation.sources.files["zensus_population"]
    ).resolve()
    dataset = settings()["egon-data"]["--dataset-boundary"]
    docker_db_config = db.credentials()
    population_table = ZensusPopulation.targets.tables["zensus_population"]

    psql_connection = [
        "-h", f"{docker_db_config['HOST']}",
        "-p", f"{docker_db_config['PORT']}",
        "-d", f"{docker_db_config['POSTGRES_DB']}",
        "-U", f"{docker_db_config['POSTGRES_USER']}",
    ]
    # Extend the inherited environment rather than replacing it. With only
    # PGPASSWORD set, PATH is lost and "psql" is searched for solely in
    # os.defpath (typically /bin:/usr/bin).
    psql_env = {
        **os.environ,
        "PGPASSWORD": docker_db_config["POSTGRES_PASSWORD"],
    }

    with zipfile.ZipFile(input_file) as zf:
        for filename in zf.namelist():
            if not filename.lower().endswith(".csv"):
                continue
            zf.extract(filename)
            try:
                if dataset == "Everything":
                    filename_insert = filename
                else:
                    filename_insert = filter_zensus_population(
                        filename, dataset
                    )
                copy_command = (
                    rf"\copy {population_table} "
                    r"(grid_id, x_mp, y_mp, population) "
                    rf"FROM '{filename_insert}' DELIMITER ';' CSV HEADER;"
                )
                subprocess.run(
                    ["psql", *psql_connection, "-c", copy_command],
                    env=psql_env,
                )
            finally:
                # Remove the extracted raw CSV even if filtering or the
                # import fails, so no stale copy is left in the working dir.
                os.remove(filename)

    db.execute_sql(
        f"UPDATE {population_table} zs"
        " SET geom_point=ST_SetSRID(ST_MakePoint(zs.x_mp, zs.y_mp), 3035);"
    )
    # Square cell polygon: +/- 50 around the cell centre.
    db.execute_sql(
        f"UPDATE {population_table} zs"
        " SET geom=ST_SetSRID(\n"
        "(ST_MakeEnvelope(zs.x_mp-50,zs.y_mp-50,zs.x_mp+50,zs.y_mp+50)),\n"
        "3035\n"
        ");\n"
    )

    table_name = population_table.split(".")[1]
    db.execute_sql(
        f"CREATE INDEX {table_name}_geom_idx ON"
        f" {population_table} USING gist (geom);"
    )
    db.execute_sql(
        f"CREATE INDEX"
        f" {table_name}_geom_point_idx"
        f" ON {population_table} USING gist (geom_point);"
    )
[docs]
def zensus_misc_to_postgres():
    r"""Import data on buildings, households and apartments to postgres db.

    For every key of ``ZensusMiscellaneous.targets.files``:

    - the CSV members of the zip archive are extracted to the current
      working directory;
    - unless the ``--dataset-boundary`` setting is ``"Everything"``, they
      are filtered by :func:`filter_zensus_misc`;
    - they are loaded into the matching target table with ``psql``'s
      ``\copy``;
    - the table is linked to the population grid by setting
      ``zensus_population_id`` via ``grid_id``, and a foreign key to the
      population table is added.

    Finally :func:`create_combined_zensus_table` builds the combined table,
    and :func:`adjust_zensus_misc` deletes rows of cells with negative
    population.
    """
    dataset = settings()["egon-data"]["--dataset-boundary"]
    docker_db_config = db.credentials()
    population_table = ZensusPopulation.targets.tables["zensus_population"]

    psql_connection = [
        "-h", f"{docker_db_config['HOST']}",
        "-p", f"{docker_db_config['PORT']}",
        "-d", f"{docker_db_config['POSTGRES_DB']}",
        "-U", f"{docker_db_config['POSTGRES_USER']}",
    ]
    # Extend the inherited environment rather than replacing it. With only
    # PGPASSWORD set, PATH is lost and "psql" is searched for solely in
    # os.defpath (typically /bin:/usr/bin).
    psql_env = {
        **os.environ,
        "PGPASSWORD": docker_db_config["POSTGRES_PASSWORD"],
    }

    for key, file_path in ZensusMiscellaneous.targets.files.items():
        target_table = ZensusMiscellaneous.targets.tables[key]
        zip_path = Path(file_path).resolve()
        with zipfile.ZipFile(zip_path) as zf:
            csvfiles = [
                n for n in zf.namelist() if n.lower().endswith(".csv")
            ]
            for filename in csvfiles:
                zf.extract(filename)
                try:
                    if dataset == "Everything":
                        filename_insert = filename
                    else:
                        filename_insert = filter_zensus_misc(
                            filename, dataset
                        )
                    copy_command = (
                        rf"\copy {target_table}"
                        "(grid_id,\n"
                        "grid_id_new,\n"
                        "attribute,\n"
                        "characteristics_code,\n"
                        "characteristics_text,\n"
                        "quantity,\n"
                        "quantity_q)\n"
                        f"FROM '{filename_insert}' DELIMITER ','\n"
                        "CSV HEADER\n"
                        "ENCODING 'iso-8859-1';"
                    )
                    subprocess.run(
                        ["psql", *psql_connection, "-c", copy_command],
                        env=psql_env,
                    )
                finally:
                    # Remove the extracted raw CSV even if filtering or the
                    # import fails.
                    os.remove(filename)

        # Link each row to its population cell through the shared grid id.
        db.execute_sql(
            f"UPDATE {target_table} as b\n"
            "SET zensus_population_id = zs.id\n"
            f"FROM {population_table} zs\n"
            "WHERE b.grid_id = zs.grid_id;"
        )
        db.execute_sql(
            f"ALTER TABLE {target_table}\n"
            "ADD CONSTRAINT\n"
            f"{ZensusMiscellaneous.targets.get_table_name(key)}_fkey\n"
            "FOREIGN KEY (zensus_population_id)\n"
            f"REFERENCES {population_table}(id);"
        )

    # combined table
    create_combined_zensus_table()
    adjust_zensus_misc()
[docs]
def create_combined_zensus_table():
    """Create combined table with buildings, apartments and population per cell.

    The work is done by the SQL script ``create_combined_zensus_table.sql``,
    which lives in the same directory as this module.

    As described by the original authors, the script behaves as follows:

    - Only apartment and building data with acceptable data quality
      (quantity_q<2) is used; all other data is dropped. For details on data
      quality see the Zensus docs:
      https://www.zensus2011.de/DE/Home/Aktuelles/DemografischeGrunddaten.html
    - If a cell has no building or apartment data, building_count or
      apartment_count, respectively, is NULL.
    """
    module_dir = os.path.dirname(__file__)
    db.execute_sql_script(
        os.path.join(module_dir, "create_combined_zensus_table.sql")
    )
[docs]
def adjust_zensus_misc():
    """Delete unpopulated cells in zensus-households, -buildings and -apartments.

    These tables can contain rows for cells without population:

    - egon_destatis_zensus_household_per_ha
    - egon_destatis_zensus_building_per_ha
    - egon_destatis_zensus_apartment_per_ha

    According to the original authors, this can be caused by population
    information withheld for privacy or by other special cases (e.g.
    holiday homes are listed as buildings but are not permanently
    populated). Later egon-data tasks only use populated cells.

    Every row linked (via ``zensus_population_id``) to a population cell
    with a negative ``population`` value is therefore deleted.

    Returns
    -------
    None.
    """
    population_table = ZensusPopulation.targets.tables["zensus_population"]
    for misc_table in ZensusMiscellaneous.targets.tables.values():
        statement = (
            f"\nDELETE FROM {misc_table} as b"
            "\nWHERE b.zensus_population_id IN ("
            f"\nSELECT id FROM {population_table}"
            "\nWHERE population < 0);"
        )
        db.execute_sql(statement)