How to configure celery worker on distributed airflow architecture using docker-compose?

Question

I'm setting up a distributed Airflow cluster where everything except the celery workers runs on one host and the processing is done on several hosts. The Airflow 2.0 setup is configured using the yaml file given in the Airflow documentation https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml . In my initial tests I got the architecture to work nicely when I ran everything on the same host. The question is: how do I start the celery workers on the remote hosts?

So far, I have tried to create a trimmed-down version of the above docker-compose file in which I only start the celery workers on the worker host and nothing else, but I ran into some issues with the db connection. In the trimmed version I changed the URLs so that they point to the host that runs the db and redis.

The dags, logs, plugins and the postgresql db are located on a shared drive that is visible to all hosts.

How should I do the configuration? Any ideas on what to check? Connections etc.? Celery worker docker-compose configuration:

---
version: '3'
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
  environment:
    &airflow-common-env
    AIRFLOW_UID: 50000
    AIRFLOW_GID: 50000
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@airflowhost.example.com:8080/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflowhost.example.com:8080/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@airflowhost.example.com:6380/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    REDIS_PORT: 6380
  volumes:
    - /airflow/dev/dags:/opt/airflow/dags
    - /airflow/dev/logs:/opt/airflow/logs
    - /airflow/dev/plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
services:
  airflow-remote-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always

EDIT 1: I'm still having some difficulties with the log files. It appears that sharing the log directory does not solve the issue of missing log files. I added the extra_hosts definition on the main node as suggested and opened port 8793 on the worker machine. The worker tasks fail with this log:

*** Log file does not exist: /opt/airflow/logs/tutorial/print_date/2021-07-01T13:57:11.087882+00:00/1.log
*** Fetching from: http://:8793/log/tutorial/print_date/2021-07-01T13:57:11.087882+00:00/1.log
*** Failed to fetch log file from worker. Unsupported URL protocol ''

Answer

Far from being the "ultimate set-up", these are some settings that worked for me using the docker-compose from Airflow on the core node and the workers:

  • The worker nodes have to be reachable from the main node where the Webserver runs. I found this diagram of the CeleryExecutor architecture very helpful to sort things out.

When trying to read the logs, if they are not found locally, the Webserver will try to retrieve them from the remote worker. Your main node may therefore not know the hostname of your workers, so you either change how the hostnames are resolved (the hostname_callable setting, which defaults to socket.getfqdn) or you simply add name-resolution capability to the Webserver. The latter can be done by adding the extra_hosts config key to the x-airflow-common definition (a sketch of the hostname_callable alternative follows the extra_hosts example below):

---
version: "3"
x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
  environment: &airflow-common-env
    ...# env vars
  extra_hosts:
    - "worker-01-hostname:worker-01-ip-address" # "worker-01-hostname:192.168.0.11"
    - "worker-02-hostname:worker-02-ip-address"

*Note that in your specific case, where you have a shared drive, I think the logs will be found locally.
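
Alternatively, for the hostname_callable route mentioned above, the setting can also be driven through an environment variable instead of extra_hosts. This is only a minimal sketch; it assumes that socket.gethostname returns a name that the Webserver can actually resolve on your network:

x-airflow-common: &airflow-common
  environment: &airflow-common-env
    # Assumption: plain hostnames (not FQDNs) are resolvable from the main node
    AIRFLOW__CORE__HOSTNAME_CALLABLE: 'socket.gethostname'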

  • Define parallelism, DAG concurrency, and scheduler parsing processes. This can be done with env vars:
x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
  environment: &airflow-common-env
    AIRFLOW__CORE__PARALLELISM: 64
    AIRFLOW__CORE__DAG_CONCURRENCY: 32
    AIRFLOW__SCHEDULER__PARSING_PROCESSES: 4

Of course, the values to set depend on your specific case and available resources. This article has a good overview of the subject. DAG settings can also be overridden in the DAG definition.

  • Define the worker CELERY__WORKER_CONCURRENCY; the default could be the number of CPUs available on the machine (docs).

  • Define how to reach the services running on the main node. Set an IP or hostname and watch out for matching exposed ports on the main node:

x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
  environment: &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__CELERY__WORKER_CONCURRENCY: 8
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@main_node_ip_or_hostname:5432/airflow # 5432 is default postgres port
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@main_node_ip_or_hostname:5432/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@main_node_ip_or_hostname:6379/0
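
On the main-node side, the Postgres and Redis containers from the reference docker-compose may not publish their ports to the host, in which case remote workers cannot reach them. A minimal sketch of what the main node's compose file could add (service names postgres and redis as in the reference file; the published host ports are assumptions and must match the URLs above):

# Main node: publish the ports that remote workers connect to
services:
  postgres:
    ports:
      - "5432:5432"   # matches main_node_ip_or_hostname:5432 above
  redis:
    ports:
      - "6379:6379"   # matches main_node_ip_or_hostname:6379 above

In practice these ports entries would be merged into the existing postgres and redis service definitions rather than kept in a separate file.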

  • Share the same Fernet Key and Secret Key, reading them from an ".env" file:

environment: &airflow-common-env
  AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
  AIRFLOW__WEBSERVER__SECRET_KEY: ${SECRET_KEY}

env_file:
  - .env

The ".env" file: FERNET_KEY=jvYUaxxxxxxxxxxxxx=

  • It's critical that every node in the cluster (main node and workers) has the same settings applied.

  • Define a hostname for the worker service to avoid an autogenerated one matching the container id.

  • Expose port 8793, which is the default port used to fetch the logs from the worker (docs):

services:
  airflow-worker:
    <<: *airflow-common
    hostname: ${HOSTNAME}
    ports:
      - 8793:8793
    command: celery worker
    restart: always

  • Make sure every worker node host runs with the same time configuration; a difference of a few minutes can cause serious execution errors that may not be easy to track down. Consider enabling an NTP service on the host OS.

  • If you have heavy workloads and high concurrency in general, you may need to tune Postgres settings such as max_connections and shared_buffers. The same applies to host OS network settings such as ip_local_port_range or somaxconn.
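
As an illustration of the Postgres side, when Postgres itself runs under docker-compose, one way to raise such settings is to pass them as command-line flags. This is only a sketch; the values are placeholders rather than recommendations, and the postgres service name and image are assumed to match the reference compose file:

services:
  postgres:
    image: postgres:13
    # -c flags override postgresql.conf settings; tune the values to your workload
    command: ["postgres", "-c", "max_connections=300", "-c", "shared_buffers=1GB"]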

  • In any issues I encountered during the initial cluster setup, Flower and the worker execution logs always provided helpful details and error messages, both the task-level logs and the docker-compose service logs, e.g. docker-compose logs --tail=10000 airflow-worker > worker_logs.log.
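
If Flower is not already running on the main node, the reference docker-compose ships a flower service that can be kept there to monitor the workers; a trimmed sketch (5555 is Flower's default UI port):

services:
  flower:
    <<: *airflow-common
    command: celery flower
    ports:
      - 5555:5555
    restart: always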

Hope this works for you!
