Jobs not executing via Airflow that runs celery with RabbitMQ

Problem description

Below is the config I'm using:

[core]
# The home folder for airflow, default is ~/airflow
airflow_home = /root/airflow

# The folder where your airflow pipelines live, most likely a
# subfolder in a code repository
dags_folder = /root/airflow/dags

# The folder where airflow should store its log files. This location
base_log_folder = /root/airflow/logs

# An S3 location can be provided for log backups
# For S3, use the full URL to the base folder (starting with "s3://...")
s3_log_folder = None

# The executor class that airflow should use. Choices include
# SequentialExecutor, LocalExecutor, CeleryExecutor
#executor = SequentialExecutor
#executor = LocalExecutor
executor = CeleryExecutor

# The SqlAlchemy connection string to the metadata database.
# SqlAlchemy supports many different database engines; more information
# on their website
#sql_alchemy_conn = sqlite:////home/centos/airflow/airflow.db
sql_alchemy_conn = mysql://username:password@XXX.XXX.XXX.XXX:3306/airflow_prod

[celery]
# This section only applies if you are using the CeleryExecutor in
# [core] section above


# The app name that will be used by celery
celery_app_name = airflow.executors.celery_executor

# The concurrency that will be used when starting workers with the
# "airflow worker" command. This defines the number of task instances that
# a worker will take, so size up your workers based on the resources on
# your worker box and the nature of your tasks
celeryd_concurrency = 16

# When you start an airflow worker, airflow starts a tiny web server
# subprocess to serve the workers local log files to the airflow main
# web server, who then builds pages and sends them to users. This defines
# the port on which the logs are served. It needs to be unused, and open
# visible from the main web server to connect into the workers.
worker_log_server_port = 8793

# The Celery broker URL. Celery supports RabbitMQ, Redis and experimentally
# a sqlalchemy database. Refer to the Celery documentation for more
# information.
broker_url = pyamqp://guest:guest@XXX.XXX.XXX.XXX:5672/


# Another key Celery setting
celery_result_backend = db+mysql://username:password@XXX.XXX.XXX.XXX:3306/airflow_prod

# Celery Flower is a sweet UI for Celery. Airflow has a shortcut to start
# it `airflow flower`. This defines the port that Celery Flower runs on
flower_port = 5556

# Default queue that tasks get assigned to and that worker listen on.
default_queue = default

But jobs don't run. The scheduler shows that it is checking the state, as below:

[2017-05-11 05:09:13,070] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2015-06-13 00:00:00: scheduled__2015-06-13T00:00:00, externally triggered: False>
[2017-05-11 05:09:13,072] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2015-06-14 00:00:00: scheduled__2015-06-14T00:00:00, externally triggered: False>
[2017-05-11 05:09:13,074] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2015-06-15 00:00:00: scheduled__2015-06-15T00:00:00, externally triggered: False>
[2017-05-11 05:09:13,076] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2015-06-16 00:00:00: scheduled__2015-06-16T00:00:00, externally triggered: False>
[2017-05-11 05:09:13,078] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2017-05-10 04:46:29: manual__2017-05-10T04:46:28.756946, externally triggered: True>
[2017-05-11 05:09:13,080] {models.py:2274} INFO - Checking state for <DagRun tutorial @ 2017-05-10 05:08:20: manual__2017-05-10T05:08:20.252573, externally triggered: True>

The Airflow UI is up and running. Celery Flower doesn't show any workers. My jobs are not running.

Below is the sequence I follow to get started:

airflow scheduler

airflow webserver -p 8080

airflow worker

Is there anything I am missing?

Solution

Without knowing what version of Airflow you are running and how you have configured your rabbitmq-server, it is somewhat difficult to answer your question with surety. However, I can offer some things for you to look into.

Here is the Celery documentation for specifying a broker URL. The broker URL in your airflow.cfg does not specify a virtual host, so per the documentation the default virtual host will be used. I did some digging but could not find what the default virtual host for pyamqp is, but this is worth looking into.
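
For what it's worth, a stock RabbitMQ installation ships with a single default virtual host named "/". If you have shell access to the RabbitMQ host, here is a minimal sketch of how to check which virtual hosts exist and whether your broker user has permissions on them (assuming rabbitmqctl is on the PATH):

# On the RabbitMQ server: list all virtual hosts
rabbitmqctl list_vhosts

# Show which users have permissions on the default vhost "/"
rabbitmqctl list_permissions -p /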

Alternatively, you could explicitly configure a virtual host using rabbitmqctl. Here is a post where someone lays out how to do this with Airflow. I have copied and pasted the relevant information below:

# Configure RabbitMQ: create user and grant privileges
rabbitmqctl add_user rabbitmq_user_name rabbitmq_password
rabbitmqctl add_vhost rabbitmq_virtual_host_name
rabbitmqctl set_user_tags rabbitmq_user_name rabbitmq_tag_name
rabbitmqctl set_permissions -p rabbitmq_virtual_host_name rabbitmq_user_name ".*" ".*" ".*"
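
With the virtual host created, the broker_url in airflow.cfg should reference it explicitly. A sketch using the placeholder names from the commands above (substitute your real host, user, password, and vhost):

broker_url = pyamqp://rabbitmq_user_name:rabbitmq_password@XXX.XXX.XXX.XXX:5672/rabbitmq_virtual_host_name

Restart the scheduler and the workers after changing broker_url so both ends pick up the new connection string.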

Finally, you may be running into an issue with the Celery version you are using. At time of posting, Celery 4.X.X does not play nicely with Airflow. Try uninstalling Celery and reinstalling a version that works.

pip uninstall celery
pip install celery==3.1.7
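
Afterwards you can confirm which Celery version is actually importable in the environment the worker runs in (a minimal check, assuming it is the same Python environment Airflow uses):

python -c "import celery; print(celery.__version__)"

Then restart airflow worker and check whether it shows up in Flower.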
