How do I run Beam Python pipelines using Flink deployed on Kubernetes?

Question

Does anybody know how to run Beam Python pipelines with Flink when Flink is running as pods in Kubernetes?

I have successfully managed to run a Beam Python pipeline using the Portable runner and the job service pointing to a local Flink server running in Docker containers.

I was able to achieve that by mounting the Docker socket in my Flink containers and running Flink as a root process, so that the DockerEnvironmentFactory class can create the Python harness container.

Unfortunately, I can't use the same solution when Flink is running in Kubernetes. Moreover, I don't want to create the Python harness container using the Docker command from my pods.

It seems that the Beam runner automatically selects Docker for executing Python pipelines. However, I noticed there is an implementation called ExternalEnvironmentFactory, but I am not sure how to use it.

Is there a way to deploy a sidecar container and use a different factory to run the Python harness process? What is the correct approach?

This is the patch for DockerEnvironmentFactory:

diff -pr beam-release-2.15.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java beam-release-2.15.0-1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
*** beam-release-2.15.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java   2019-08-14 22:33:41.000000000 +0100
--- beam-release-2.15.0-1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java 2019-09-09 16:02:07.000000000 +0100
*************** package org.apache.beam.runners.fnexecut
*** 19,24 ****
--- 19,26 ----

  import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;

+ import java.net.InetAddress;
+ import java.net.UnknownHostException;
  import java.nio.file.Files;
  import java.nio.file.Paths;
  import java.time.Duration;
*************** public class DockerEnvironmentFactory im
*** 127,133 ****
          ImmutableList.<String>builder()
              .addAll(gcsCredentialArgs())
              // NOTE: Host networking does not work on Mac, but the command line flag is accepted.
!             .add("--network=host")
              // We need to pass on the information about Docker-on-Mac environment (due to missing
              // host networking on Mac)
              .add("--env=DOCKER_MAC_CONTAINER=" + System.getenv("DOCKER_MAC_CONTAINER"));
--- 129,135 ----
          ImmutableList.<String>builder()
              .addAll(gcsCredentialArgs())
              // NOTE: Host networking does not work on Mac, but the command line flag is accepted.
!             .add("--network=flink")
              // We need to pass on the information about Docker-on-Mac environment (due to missing
              // host networking on Mac)
              .add("--env=DOCKER_MAC_CONTAINER=" + System.getenv("DOCKER_MAC_CONTAINER"));
*************** public class DockerEnvironmentFactory im
*** 222,228 ****

      private static ServerFactory getServerFactory() {
        ServerFactory.UrlFactory dockerUrlFactory =
!           (host, port) -> HostAndPort.fromParts(DOCKER_FOR_MAC_HOST, port).toString();
        if (RUNNING_INSIDE_DOCKER_ON_MAC) {
          // If we're already running in a container, we need to use a fixed port range due to
          // non-existing host networking in Docker-for-Mac. The port range needs to be published
--- 224,230 ----

      private static ServerFactory getServerFactory() {
        ServerFactory.UrlFactory dockerUrlFactory =
!               (host, port) -> HostAndPort.fromParts(getCanonicalHostName(), port).toString();
        if (RUNNING_INSIDE_DOCKER_ON_MAC) {
          // If we're already running in a container, we need to use a fixed port range due to
          // non-existing host networking in Docker-for-Mac. The port range needs to be published
*************** public class DockerEnvironmentFactory im
*** 237,242 ****
--- 239,252 ----
      }
    }

+   private static String getCanonicalHostName() throws RuntimeException {
+     try {
+       return InetAddress.getLocalHost().getCanonicalHostName();
+     } catch (UnknownHostException e) {
+       throw new RuntimeException(e);
+     }
+   }
+
    /** Provider for DockerEnvironmentFactory. */
    public static class Provider implements EnvironmentFactory.Provider {
      private final boolean retainDockerContainer;
*************** public class DockerEnvironmentFactory im
*** 269,275 ****
      public ServerFactory getServerFactory() {
        switch (getPlatform()) {
          case LINUX:
!           return ServerFactory.createDefault();
          case MAC:
            return DockerOnMac.getServerFactory();
          default:
--- 279,286 ----
      public ServerFactory getServerFactory() {
        switch (getPlatform()) {
          case LINUX:
!           return DockerOnMac.getServerFactory();
! //          return ServerFactory.createDefault();
          case MAC:
            return DockerOnMac.getServerFactory();
          default:

This is the Docker compose file I use to run Flink:

version: '3.4'
services:
  jobmanager:
    image: tenx/flink:1.8.1
    command: 'jobmanager'
    environment:
      JOB_MANAGER_RPC_ADDRESS: 'jobmanager'
      DOCKER_MAC_CONTAINER: 1
      FLINK_JM_HEAP: 128
    volumes:
      - jobmanager-data:/data
      - /var/run/docker.sock:/var/run/docker.sock
    ports:
      - target: 8081
        published: 8081
        protocol: tcp
        mode: ingress
    networks:
      - flink
  taskmanager:
    image: tenx/flink:1.8.1
    command: 'taskmanager'
    environment:
      JOB_MANAGER_RPC_ADDRESS: 'jobmanager'
      DOCKER_MAC_CONTAINER: 1
      FLINK_TM_HEAP: 1024
      TASK_MANAGER_NUMBER_OF_TASK_SLOTS: 2
    networks:
      - flink
    volumes:
      - taskmanager-data:/data
      - /var/run/docker.sock:/var/run/docker.sock
      - /var/folders:/var/folders
volumes:
    jobmanager-data:
    taskmanager-data:
networks:
  flink:
    external: true

This is my Python pipeline:

import apache_beam as beam
import logging

class LogElements(beam.PTransform):

    class _LoggingFn(beam.DoFn):

        def __init__(self, prefix=''):
            super(LogElements._LoggingFn, self).__init__()
            self.prefix = prefix

        def process(self, element, **kwargs):
            logging.info(self.prefix + str(element))
            yield element

    def __init__(self, label=None, prefix=''):
        super(LogElements, self).__init__(label)
        self.prefix = prefix

    def expand(self, input):
        # expand must return the output PCollection; the original was missing the return
        return input | beam.ParDo(self._LoggingFn(self.prefix))


from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099"])

p = beam.Pipeline(options=options)

(p | beam.Create([1, 2, 3, 4, 5]) | LogElements())

p.run()

This is how I run the job service:

gradle :runners:flink:1.8:job-server:runShadow -PflinkMasterUrl=localhost:8081

Docker is automatically selected for executing the Python harness.

I can change the image used to run the Python container:

options = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099", "--environment_type=DOCKER", "--environment_config=beam/python:latest"])

I can disable Docker and enable the ExternalEnvironmentFactory:

options = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099", "--environment_type=EXTERNAL", "--environment_config=server"])

but I have to implement some callback answering on http://server:80.

Is there any implementation available?

Answer

I know it is a bit outdated but there is a Flink operator for Kubernetes now.

Here are examples of how to run Apache Beam with Flink using the operator:

https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/tree/master/examples/beam
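
In those examples the Python SDK harness typically runs as a worker-pool sidecar container inside the task manager pods rather than being launched through Docker, and the pipeline points at it with the EXTERNAL environment. As a minimal sketch of the client-side options, assuming such a sidecar (for example the Beam Python SDK container started with the --worker_pool argument) is listening on port 50000 in the same pod as each task manager:

from apache_beam.options.pipeline_options import PipelineOptions

# Assumption: a Beam worker-pool sidecar shares the task manager pod's network
# namespace and listens on port 50000, so the task managers reach it on localhost.
options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",          # job server endpoint, as in the examples above
    "--environment_type=EXTERNAL",
    "--environment_config=localhost:50000",   # address of the worker-pool sidecar
])

How the sidecar is started (the --worker_pool argument or invoking apache_beam.runners.worker.worker_pool_main directly) depends on the SDK version, so check the linked examples for the exact wiring.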
