How do I run Beam Python pipelines using Flink deployed on Kubernetes?

Question

Does anybody know how to run Beam Python pipelines with Flink when Flink is running as pods in Kubernetes?

I have successfully managed to run a Beam Python pipeline using the Portable runner and the job service pointing to a local Flink server running in Docker containers.

I was able to achieve that by mounting the Docker socket in my Flink containers and running Flink as the root process, so that the DockerEnvironmentFactory class can create the Python harness container.

Unfortunately, I can't use the same solution when Flink is running in Kubernetes. Moreover, I don't want to create the Python harness container using the Docker command from my pods.

It seems that the Beam runner automatically selects Docker for executing Python pipelines. However, I noticed there is an implementation called ExternalEnvironmentFactory, but I am not sure how to use it.

Is there a way to deploy a side container and use a different factory to run the Python harness process? What is the correct approach?

This is the patch for DockerEnvironmentFactory:

diff -pr beam-release-2.15.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java beam-release-2.15.0-1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
*** beam-release-2.15.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java   2019-08-14 22:33:41.000000000 +0100
--- beam-release-2.15.0-1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java 2019-09-09 16:02:07.000000000 +0100
*************** package org.apache.beam.runners.fnexecut
*** 19,24 ****
--- 19,26 ----

  import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;

+ import java.net.InetAddress;
+ import java.net.UnknownHostException;
  import java.nio.file.Files;
  import java.nio.file.Paths;
  import java.time.Duration;
*************** public class DockerEnvironmentFactory im
*** 127,133 ****
          ImmutableList.<String>builder()
              .addAll(gcsCredentialArgs())
              // NOTE: Host networking does not work on Mac, but the command line flag is accepted.
!             .add("--network=host")
              // We need to pass on the information about Docker-on-Mac environment (due to missing
              // host networking on Mac)
              .add("--env=DOCKER_MAC_CONTAINER=" + System.getenv("DOCKER_MAC_CONTAINER"));
--- 129,135 ----
          ImmutableList.<String>builder()
              .addAll(gcsCredentialArgs())
              // NOTE: Host networking does not work on Mac, but the command line flag is accepted.
!             .add("--network=flink")
              // We need to pass on the information about Docker-on-Mac environment (due to missing
              // host networking on Mac)
              .add("--env=DOCKER_MAC_CONTAINER=" + System.getenv("DOCKER_MAC_CONTAINER"));
*************** public class DockerEnvironmentFactory im
*** 222,228 ****

      private static ServerFactory getServerFactory() {
        ServerFactory.UrlFactory dockerUrlFactory =
!           (host, port) -> HostAndPort.fromParts(DOCKER_FOR_MAC_HOST, port).toString();
        if (RUNNING_INSIDE_DOCKER_ON_MAC) {
          // If we're already running in a container, we need to use a fixed port range due to
          // non-existing host networking in Docker-for-Mac. The port range needs to be published
--- 224,230 ----

      private static ServerFactory getServerFactory() {
        ServerFactory.UrlFactory dockerUrlFactory =
!               (host, port) -> HostAndPort.fromParts(getCanonicalHostName(), port).toString();
        if (RUNNING_INSIDE_DOCKER_ON_MAC) {
          // If we're already running in a container, we need to use a fixed port range due to
          // non-existing host networking in Docker-for-Mac. The port range needs to be published
*************** public class DockerEnvironmentFactory im
*** 237,242 ****
--- 239,252 ----
      }
    }

+   private static String getCanonicalHostName() throws RuntimeException {
+     try {
+       return InetAddress.getLocalHost().getCanonicalHostName();
+     } catch (UnknownHostException e) {
+       throw new RuntimeException(e);
+     }
+   }
+
    /** Provider for DockerEnvironmentFactory. */
    public static class Provider implements EnvironmentFactory.Provider {
      private final boolean retainDockerContainer;
*************** public class DockerEnvironmentFactory im
*** 269,275 ****
      public ServerFactory getServerFactory() {
        switch (getPlatform()) {
          case LINUX:
!           return ServerFactory.createDefault();
          case MAC:
            return DockerOnMac.getServerFactory();
          default:
--- 279,286 ----
      public ServerFactory getServerFactory() {
        switch (getPlatform()) {
          case LINUX:
!           return DockerOnMac.getServerFactory();
! //          return ServerFactory.createDefault();
          case MAC:
            return DockerOnMac.getServerFactory();
          default:

This is the Docker compose file I use to run Flink:

version: '3.4'
services:
  jobmanager:
    image: tenx/flink:1.8.1
    command: 'jobmanager'
    environment:
      JOB_MANAGER_RPC_ADDRESS: 'jobmanager'
      DOCKER_MAC_CONTAINER: 1
      FLINK_JM_HEAP: 128
    volumes:
      - jobmanager-data:/data
      - /var/run/docker.sock:/var/run/docker.sock
    ports:
      - target: 8081
        published: 8081
        protocol: tcp
        mode: ingress
    networks:
      - flink
  taskmanager:
    image: tenx/flink:1.8.1
    command: 'taskmanager'
    environment:
      JOB_MANAGER_RPC_ADDRESS: 'jobmanager'
      DOCKER_MAC_CONTAINER: 1
      FLINK_TM_HEAP: 1024
      TASK_MANAGER_NUMBER_OF_TASK_SLOTS: 2
    networks:
      - flink
    volumes:
      - taskmanager-data:/data
      - /var/run/docker.sock:/var/run/docker.sock
      - /var/folders:/var/folders
volumes:
    jobmanager-data:
    taskmanager-data:
networks:
  flink:
    external: true

This is my Python pipeline:

import apache_beam as beam
import logging

class LogElements(beam.PTransform):

    class _LoggingFn(beam.DoFn):

        def __init__(self, prefix=''):
            super(LogElements._LoggingFn, self).__init__()
            self.prefix = prefix

        def process(self, element, **kwargs):
            logging.info(self.prefix + str(element))
            yield element

    def __init__(self, label=None, prefix=''):
        super(LogElements, self).__init__(label)
        self.prefix = prefix

    def expand(self, input):
        # Return the result of applying the DoFn so the transform produces an output PCollection.
        return input | beam.ParDo(self._LoggingFn(self.prefix))


from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099"])

p = beam.Pipeline(options=options)

(p | beam.Create([1, 2, 3, 4, 5]) | LogElements())

p.run()

This is how I run the job service:

gradle :runners:flink:1.8:job-server:runShadow -PflinkMasterUrl=localhost:8081

Docker is automatically selected for executing the Python harness.

I can change the image used to run the Python container:

options = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099", "--environment_type=DOCKER", "--environment_config=beam/python:latest"])

I can disable Docker and enable the ExternalEnvironmentFactory:

options = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099", "--environment_type=EXTERNAL", "--environment_config=server"])

but I have to implement some callback answering on http://server:80.

Is there an implementation already available for that?

Recommended answer

To answer the question above: basically, you want to add a beam-worker-pool container alongside the Flink task manager container in the same pod. So in the YAML file that you use to deploy the Flink task managers, add a new container:

  # Beam Python SDK harness sidecar, started in worker-pool mode.
  - name: beam-worker-pool
    image: apache/beam_python3.7_sdk:2.22.0
    args: ["--worker_pool"]
    ports:
    # Port the worker pool listens on; the pipeline's environment_config must point here.
    - containerPort: 50000
      name: pool
    livenessProbe:
      tcpSocket:
        port: 50000
      initialDelaySeconds: 30
      periodSeconds: 60
    volumeMounts:
    # Reuse the Flink configuration volume already defined for the task manager pod.
    - name: flink-config-volume
      mountPath: /opt/flink/conf/
    securityContext:
      # Run as a non-root user.
      runAsUser: 9999
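
With the worker pool sidecar running next to each task manager, the pipeline has to be submitted with the EXTERNAL environment pointing at it. Below is a minimal sketch, assuming the job service is still reachable at localhost:8099 as in the question; localhost:50000 reaches the sidecar because it shares the pod (and therefore the network namespace) with the task manager:

from apache_beam.options.pipeline_options import PipelineOptions

# EXTERNAL tells the runner not to launch Docker containers for the SDK harness;
# it connects to an already running worker pool at the given address instead.
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",         # Beam job service (adjust to your setup)
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:50000",  # the beam-worker-pool sidecar
])

Because the harness runs as a plain process inside the sidecar, this setup needs no Docker socket mount, no root Flink process, and no patch to DockerEnvironmentFactory.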
