"""The central module containing all code dealing with processing and
forecasting Zensus data.
"""
from sqlalchemy import Column, Float, Integer
from sqlalchemy.ext.declarative import declarative_base
import numpy as np
import pandas as pd
from egon.data import db
from egon.data.datasets import Dataset
import egon.data.config
# Declarative base class for the ORM table definitions in this module.
# NOTE: it is planned to import this from another file later on.
Base = declarative_base()
class SocietyPrognosis(Dataset):
    """Dataset definition bundling the society prognosis tasks.

    The task tuple lists ``create_tables`` first, followed by a set holding
    ``zensus_population`` and ``zensus_household``.
    """

    def __init__(self, dependencies):
        # NOTE(review): the set presumably tells ``Dataset`` that these two
        # tasks have no ordering between each other -- confirm against the
        # ``Dataset`` implementation.
        prognosis_tasks = {zensus_population, zensus_household}

        super().__init__(
            name="SocietyPrognosis",
            version="0.0.1",
            dependencies=dependencies,
            tasks=(create_tables, prognosis_tasks),
        )
class EgonPopulationPrognosis(Base):
    """ORM mapping of the table ``society.egon_population_prognosis``.

    A row is identified by the pair (``zensus_population_id``, ``year``) and
    stores one ``population`` value.
    """

    __tablename__ = "egon_population_prognosis"
    __table_args__ = {"schema": "society"}

    # Composite primary key: cell id plus prognosis year
    zensus_population_id = Column(Integer, primary_key=True)
    year = Column(Integer, primary_key=True)

    # Population value stored for that cell and year
    population = Column(Float)
class EgonHouseholdPrognosis(Base):
    """ORM mapping of the table ``society.egon_household_prognosis``.

    A row is identified by the pair (``zensus_population_id``, ``year``) and
    stores one ``households`` value.
    """

    __tablename__ = "egon_household_prognosis"
    __table_args__ = {"schema": "society"}

    # Composite primary key: cell id plus prognosis year
    zensus_population_id = Column(Integer, primary_key=True)
    year = Column(Integer, primary_key=True)

    # Household value stored for that cell and year
    households = Column(Float)
def create_tables():
    """Create the ``society`` schema and the two prognosis tables.

    The schema is created only if it does not exist yet, and each table is
    created with ``checkfirst=True``, so already existing tables are left
    as they are.
    """
    engine = db.engine()
    db.execute_sql("CREATE SCHEMA IF NOT EXISTS society;")

    # Population table first, household table second
    for orm_class in (EgonPopulationPrognosis, EgonHouseholdPrognosis):
        orm_class.__table__.create(bind=engine, checkfirst=True)
def zensus_population():
    """Bring population prognosis from DemandRegio to Zensus grid.

    Steps:

    1. Read the mapping of zensus cells to NUTS3 codes and the populated
       zensus cells (``population > 0``).
    2. Compute, for every cell, its share of the population of its NUTS3
       region.
    3. Delete all rows from the target table.
    4. For the years 2035 and 2050, multiply each cell's share with the
       NUTS3 population prognosis of that year and append the result to the
       target table. Values equal to 0 are stored as -1.

    Table and schema names are read from the ``society_prognosis`` section
    of the dataset configuration. The function returns nothing; its result
    is the content written to the database.
    """
    cfg = egon.data.config.datasets()["society_prognosis"]
    local_engine = db.engine()

    # Input: Zensus2011 population data including the NUTS3-Code.
    # The SQL text is kept verbatim, hence the unindented continuation lines.
    zensus_district = db.select_dataframe(
        f"""SELECT zensus_population_id, vg250_nuts3
FROM {cfg['soucres']['map_zensus_vg250']['schema']}.
{cfg['soucres']['map_zensus_vg250']['table']}
WHERE zensus_population_id IN (
SELECT id
FROM {cfg['soucres']['zensus_population']['schema']}.
{cfg['soucres']['zensus_population']['table']})""",
        index_col="zensus_population_id",
    )

    zensus = db.select_dataframe(
        f"""SELECT id, population
FROM {cfg['soucres']['zensus_population']['schema']}.
{cfg['soucres']['zensus_population']['table']}
WHERE population > 0""",
        index_col="id",
    )

    # Aligned on the index values (cell ids)
    zensus["nuts3"] = zensus_district.vg250_nuts3

    # Rename index; to_sql below writes it as column "zensus_population_id"
    zensus.index = zensus.index.rename("zensus_population_id")

    # Replace population value of uninhabited cells for calculation
    # (the query above already filters on population > 0)
    zensus.population = zensus.population.replace(-1, 0)

    # Calculate share of population per cell in nuts3-region.
    # transform("sum") returns the regional total aligned to the index of
    # ``zensus``, so the share of every cell lands in its own row. The
    # former ``groupby(...).apply(...).values`` assigned by position, which
    # mixes up cells as soon as apply returns the rows ordered by group.
    nuts3_population = zensus.groupby("nuts3")["population"].transform("sum")
    zensus["share"] = zensus["population"].div(nuts3_population).fillna(0)

    db.execute_sql(
        f"""DELETE FROM {cfg['target']['population_prognosis']['schema']}.
{cfg['target']['population_prognosis']['table']}"""
    )

    # Scale to prognosis values from demandregio
    for year in [2035, 2050]:
        # Input: dataset on population prognosis on district-level (NUTS3)
        prognosis = db.select_dataframe(
            f"""SELECT nuts3, population
FROM {cfg['soucres']['demandregio_population']['schema']}.
{cfg['soucres']['demandregio_population']['table']}
WHERE year={year}""",
            index_col="nuts3",
        )

        # Look up the regional prognosis for every cell (positional, in the
        # row order of ``zensus``) and scale it with the cell's share.
        df = pd.DataFrame(
            zensus["share"]
            .mul(prognosis.population[zensus["nuts3"]].values)
            .replace(0, -1)
        ).rename({"share": "population"}, axis=1)

        df["year"] = year

        # Insert to database
        df.to_sql(
            cfg["target"]["population_prognosis"]["table"],
            schema=cfg["target"]["population_prognosis"]["schema"],
            con=local_engine,
            if_exists="append",
        )
def household_prognosis_per_year(prognosis_nuts3, zensus, year):
    """Calculate household prognosis for a specific year.

    Parameters
    ----------
    prognosis_nuts3 : pandas.DataFrame
        Prognosis data with a ``households`` column; rows sharing the same
        index value are summed up.
    zensus : pandas.DataFrame
        One row per cell with the columns ``nuts3`` and ``share``.
    year : int
        Value written into the ``year`` column of the result.

    Returns
    -------
    pandas.DataFrame
        Indexed like ``zensus`` with the columns ``households`` (integer
        number of households per cell) and ``year``.
    """
    totals_per_nuts3 = prognosis_nuts3.groupby(prognosis_nuts3.index)[
        "households"
    ].sum()

    # Unrounded households per cell: share times total of the cell's region
    expected = zensus["share"] * totals_per_nuts3[zensus["nuts3"]].values

    prognosis = pd.DataFrame(index=zensus.index)
    prognosis["nuts3"] = zensus["nuts3"]
    prognosis["quantity"] = expected
    prognosis["rounded"] = prognosis["quantity"].astype(int)
    prognosis["rest"] = prognosis["quantity"] - prognosis["rounded"]

    # Set seed for reproducibility
    random_seed = egon.data.config.settings()["egon-data"]["--random-seed"]
    np.random.seed(seed=random_seed)

    # Rounding process to meet exact values from demandregio on nuts3-level:
    # as long as a region is below its total, one household is added to a
    # randomly chosen cell among those with the largest remainder.
    for name, group in prognosis.groupby(prognosis["nuts3"]):
        print(f"start progosis nuts3 {name}")
        region_total = totals_per_nuts3[name]

        while region_total > group["rounded"].sum():
            remainders = group["rest"]
            candidates = remainders.index.values[
                remainders == max(remainders)
            ]
            chosen = np.random.choice(candidates)
            group.at[chosen, "rounded"] += 1
            # A cell that received a household drops out of the top spot
            group.at[chosen, "rest"] = 0

        print(f"finished progosis nuts3 {name}")
        # Write the adjusted rows of this region back into the full frame
        prognosis[prognosis.index.isin(group.index)] = group

    helper_columns = ["nuts3", "quantity", "rest"]
    prognosis = prognosis.drop(columns=helper_columns).rename(
        columns={"rounded": "households"}
    )
    prognosis["year"] = year

    return prognosis
def zensus_household():
    """Bring household prognosis from DemandRegio to Zensus grid.

    Steps:

    1. Read the mapping of zensus cells to NUTS3 codes and the household
       quantities per zensus cell, summing all rows of a cell.
    2. Compute, for every cell, its share of the households of its NUTS3
       region.
    3. Delete all rows from the target table.
    4. For the years 2035 and 2050, read the NUTS3 household prognosis,
       distribute it to the cells with ``household_prognosis_per_year`` and
       append the result to the target table.

    Table and schema names are read from the ``society_prognosis`` section
    of the dataset configuration. The function returns nothing; its result
    is the content written to the database.
    """
    cfg = egon.data.config.datasets()["society_prognosis"]
    local_engine = db.engine()

    # Input: Zensus2011 household data including the NUTS3-Code.
    # The SQL text is kept verbatim, hence the unindented continuation lines.
    district = db.select_dataframe(
        f"""SELECT zensus_population_id, vg250_nuts3
FROM {cfg['soucres']['map_zensus_vg250']['schema']}.
{cfg['soucres']['map_zensus_vg250']['table']}""",
        index_col="zensus_population_id",
    )

    zensus = db.select_dataframe(
        f"""SELECT zensus_population_id, quantity
FROM {cfg['soucres']['zensus_households']['schema']}.
{cfg['soucres']['zensus_households']['table']}""",
        index_col="zensus_population_id",
    )

    # Group all household types
    zensus = zensus.groupby(zensus.index).sum()

    zensus["nuts3"] = district.loc[zensus.index, "vg250_nuts3"]

    # Calculate share of households per nuts3 region in each zensus cell.
    # transform("sum") returns the regional total aligned to the index of
    # ``zensus``, so the share of every cell lands in its own row. The
    # former ``groupby(...).apply(...).values`` assigned by position, which
    # mixes up cells as soon as apply returns the rows ordered by group.
    nuts3_households = zensus.groupby("nuts3")["quantity"].transform("sum")
    zensus["share"] = zensus["quantity"].div(nuts3_households).fillna(0)

    db.execute_sql(
        f"""DELETE FROM {cfg['target']['household_prognosis']['schema']}.
{cfg['target']['household_prognosis']['table']}"""
    )

    # Apply prognosis function
    for year in [2035, 2050]:
        print(f"start prognosis for year {year}")
        # Input: dataset on household prognosis on district-level (NUTS3)
        prognosis_nuts3 = db.select_dataframe(
            f"""SELECT nuts3, hh_size, households
FROM {cfg['soucres']['demandregio_households']['schema']}.
{cfg['soucres']['demandregio_households']['table']}
WHERE year={year}""",
            index_col="nuts3",
        )

        # Insert into database
        household_prognosis_per_year(prognosis_nuts3, zensus, year).to_sql(
            cfg["target"]["household_prognosis"]["table"],
            schema=cfg["target"]["household_prognosis"]["schema"],
            con=local_engine,
            if_exists="append",
        )
        print(f"finished prognosis for year {year}")