Jobs not executing via Airflow that runs celery with RabbitMQ


Problem Description

Below is the configuration I am using:

[core]
# The home folder for airflow, default is ~/airflow
airflow_home = /root/airflow

# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository
dags_folder = /root/airflow/dags

# The folder where airflow should store its log files. This location
base_log_folder = /root/airflow/logs

# An S3 location can be provided for log backups
# For S3, use the full URL to the base folder (starting with "s3://...")
s3_log_folder = None

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
#executor = SequentialExecutor
#executor = LocalExecutor
executor = CeleryExecutor

# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engines; more information
# on their website
#sql_alchemy_conn = sqlite:////home/centos/airflow/airflow.db
sql_alchemy_conn = mysql://username:password@XXX.XXX.XXX.XXX:3306/airflow_prod

[celery]
# This section only applies if you are using the CeleryExecutor in
# [core] section above


# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor

# The concurrency that will be used when starting workers with the
# "airflow worker" command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
celeryd_concurrency = 16

# When you start an airflow worker, airflow starts a tiny web server
# subprocess to serve the workers local log files to the airflow main
# web server, who then builds pages and sends them to users. This defines
# the port on which the logs are served. It needs to be unused, and open
# visible from the main web server to connect into the workers.
worker_log_server_port = 8793

# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
broker_url = pyamqp://guest:guest@XXX.XXX.XXX.XXX:5672/


# Another key Celery setting
celery_result_backend = db+mysql://username:password@XXX.XXX.XXX.XXX:3306/airflow_prod

# Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start
# it `airflow flower`. This defines the port that Celery Flower runs on
flower_port = 5556

# Default queue that tasks get assigned to and that workers listen on.
default_queue = default

But jobs don't run. The scheduler shows that it is checking the state, as below:

[2017-05-11 05:09:13,070] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2015-06-13 00:00:00: scheduled__2015-06-13T00:00:00, externally triggered: False>
[2017-05-11 05:09:13,072] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2015-06-14 00:00:00: scheduled__2015-06-14T00:00:00, externally triggered: False>
[2017-05-11 05:09:13,074] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2015-06-15 00:00:00: scheduled__2015-06-15T00:00:00, externally triggered: False>
[2017-05-11 05:09:13,076] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2015-06-16 00:00:00: scheduled__2015-06-16T00:00:00, externally triggered: False>
[2017-05-11 05:09:13,078] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2017-05-10 04:46:29: manual__2017-05-10T04:46:28.756946, externally triggered: True>
[2017-05-11 05:09:13,080] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2017-05-10 05:08:20: manual__2017-05-10T05:08:20.252573, externally triggered: True>

Airflow UI is up and running. Celery Flower doesn't show any workers. My jobs are not running.

Below is the sequence I follow to get started:

airflow scheduler

airflow webserver -p 8080

airflow worker

Am I missing something?

Recommended Answer

Without knowing what version of Airflow you are running and how you have configured your rabbitmq-server, it is somewhat difficult to answer your question with surety. However, I can offer some things for you to look into.

Here is the Celery documentation for specifying a broker URL. The broker URL in your airflow.cfg does not specify a virtual host, so per the documentation the default virtual host will be used. I did some digging but could not find what the default virtual host for pyamqp is, but this is worth looking into.
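
For reference, Celery broker URLs take the general form transport://userid:password@hostname:port/virtual_host, where the trailing path segment names the virtual host. As a rough sketch, an explicit-vhost broker URL in airflow.cfg could look like the following (the vhost name my_vhost is a placeholder, not a value from the question):

# Trailing "/my_vhost" selects the RabbitMQ virtual host explicitly
# instead of relying on whatever default the transport falls back to
broker_url = pyamqp://guest:guest@XXX.XXX.XXX.XXX:5672/my_vhost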

Alternatively, you could explicitly configure a virtual host using rabbitmqctl. Here is a post where someone lays out how to do this with Airflow. I have copied and pasted the relevant information below:

# Configure RabbitMQ: create user and grant privileges
rabbitmqctl add_user rabbitmq_user_name rabbitmq_password
rabbitmqctl add_vhost rabbitmq_virtual_host_name
rabbitmqctl set_user_tags rabbitmq_user_name rabbitmq_tag_name
rabbitmqctl set_permissions -p rabbitmq_virtual_host_name rabbitmq_user_name ".*" ".*" ".*"
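
Once the user and virtual host exist, the broker_url in airflow.cfg has to reference them so the workers connect through that vhost. A sketch using the placeholder names from the commands above:

# airflow.cfg, [celery] section -- placeholder credentials/vhost from above
broker_url = pyamqp://rabbitmq_user_name:rabbitmq_password@XXX.XXX.XXX.XXX:5672/rabbitmq_virtual_host_name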

Finally, you may be running into an issue with the Celery version you are using. At the time of posting, Celery 4.X.X does not play nicely with Airflow. Try uninstalling Celery and reinstalling a version that works.

pip uninstall celery
pip install celery==3.1.7
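
After reinstalling, it is worth confirming which Celery version the worker environment actually imports, and restarting the airflow scheduler and worker processes so they pick up the change. A quick check, assuming the worker runs in the same Python environment:

python -c "import celery; print(celery.__version__)"  # should print 3.1.7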
