Source code for egon.data.datasets.society_prognosis

"""The central module containing all code dealing with processing and
forecasting Zensus data.
"""

from sqlalchemy import Column, Float, Integer
from sqlalchemy.ext.declarative import declarative_base
import numpy as np
import pandas as pd

from egon.data import db
from egon.data.datasets import Dataset, DatasetSources, DatasetTargets
import egon.data.config

# will be later imported from another file ###
Base = declarative_base()


# ############################################################
class SocietyPrognosis(Dataset):
    """Dataset definition for the population and household prognosis.

    Declares the input tables read and the output tables written by the
    tasks of this module and wires those tasks into the dataset.
    """

    name: str = "SocietyPrognosis"
    version: str = "0.0.5"

    # Tables read by the tasks of this dataset (logical key -> schema.table).
    sources = DatasetSources(
        tables=dict(
            map_zensus_vg250="boundaries.egon_map_zensus_vg250",
            zensus_population="society.destatis_zensus_population_per_ha",
            zensus_households=(
                "society.egon_destatis_zensus_household_per_ha"
            ),
            demandregio_population="society.egon_demandregio_population",
            demandregio_households="society.egon_demandregio_household",
        )
    )

    # Tables written by the tasks of this dataset (logical key -> schema.table).
    targets = DatasetTargets(
        tables=dict(
            population_prognosis="society.egon_population_prognosis",
            household_prognosis="society.egon_household_prognosis",
        )
    )

    def __init__(self, dependencies):
        """Register the dataset with its tasks.

        Parameters
        ----------
        dependencies
            Passed through unchanged to ``Dataset.__init__``.
        """
        # Table creation comes first; the two prognosis tasks are handed over
        # as a set (presumably meaning "no ordering between them" for the
        # Dataset machinery -- TODO confirm against Dataset).
        pipeline = (create_tables, {zensus_population, zensus_household})
        super().__init__(
            name=self.name,
            version=self.version,
            dependencies=dependencies,
            tasks=pipeline,
        )
class EgonPopulationPrognosis(Base):
    """ORM model of the table ``society.egon_population_prognosis``.

    One row holds the population value of one zensus cell for one year.
    """

    __tablename__ = "egon_population_prognosis"
    __table_args__ = dict(schema="society")

    # Composite primary key: (zensus cell id, year).
    zensus_population_id = Column(Integer, primary_key=True)
    year = Column(Integer, primary_key=True)

    # Population value stored for that cell and year.
    population = Column(Float)
class EgonHouseholdPrognosis(Base):
    """ORM model of the table ``society.egon_household_prognosis``.

    One row holds the number of households of one zensus cell for one year.
    """

    __tablename__ = "egon_household_prognosis"
    __table_args__ = dict(schema="society")

    # Composite primary key: (zensus cell id, year).
    zensus_population_id = Column(Integer, primary_key=True)
    year = Column(Integer, primary_key=True)

    # Number of households stored for that cell and year.
    households = Column(Float)
def create_tables():
    """Create the population and household prognosis tables if missing.

    Makes sure the schema of every target table of ``SocietyPrognosis``
    exists and then creates the tables behind ``EgonPopulationPrognosis``
    and ``EgonHouseholdPrognosis``. Tables that already exist are left
    untouched (``checkfirst=True``).
    """
    engine = db.engine()
    targets = SocietyPrognosis.targets

    # Derive the schema from *both* targets instead of only the population
    # one, so the function stays correct should the targets ever be moved
    # to different schemas. A set avoids issuing the same statement twice.
    schemas = {
        targets.get_table_schema(key)
        for key in ("population_prognosis", "household_prognosis")
    }
    for schema in sorted(schemas):
        db.execute_sql(f"CREATE SCHEMA IF NOT EXISTS {schema};")

    EgonPopulationPrognosis.__table__.create(bind=engine, checkfirst=True)
    EgonHouseholdPrognosis.__table__.create(bind=engine, checkfirst=True)
def zensus_population():
    """Bring population prognosis from DemandRegio to Zensus grid.

    Reads the populated zensus cells and their NUTS3 code, computes each
    cell's share of the population of its NUTS3 region and multiplies that
    share with the DemandRegio population of the region for the years 2035
    and 2050. The existing content of the population prognosis target table
    is deleted first; the new rows are then appended year by year.
    """
    local_engine = db.engine()
    sources = SocietyPrognosis.sources
    targets = SocietyPrognosis.targets

    # Input: Zensus2011 population data including the NUTS3-Code
    zensus_district = db.select_dataframe(
        f"""SELECT zensus_population_id, vg250_nuts3
        FROM {sources.tables['map_zensus_vg250']}
        WHERE zensus_population_id IN (
        SELECT id
        FROM {sources.tables['zensus_population']})""",
        index_col="zensus_population_id",
    )

    zensus = db.select_dataframe(
        f"""SELECT id, population
        FROM {sources.tables['zensus_population']}
        WHERE population > 0""",
        index_col="id",
    )

    # Index-aligned assignment: cells without a mapping entry get NaN.
    zensus["nuts3"] = zensus_district.vg250_nuts3

    # Rename index
    zensus.index = zensus.index.rename("zensus_population_id")

    # Replace population value of uninhabited cells for calculation.
    # The query above already filters on population > 0, so this only acts
    # as a safeguard.
    zensus.population = zensus.population.replace(-1, 0)

    # Calculate share of population per cell in nuts3-region.
    # The region totals are mapped back onto the cells by label, so the
    # result is aligned with ``zensus`` regardless of row order. Taking
    # ``.values`` of ``groupby(...).apply(...)`` is not safe here: since
    # pandas 2.0 that result is ordered by group, not by the original rows.
    nuts3_population = zensus.groupby("nuts3").population.sum()
    zensus["share"] = (
        zensus.population / zensus.nuts3.map(nuts3_population)
    ).fillna(0)

    db.execute_sql(f"DELETE FROM {targets.tables['population_prognosis']}")

    # Scale to prognosis values from demandregio
    for year in [2035, 2050]:
        # Input: dataset on population prognosis on district-level (NUTS3)
        prognosis = db.select_dataframe(
            f"""SELECT nuts3, population
            FROM {sources.tables['demandregio_population']}
            WHERE year={year}""",
            index_col="nuts3",
        )

        # Share times district prognosis; values of exactly 0 are written
        # as -1 (mirrors the -1 marker replaced above).
        df = pd.DataFrame(
            zensus["share"]
            .mul(prognosis.population[zensus["nuts3"]].values)
            .replace(0, -1)
        ).rename({"share": "population"}, axis=1)

        df["year"] = year

        # Insert to database
        df.to_sql(
            targets.get_table_name("population_prognosis"),
            schema=targets.get_table_schema("population_prognosis"),
            con=local_engine,
            if_exists="append",
        )
def household_prognosis_per_year(prognosis_nuts3, zensus, year):
    """Calculate household prognosis for a specific year.

    Distributes the households of each NUTS3 region over its zensus cells
    according to the cells' shares and rounds the result to whole numbers
    such that the regional sum reaches the regional total.

    Parameters
    ----------
    prognosis_nuts3 : pandas.DataFrame
        Indexed by nuts3 code, with a ``households`` column. Several rows
        per code are summed up.
    zensus : pandas.DataFrame
        Indexed by zensus cell, with the columns ``nuts3`` and ``share``.
    year : int
        Value written into the ``year`` column of the result.

    Returns
    -------
    pandas.DataFrame
        Indexed like ``zensus`` with the columns ``households`` and
        ``year``.
    """
    # Total number of households per district.
    district_totals = prognosis_nuts3.groupby(
        prognosis_nuts3.index
    ).households.sum()

    prognosis = pd.DataFrame(index=zensus.index)
    prognosis["nuts3"] = zensus.nuts3
    prognosis["quantity"] = zensus["share"].mul(
        district_totals[zensus["nuts3"]].values
    )
    # Integer part and the remainder that was cut off by the conversion.
    prognosis["rounded"] = prognosis["quantity"].astype(int)
    prognosis["rest"] = prognosis["quantity"] - prognosis["rounded"]

    # Set seed for reproducibility
    configured_seed = egon.data.config.settings()["egon-data"][
        "--random-seed"
    ]
    np.random.seed(seed=configured_seed)

    # Rounding process to meet exact values from demandregio on nuts3-level
    for name, group in prognosis.groupby(prognosis.nuts3):
        print(f"start prognosis nuts3 {name}")
        district_target = district_totals[name]

        # Hand out one household at a time to a cell with the currently
        # largest remainder; ties are resolved by a random draw.
        while district_target > group["rounded"].sum():
            remainders = group["rest"]
            candidates = remainders.index.values[
                remainders == max(remainders)
            ]
            chosen = np.random.choice(candidates)
            group.at[chosen, "rounded"] += 1
            group.at[chosen, "rest"] = 0

        print(f"finished prognosis nuts3 {name}")

        # Write the adjusted district rows back into the full table.
        prognosis[prognosis.index.isin(group.index)] = group

    result = prognosis.drop(["nuts3", "quantity", "rest"], axis=1).rename(
        {"rounded": "households"}, axis=1
    )
    result["year"] = year

    return result
def zensus_household():
    """Bring household prognosis from DemandRegio to Zensus grid.

    Sums the zensus household quantities per cell, computes each cell's
    share of the households of its NUTS3 region and passes these shares,
    together with the DemandRegio households of 2035 and 2050, to
    ``household_prognosis_per_year``. The existing content of the household
    prognosis target table is deleted first; the new rows are then appended
    year by year.
    """
    local_engine = db.engine()
    sources = SocietyPrognosis.sources
    targets = SocietyPrognosis.targets

    # Input: Zensus2011 household data including the NUTS3-Code
    district = db.select_dataframe(
        f"""SELECT zensus_population_id, vg250_nuts3
        FROM {sources.tables['map_zensus_vg250']}""",
        index_col="zensus_population_id",
    )

    zensus = db.select_dataframe(
        f"""SELECT zensus_population_id, quantity
        FROM {sources.tables['zensus_households']}""",
        index_col="zensus_population_id",
    )

    # Group all household types
    zensus = zensus.groupby(zensus.index).sum()

    zensus["nuts3"] = district.loc[zensus.index, "vg250_nuts3"]

    # Calculate share of households per nuts3 region in each zensus cell.
    # The region totals are mapped back onto the cells by label, so the
    # result is aligned with ``zensus`` regardless of row order. Taking
    # ``.values`` of ``groupby(...).apply(...)`` is not safe here: since
    # pandas 2.0 that result is ordered by group, while ``zensus`` is
    # ordered by cell id.
    nuts3_households = zensus.groupby("nuts3").quantity.sum()
    zensus["share"] = (
        zensus.quantity / zensus.nuts3.map(nuts3_households)
    ).fillna(0)

    db.execute_sql(f"DELETE FROM {targets.tables['household_prognosis']}")

    # Apply prognosis function
    for year in [2035, 2050]:
        print(f"start prognosis for year {year}")
        # Input: dataset on household prognosis on district-level (NUTS3)
        prognosis_nuts3 = db.select_dataframe(
            f"""SELECT nuts3, hh_size, households
            FROM {sources.tables['demandregio_households']}
            WHERE year={year}""",
            index_col="nuts3",
        )

        # Insert into database
        household_prognosis_per_year(prognosis_nuts3, zensus, year).to_sql(
            targets.get_table_name("household_prognosis"),
            schema=targets.get_table_schema("household_prognosis"),
            con=local_engine,
            if_exists="append",
        )
        print(f"finished prognosis for year {year}")