"""
Module that contains the command line app.
Why does this file exist, and why not put this in __main__?
You might be tempted to import things from __main__ later, but that will
cause problems: the code will get executed twice:
- When you run `python -m egon.data`, Python will execute
``__main__.py`` as a script. That means there won't be any
``egon.data.__main__`` in ``sys.modules``.
- When you import __main__ it will get executed again (as a module) because
there's no ``egon.data.__main__`` in ``sys.modules``.
Also see (1) from http://click.pocoo.org/5/setuptools/#setuptools-integration
"""
import os
import socket
import subprocess
import sys
import time
from multiprocessing import Process
from pathlib import Path
import click
import yaml
from psycopg2 import OperationalError as PSPGOE
import egon.data
import egon.data.airflow
import egon.data.config as config
import importlib_resources as resources
from egon.data import logger
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError as SQLAOE
from sqlalchemy.orm import Session
@click.group(
    name="egon-data", context_settings={"help_option_names": ["-h", "--help"]}
)
@click.option(
    "--airflow-database-name",
    default="airflow",
    metavar="DB",
    help=("Specify the name of the airflow metadata database."),
    show_default=True,
)
@click.option(
    "--database-name",
    "--database",
    default="egon-data",
    metavar="DB",
    help=(
        "Specify the name of the local database. The database will be"
        " created if it doesn't already exist.\n\n\b"
        ' Note: "--database" is deprecated and will be removed in the'
        " future. Please use the longer but consistent"
        ' "--database-name".'
    ),
    show_default=True,
)
@click.option(
    "--database-user",
    default="egon",
    metavar="USERNAME",
    help=("Specify the user used to access the local database."),
    show_default=True,
)
@click.option(
    "--database-host",
    default="127.0.0.1",
    metavar="HOST",
    help=("Specify the host on which the local database is running."),
    show_default=True,
)
@click.option(
    "--database-port",
    default="59734",
    metavar="PORT",
    help=("Specify the port on which the local DBMS is listening."),
    show_default=True,
)
@click.option(
    "--database-password",
    default="data",
    metavar="PW",
    help=("Specify the password used to access the local database."),
    show_default=True,
)
@click.option(
    "--dataset-boundary",
    type=click.Choice(["Everything", "Schleswig-Holstein"]),
    default="Everything",
    help=(
        "Choose to limit the processed data to one of the available"
        " built-in boundaries."
    ),
    show_default=True,
)
@click.option(
    "--jobs",
    default=1,
    metavar="N",
    help=(
        "Spawn at maximum N tasks in parallel. Remember that in addition"
        " to that, there's always the scheduler and probably the server"
        " running."
    ),
    show_default=True,
)
@click.option(
    "--processes-per-task",
    default=1,
    metavar="N_PROCESS",
    help=(
        "Each task can use at maximum N_PROCESS parallel processes. Remember"
        " that in addition to that, tasks can run in parallel (see N) and"
        " there's always the scheduler and probably the server running."
    ),
    show_default=True,
)
@click.option(
    "--docker-container-name",
    default="egon-data-local-database-container",
    metavar="NAME",
    help=(
        "The name of the Docker container containing the local database."
        " You usually can stick to the default, unless you run into errors"
        " due to clashing names and don't want to delete or rename your old"
        " containers."
    ),
    show_default=True,
)
@click.option(
    "--compose-project-name",
    default="egon-data",
    metavar="PROJECT",
    help=(
        "The name of the Docker project."
        " Different compose_project_names are needed to run multiple instances"
        " of egon-data on the same machine."
    ),
    show_default=True,
)
@click.option(
    "--airflow-port",
    default=8080,
    metavar="AIRFLOW_PORT",
    help=("Specify the port on which airflow runs."),
    show_default=True,
)
@click.option(
    "--random-seed",
    default=42,
    metavar="RANDOM_SEED",
    help=(
        "Random seed used by some tasks in the pipeline to ensure"
        " deterministic behaviour. All published results in the eGon project"
        " will be created with the default value so keep it if you want to"
        " make sure to get the same results."
    ),
    show_default=True,
)
@click.version_option(version=egon.data.__version__)
@click.pass_context
def egon_data(context, **kwargs):
    """Run and control the eGo^n data processing pipeline.

    It is recommended to create a dedicated working directory in which to
    run `egon-data` because `egon-data` will use its working directory to
    store configuration files and other data generated during a workflow
    run. Go to a location where you want to store eGon-data project data
    and create a new directory via:

    `mkdir egon-data-production && cd egon-data-production`

    Of course you are free to choose a different directory name.

    It is also recommended to use separate directories for production and
    test mode. In test mode, you should also use a different database. This
    will be created and used by typing e.g.:

    `egon-data --database-name 'test-egon-data' serve`

    It is important that options (e.g. `--database-name`) are placed before
    commands (e.g. `serve`).

    For using a smaller dataset in the test mode, use the option
    `--dataset-boundary`. The complete command for starting Airflow in test
    mode with using a separate database is

    `egon-data --database-name 'test-egon-data' --dataset-boundary
    'Schleswig-Holstein' serve`

    Whenever `egon-data` is executed, it searches for the configuration file
    "egon-data.configuration.yaml" in CWD. If that file doesn't exist,
    `egon-data` will create one, containing the command line parameters
    supplied, as well as the defaults for those switches for which no value
    was supplied. Options given on the command line that differ from their
    defaults take precedence over the values in that file for the current
    run.

    This means, run the above command that specifies a custom database once.
    Afterwards, it's sufficient to execute `egon-data serve` in the same
    directory and the same configuration will be used. You can also edit the
    configuration file "egon-data.configuration.yaml" manually.

    Last but not least, if you're using the default behaviour of setting
    up the database in a Docker container, the working directory will
    also contain a directory called "docker", containing the database
    data as well as other volumes used by the "Docker"ed database.
    """

    # Adapted from the `merge_copy` implementation at:
    #
    # https://stackoverflow.com/questions/29847098/the-best-way-to-merge-multi-nested-dictionaries-in-python-2-7
    #
    def merge(d1, d2):
        """Return a recursive merge of the dicts `d1` and `d2`.

        Keys present in only one of the dicts are taken from that dict.
        For keys present in both, nested dicts are merged recursively and
        any other value is taken from `d2`. Neither argument is modified.
        """
        return {
            k: d1[k]
            if k in d1 and k not in d2
            else d2[k]
            if k not in d1 and k in d2
            else merge(d1[k], d2[k])
            if isinstance(d1[k], dict) and isinstance(d2[k], dict)
            else d2[k]
            for k in set(d1).union(d2)
        }

    def collect_flags(value, check=None):
        """Map each selected parameter's first flag to `value(parameter)`.

        Only parameters of this command group for which `check` returns a
        truthy value are included. Without an explicit `check`, `value`
        itself is used as the filter, which drops parameters whose value
        is falsy. The result is nested under the "egon-data" key, mirroring
        the layout of the configuration file.
        """
        check = value if check is None else check
        flags = {p.opts[0]: value(p) for p in egon_data.params if check(p)}
        return {"egon-data": flags}

    options = {
        "cli": collect_flags(
            lambda o: kwargs[o.name], lambda o: o.name in kwargs
        ),
        "defaults": collect_flags(lambda o: o.default),
    }

    combined = merge(options["defaults"], options["cli"])

    # Persist the options in the working directory's configuration file.
    # Values already stored there win over `combined`, so that manual edits
    # survive and only options missing from the file get added.
    if not config.paths()[0].exists():
        with open(config.paths()[0], "w") as f:
            f.write(yaml.safe_dump(combined))
    else:
        with open(config.paths()[0], "r") as f:
            # An empty file loads as `None`, which `merge` can't handle.
            stored = yaml.safe_load(f) or {}
        with open(config.paths()[0], "w") as f:
            f.write(yaml.safe_dump(merge(combined, stored)))

    # Alternatively:
    # `if config.paths(pid="*") != [config.paths(pid="current")]:`
    # or compare file contents.
    if len(config.paths(pid="*")) > 1:
        logger.error(
            "Found more than one configuration file belonging to a"
            " specific `egon-data` process. Unable to decide which one"
            " to use.\nExiting."
        )
        sys.exit(1)

    if len(config.paths(pid="*")) == 1:
        logger.info(
            "Ignoring supplied options. Found a configuration file"
            " belonging to a different `egon-data` process. Using that"
            " one."
        )
        with open(config.paths(pid="*")[0]) as f:
            options = yaml.load(f, Loader=yaml.SafeLoader)
    else:  # len(config.paths(pid="*")) == 0, so need to create one.
        with open(config.paths()[0]) as f:
            options["file"] = yaml.load(f, Loader=yaml.SafeLoader)
        # Command line values override the file flag by flag, but only
        # where they differ from the defaults, i.e. where they were
        # presumably supplied explicitly. (Comparing the whole "egon-data"
        # dicts instead would let a single explicit option replace every
        # value stored in the file.)
        defaults = options["defaults"]["egon-data"]
        overrides = {
            flag: value
            for flag, value in options["cli"]["egon-data"].items()
            if value != defaults.get(flag)
        }
        options = merge(options["file"] or {}, {"egon-data": overrides})
        with open(config.paths(pid="current")[0], "w") as f:
            f.write(yaml.safe_dump(options))

    def render(template, target, update=True, inserts=None, **more_inserts):
        """Render the packaged `template` into the file `target`.

        The template is read from the `egon.data.airflow` package and filled
        in via `str.format` using `inserts` and `more_inserts`, the latter
        winning on clashing keys. A missing `target` is always written; an
        existing one is only overwritten if `update` is true and its content
        differs from the rendered text.
        """
        os.makedirs(target.parent, exist_ok=True)
        rendered = resources.read_text(egon.data.airflow, template).format(
            **dict(inserts or {}, **more_inserts)
        )
        if not target.exists():
            with open(target, "w") as f:
                f.write(rendered)
        elif update:
            with open(target, "r") as f:
                old = f.read()
            if old != rendered:
                with open(target, "w") as f:
                    f.write(rendered)

    os.environ["AIRFLOW_HOME"] = str((Path(".") / "airflow").absolute())

    options = options["egon-data"]
    render(
        "airflow.cfg",
        Path(".") / "airflow" / "airflow.cfg",
        inserts=options,
        dags=str(resources.files(egon.data.airflow).absolute()),
    )
    render(
        "docker-compose.yml",
        Path(".") / "docker" / "docker-compose.yml",
        update=False,
        inserts=options,
        airflow=resources.files(egon.data.airflow),
        gid=os.getgid(),
        uid=os.getuid(),
    )
    (Path(".") / "docker" / "database-data").mkdir(parents=True, exist_ok=True)

    # Only bring up the Docker setup if nothing accepts connections on the
    # configured database host and port yet.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        code = s.connect_ex(
            (options["--database-host"], int(options["--database-port"]))
        )
    if code != 0:
        subprocess.run(
            [
                "docker-compose",
                "-p",
                options["--compose-project-name"],
                "up",
                "-d",
                "--build",
            ],
            cwd=str((Path(".") / "docker").absolute()),
        )
        time.sleep(1.5)  # Give the container time to boot.

    # TODO: Since "AIRFLOW_HOME" needs to be set before importing `conf`, the
    #       import can only be done inside this function, which is generally
    #       frowned upon, instead of at the module level. Maybe there's a
    #       better way to encapsulate this?
    from airflow.configuration import conf as airflow_cfg
    from airflow.models import Connection

    engine = create_engine(
        (
            "postgresql+psycopg2://{--database-user}:{--database-password}"
            "@{--database-host}:{--database-port}"
            "/{--airflow-database-name}"
        ).format(**options),
        echo=False,
    )
    # The database might still not be done booting. Poke it until it's up,
    # pausing between attempts instead of spinning on the CPU.
    while True:
        try:
            connection = engine.connect()
            break
        except (PSPGOE, SQLAOE):
            time.sleep(0.5)

    with connection.execution_options(
        isolation_level="AUTOCOMMIT"
    ) as connection:
        databases = [
            row[0]
            for row in connection.execute("SELECT datname FROM pg_database;")
        ]
        database_name = options["--database-name"]
        if database_name not in databases:
            # Embedded double quotes must be doubled inside a quoted
            # PostgreSQL identifier.
            quoted = database_name.replace('"', '""')
            connection.execute(f'CREATE DATABASE "{quoted}";')

    subprocess.run(["airflow", "db", "init"])

    # TODO: Constrain SQLAlchemy's lower version to 1.4 and use a `with` block
    #       like the one in the last commented line to avoid an explicit
    #       `commit`. This can then also be used to get rid of the
    #       `egon.data.db.session_scope` context manager and use the new
    #       built-in one instead. And we can migrate to the SQLA 2.0 query
    #       API.
    # with Session(engine) as airflow, airflow.begin():
    engine = create_engine(airflow_cfg.get("core", "SQL_ALCHEMY_CONN"))
    airflow = Session(engine)
    try:
        # Create or update the "egon_data" airflow connection so that it
        # points to the local database configured above.
        connection = (
            airflow.query(Connection)
            .filter_by(conn_id="egon_data")
            .one_or_none()
        )
        connection = connection if connection else Connection(conn_id="egon_data")
        connection.login = options["--database-user"]
        connection.password = options["--database-password"]
        connection.host = options["--database-host"]
        connection.port = options["--database-port"]
        connection.schema = options["--database-name"]
        airflow.add(connection)
        airflow.commit()
    finally:
        airflow.close()

    # TODO: This should probably rather be done during the database
    #       initialization workflow task.
    from egon.data.datasets import setup

    setup()
@egon_data.command(
    add_help_option=False,
    context_settings=dict(allow_extra_args=True, ignore_unknown_options=True),
)
@click.pass_context
def airflow(context):
    """Run the `airflow` executable with all remaining arguments.

    Every argument following `airflow`, including `-h`/`--help`, is passed
    on unchanged. The exit status of `airflow` becomes the exit status of
    `egon-data`, so scripts can detect failures.
    """
    result = subprocess.run(["airflow"] + context.args)
    # Without this, the command would always report success, no matter how
    # the wrapped `airflow` invocation ended.
    context.exit(result.returncode)
@egon_data.command(
    context_settings={
        "allow_extra_args": True,
        "help_option_names": ["-h", "--help"],
        "ignore_unknown_options": True,
    }
)
@click.pass_context
def serve(context):
    """Start the airflow webapp controlling the egon-data pipeline.

    Airflow needs, among other things, a metadata database and a running
    scheduler. This command acts as a shortcut, creating the database if it
    doesn't exist and starting the scheduler in the background before starting
    the webserver.

    Any OPTIONS other than `-h`/`--help` will be forwarded to
    `airflow webserver`, so you can for example specify an alternate port
    for the webapp to listen on via `egon-data serve -p PORT_NUMBER`.

    Find out more about the possible webapp options via:

    `egon-data airflow webserver --help`.
    """
    silenced = {"stdout": subprocess.DEVNULL, "stderr": subprocess.DEVNULL}
    scheduler_command = ["airflow", "scheduler"]

    # The scheduler runs in a separate process with its output discarded,
    # leaving the foreground to the webserver started afterwards.
    background = Process(
        target=subprocess.run, args=(scheduler_command,), kwargs=silenced
    )
    background.start()

    webserver_command = ["airflow", "webserver", *context.args]
    subprocess.run(webserver_command)
def main():
    """Entry point of the `egon-data` command line tool.

    Runs the `egon_data` command group on the process' command line
    arguments. However that run ends, the configuration file belonging to
    the current process is removed afterwards, if it exists.
    """
    try:
        egon_data.main(sys.argv[1:])
    finally:
        try:
            config.paths(pid="current")[0].unlink()
        except FileNotFoundError:
            # Nothing to clean up, e.g. because this process never got to
            # write its own configuration file.
            pass