Source code for egon.data.datasets.electricity_demand_timeseries.tools

from io import StringIO
import csv
import time

from shapely.geometry import Point
import geopandas as gpd
import numpy as np
import pandas as pd

from egon.data import db, logger

engine = db.engine()


def timeit(func):
    """
    Decorator for measuring a function's running time.
    """

    def measure_time(*args, **kw):
        start_time = time.time()
        result = func(*args, **kw)
        print(
            "Processing time of %s(): %.2f seconds."
            % (func.__qualname__, time.time() - start_time)
        )
        return result

    return measure_time

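# Illustrative usage sketch, not part of the original module: `_demo_timeit`
# is a hypothetical helper showing how the decorator prints the runtime of
# the wrapped function each time it is called.
@timeit
def _demo_timeit(n=1_000_000):
    # arbitrary numeric work whose duration gets reported by the decorator
    return np.random.rand(n).sum()
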
def random_point_in_square(geom, tol):
    """
    Generate one random point within each square of a series.

    Parameters
    ----------
    geom: gpd.GeoSeries
        Geometries of squares
    tol: float
        Tolerance to the square bounds

    Returns
    -------
    points: gpd.GeoSeries
        Series of random points
    """
    # cell bounds - half edge length, so buildings are not placed on the cell border
    xmin = geom.bounds["minx"] + tol / 2
    xmax = geom.bounds["maxx"] - tol / 2
    ymin = geom.bounds["miny"] + tol / 2
    ymax = geom.bounds["maxy"] - tol / 2

    # generate random coordinates within bounds - half edge length
    x = (xmax - xmin) * np.random.rand(geom.shape[0]) + xmin
    y = (ymax - ymin) * np.random.rand(geom.shape[0]) + ymin

    points = pd.Series([Point(coords) for coords in zip(x, y)])
    points = gpd.GeoSeries(points, crs="epsg:3035")

    return points

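# Illustrative usage sketch, not part of the original module: two 100 m x 100 m
# cells in EPSG:3035 (the CRS the helper assigns), with points kept tol / 2
# away from every cell border.
def _demo_random_point_in_square():
    from shapely.geometry import box

    squares = gpd.GeoSeries(
        [box(0, 0, 100, 100), box(100, 0, 200, 100)], crs="epsg:3035"
    )
    return random_point_in_square(geom=squares, tol=10)
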
# distribute amenities evenly
def specific_int_until_sum(s_sum, i_int):
    """
    Generate a list of values `i_int` summing to `s_sum`. If `s_sum` is not
    divisible by `i_int`, the remainder (< `i_int`) is prepended as the
    first element.
    """
    list_i = [] if s_sum % i_int == 0 else [s_sum % i_int]
    list_i += s_sum // i_int * [i_int]
    return list_i

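# Illustrative usage sketch, not part of the original module: splitting 11
# amenities into chunks of 3 puts the remainder of 2 in front.
def _demo_specific_int_until_sum():
    chunks = specific_int_until_sum(s_sum=11, i_int=3)
    assert chunks == [2, 3, 3, 3] and sum(chunks) == 11
    return chunks
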
def random_ints_until_sum(s_sum, m_max):
    """
    Generate a list of positive random integers <= `m_max` summing to `s_sum`.
    """
    list_r = []
    while s_sum > 0:
        r = np.random.randint(1, m_max + 1)
        # cap the draw so the running sum never exceeds s_sum
        r = min(r, s_sum)
        list_r.append(r)
        s_sum -= r
    return list_r

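# Illustrative usage sketch, not part of the original module: distributing 20
# amenities into random chunks of at most 5.
def _demo_random_ints_until_sum():
    chunks = random_ints_until_sum(s_sum=20, m_max=5)
    assert sum(chunks) == 20 and all(1 <= c <= 5 for c in chunks)
    return chunks
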
def write_table_to_postgis(gdf, table, engine=db.engine(), drop=True):
    """
    Helper function to append GeoDataFrame data to a table in the db.
    Only the columns defined in the table are passed; an error is raised
    if a column is missing. Dtypes of the columns are taken from the table
    definition.

    Parameters
    ----------
    gdf: gpd.GeoDataFrame
        Table of data
    table: declarative_base
        Metadata of db table to export to
    engine:
        Connection to database, default db.engine()
    drop: bool
        Drop table before appending
    """
    # Only take columns defined in the db table
    columns = [column.key for column in table.__table__.columns]
    gdf = gdf.loc[:, columns]

    if drop:
        table.__table__.drop(bind=engine, checkfirst=True)
        table.__table__.create(bind=engine)

    dtypes = {
        i: table.__table__.columns[i].type
        for i in table.__table__.columns.keys()
    }

    # Write new buildings incl. coordinates into db
    gdf.to_postgis(
        name=table.__tablename__,
        con=engine,
        if_exists="append",
        schema=table.__table_args__["schema"],
        dtype=dtypes,
    )

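# Illustrative usage sketch, not part of the original module: `ExampleBuildings`
# is a hypothetical declarative table showing the shape expected for the
# `table` argument; table name, schema and columns are assumptions.
def _demo_write_table_to_postgis(gdf_buildings):
    from geoalchemy2 import Geometry
    from sqlalchemy import Column, Integer
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()

    class ExampleBuildings(Base):
        __tablename__ = "example_buildings"
        __table_args__ = {"schema": "openstreetmap"}
        id = Column(Integer, primary_key=True)
        geom = Column(Geometry("POINT", srid=3035))

    # gdf_buildings must carry the columns defined above ("id", "geom")
    write_table_to_postgis(gdf=gdf_buildings, table=ExampleBuildings, drop=True)
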
def psql_insert_copy(table, conn, keys, data_iter):
    """
    Execute SQL statement inserting data

    Parameters
    ----------
    table : pandas.io.sql.SQLTable
    conn : sqlalchemy.engine.Engine or sqlalchemy.engine.Connection
    keys : list of str
        Column names
    data_iter :
        Iterable that iterates the values to be inserted
    """
    # gets a DBAPI connection that can provide a cursor
    dbapi_conn = conn.connection
    with dbapi_conn.cursor() as cur:
        s_buf = StringIO()
        writer = csv.writer(s_buf)
        writer.writerows(data_iter)
        s_buf.seek(0)

        columns = ", ".join('"{}"'.format(k) for k in keys)
        if table.schema:
            table_name = "{}.{}".format(table.schema, table.name)
        else:
            table_name = table.name

        sql = "COPY {} ({}) FROM STDIN WITH CSV".format(table_name, columns)
        cur.copy_expert(sql=sql, file=s_buf)

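# Illustrative usage sketch, not part of the original module: passing the
# callable as `method` to pandas' to_sql switches inserts to PostgreSQL COPY;
# the table and schema names below are assumptions.
def _demo_psql_insert_copy(df):
    df.to_sql(
        name="example_table",
        schema="demand",
        con=engine,
        if_exists="append",
        index=False,
        method=psql_insert_copy,
    )
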
def write_table_to_postgres(
    df, db_table, drop=False, index=False, if_exists="append"
):
    """
    Helper function to append DataFrame data to a table in the db. A fast
    string copy (COPY) is used. Only the columns defined in the db table are
    passed; if a column is missing in the dataframe, a warning is logged.
    Dtypes of the columns are taken from the table definition. The writing
    happens in a scoped session.

    Parameters
    ----------
    df: pd.DataFrame
        Table of data
    db_table: declarative_base
        Metadata of db table to export to
    drop: bool, default False
        Drop db-table before appending
    index: bool, default False
        Write DataFrame index as a column
    if_exists: {'fail', 'replace', 'append'}, default 'append'
        - fail: If table exists, raise a ValueError.
        - replace: If table exists, drop it, recreate it, and insert data.
        - append: If table exists, insert data. Create if it does not exist.
    """
    logger.info("Write table to db")

    # Only take columns and dtypes defined in the db table
    columns = {
        column.key: column.type for column in db_table.__table__.columns
    }

    # Take only the columns defined in the table class;
    # pandas raises an error if a column is missing
    try:
        df = df.loc[:, columns.keys()]
    except KeyError:
        same = df.columns.intersection(columns.keys())
        missing = same.symmetric_difference(df.columns)
        logger.warning(f"Columns: {missing.values} missing!")
        df = df.loc[:, same]

    if drop:
        db_table.__table__.drop(bind=engine, checkfirst=True)
        db_table.__table__.create(bind=engine)
    else:
        db_table.__table__.create(bind=engine, checkfirst=True)

    with db.session_scope() as session:
        df.to_sql(
            name=db_table.__table__.name,
            schema=db_table.__table__.schema,
            con=session.connection(),
            if_exists=if_exists,
            index=index,
            method=psql_insert_copy,
            dtype=columns,
        )

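# Illustrative usage sketch, not part of the original module: `ExampleDemand`
# is a hypothetical declarative table; columns defined there but missing in
# `df_demand` only trigger a warning, extra DataFrame columns are dropped.
def _demo_write_table_to_postgres(df_demand):
    from sqlalchemy import Column, Float, Integer
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()

    class ExampleDemand(Base):
        __tablename__ = "example_demand"
        __table_args__ = {"schema": "demand"}
        building_id = Column(Integer, primary_key=True)
        peak_load = Column(Float)

    write_table_to_postgres(df_demand, ExampleDemand, drop=True)
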