How to run airflow with CeleryExecutor on a custom docker image

Question

I am adding airflow to a web application that manually adds a directory containing business logic to the PYTHON_PATH env var, and performs additional system-level setup that I want to be consistent across all servers in my cluster. I've been successfully running celery for this application with RMQ as the broker and redis as the task results backend for a while, and have prior experience running Airflow with LocalExecutor.

Instead of using Puckel's image, I have an entry point for a base backend image that runs a different service based on the SERVICE env var. That looks like this:

if [ $SERVICE == "api" ]; then
    # upgrade to the data model
    flask db upgrade

    # start the web application
    python wsgi.py
fi

if [ $SERVICE == "worker" ]; then
    celery -A tasks.celery.celery worker --loglevel=info --uid=nobody
fi

if [ $SERVICE == "scheduler" ]; then
    celery -A tasks.celery.celery beat --loglevel=info
fi

if [ $SERVICE == "airflow" ]; then
    airflow initdb
    airflow scheduler
    airflow webserver

I have an .env file that I build the containers with, which defines my airflow parameters:

AIRFLOW_HOME=/home/backend/airflow
AIRFLOW__CORE__LOAD_EXAMPLES=False
AIRFLOW__CORE__EXECUTOR=CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN=mysql+pymysql://${MYSQL_USER}:${MYSQL_ROOT_PASSWORD}@${MYSQL_HOST}:${MYSQL_PORT}/airflow?charset=utf8mb4
AIRFLOW__CELERY__BROKER_URL=amqp://${RABBITMQ_DEFAULT_USER}:${RABBITMQ_DEFAULT_PASS}@${RABBITMQ_HOST}:5672
AIRFLOW__CELERY__RESULT_BACKEND=redis://${REDIS_HOST}
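
For reference, Airflow maps environment variables of the form AIRFLOW__{SECTION}__{KEY} onto the corresponding airflow.cfg settings, so the executor line above is equivalent to this airflow.cfg entry:

[core]
executor = CeleryExecutor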

With how my entrypoint is set up currently, it doesn't make it to the webserver. Instead, it runs the scheduler in the foreground without ever invoking the web server. I can change this to

airflow initdb
airflow scheduler -D
airflow webserver

Now the webserver runs, but it isn't aware of the scheduler, which is now running as a daemon.

Airflow does, however, know that I'm using a CeleryExecutor and looks for the dags in the right place:

airflow      | [2020-07-29 21:48:35,006] {default_celery.py:88} WARNING - You have configured a result_backend of redis://redis, it is highly recommended to use an alternative result_backend (i.e. a database).
airflow      | [2020-07-29 21:48:35,010] {__init__.py:50} INFO - Using executor CeleryExecutor
airflow      | [2020-07-29 21:48:35,010] {dagbag.py:396} INFO - Filling up the DagBag from /home/backend/airflow/dags
airflow      | [2020-07-29 21:48:35,113] {default_celery.py:88} WARNING - You have configured a result_backend of redis://redis, it is highly recommended to use an alternative result_backend (i.e. a database).

I can solve this by going inside the container and manually firing up the scheduler.

The trick seems to be running both processes in the foreground within the container, but I'm stuck on how to do that inside the entrypoint. I've checked out Puckel's entrypoint code, but it's not obvious to me what he's doing. I'm sure that with just a slight tweak this will be off to the races... Thanks in advance for the help. Also, if there's any major anti-pattern that I'm at risk of running into here, I'd love to get the feedback so that I can implement airflow properly. This is my first time implementing CeleryExecutor, and there's a decent amount involved.
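
For what it's worth, one common container pattern (sketched here under the same entrypoint structure, not taken from Puckel's image) is to background the scheduler and exec the webserver so that it becomes the container's main process:

if [ "$SERVICE" == "airflow" ]; then
    airflow initdb
    # run the scheduler in the background; it stops when the container stops
    airflow scheduler &
    # exec replaces the shell, so the webserver receives the container's stop signals
    exec airflow webserver
fi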

Answer

Try using nohup: https://en.wikipedia.org/wiki/Nohup

nohup airflow scheduler > scheduler.log &

In your case, you would update your entrypoint as follows:

if [ $SERVICE == "airflow" ]; then
    airflow initdb
    nohup airflow scheduler > scheduler.log &
    nohup airflow webserver
fi
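
Note that only the scheduler line ends in &, so only it is backgrounded; the final nohup airflow webserver runs in the foreground and keeps the container alive as its main process.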
