How do I run Beam Python pipelines using Flink deployed on Kubernetes?
Problem description
Does anybody know how to run Beam Python pipelines with Flink when Flink is running as pods in Kubernetes?
I have managed to run a Beam Python pipeline using the Portable runner, with the job service pointing to a local Flink server running in Docker containers.
I achieved that by mounting the Docker socket in my Flink containers and running Flink as a root process, so that the DockerEnvironmentFactory class can create the Python harness container.
Unfortunately, I can't use the same solution when Flink is running in Kubernetes. Moreover, I don't want to create the Python harness container using the Docker command from my pods.
It seems that the Beam runner automatically selects Docker for executing Python pipelines. However, I noticed there is an implementation called ExternalEnvironmentFactory, but I am not sure how to use it.
Is there a way to deploy a sidecar container and use a different factory to run the Python harness process? What is the correct approach?
This is the patch for DockerEnvironmentFactory:
diff -pr beam-release-2.15.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java beam-release-2.15.0-1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java
*** beam-release-2.15.0/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java 2019-08-14 22:33:41.000000000 +0100
--- beam-release-2.15.0-1/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerEnvironmentFactory.java 2019-09-09 16:02:07.000000000 +0100
*************** package org.apache.beam.runners.fnexecut
*** 19,24 ****
--- 19,26 ----
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull;
+ import java.net.InetAddress;
+ import java.net.UnknownHostException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
*************** public class DockerEnvironmentFactory im
*** 127,133 ****
ImmutableList.<String>builder()
.addAll(gcsCredentialArgs())
// NOTE: Host networking does not work on Mac, but the command line flag is accepted.
! .add("--network=host")
// We need to pass on the information about Docker-on-Mac environment (due to missing
// host networking on Mac)
.add("--env=DOCKER_MAC_CONTAINER=" + System.getenv("DOCKER_MAC_CONTAINER"));
--- 129,135 ----
ImmutableList.<String>builder()
.addAll(gcsCredentialArgs())
// NOTE: Host networking does not work on Mac, but the command line flag is accepted.
! .add("--network=flink")
// We need to pass on the information about Docker-on-Mac environment (due to missing
// host networking on Mac)
.add("--env=DOCKER_MAC_CONTAINER=" + System.getenv("DOCKER_MAC_CONTAINER"));
*************** public class DockerEnvironmentFactory im
*** 222,228 ****
private static ServerFactory getServerFactory() {
ServerFactory.UrlFactory dockerUrlFactory =
! (host, port) -> HostAndPort.fromParts(DOCKER_FOR_MAC_HOST, port).toString();
if (RUNNING_INSIDE_DOCKER_ON_MAC) {
// If we're already running in a container, we need to use a fixed port range due to
// non-existing host networking in Docker-for-Mac. The port range needs to be published
--- 224,230 ----
private static ServerFactory getServerFactory() {
ServerFactory.UrlFactory dockerUrlFactory =
! (host, port) -> HostAndPort.fromParts(getCanonicalHostName(), port).toString();
if (RUNNING_INSIDE_DOCKER_ON_MAC) {
// If we're already running in a container, we need to use a fixed port range due to
// non-existing host networking in Docker-for-Mac. The port range needs to be published
*************** public class DockerEnvironmentFactory im
*** 237,242 ****
--- 239,252 ----
}
}
+ private static String getCanonicalHostName() throws RuntimeException {
+ try {
+ return InetAddress.getLocalHost().getCanonicalHostName();
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
/** Provider for DockerEnvironmentFactory. */
public static class Provider implements EnvironmentFactory.Provider {
private final boolean retainDockerContainer;
*************** public class DockerEnvironmentFactory im
*** 269,275 ****
public ServerFactory getServerFactory() {
switch (getPlatform()) {
case LINUX:
! return ServerFactory.createDefault();
case MAC:
return DockerOnMac.getServerFactory();
default:
--- 279,286 ----
public ServerFactory getServerFactory() {
switch (getPlatform()) {
case LINUX:
! return DockerOnMac.getServerFactory();
! // return ServerFactory.createDefault();
case MAC:
return DockerOnMac.getServerFactory();
default:
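The core of the patch is that the address handed to the Python harness is built from the container's own canonical hostname rather than the fixed Docker-for-Mac host, so a harness attached to the `flink` network can call back. A minimal Python sketch of that idea follows; the function name `harness_callback_address` and the port 8099 are illustrative assumptions, not part of Beam.

```python
import socket


def harness_callback_address(port, hostname=None):
    """Build the "host:port" string a harness container would call back to.

    Hypothetical helper: it mirrors the patched getCanonicalHostName() logic by
    defaulting to this machine's fully qualified hostname.
    """
    if hostname is None:
        # Rough Python counterpart of InetAddress.getLocalHost().getCanonicalHostName()
        hostname = socket.getfqdn()
    return "{}:{}".format(hostname, port)


if __name__ == "__main__":
    # Prints this machine's hostname followed by ":8099"
    print(harness_callback_address(8099))
```

On a user-defined Docker network, container names resolve through Docker's embedded DNS, which is why advertising the container's hostname can work where `--network=host` is not available.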
This is the Docker compose file I use to run Flink:
version: '3.4'
services:
  jobmanager:
    image: tenx/flink:1.8.1
    command: 'jobmanager'
    environment:
      JOB_MANAGER_RPC_ADDRESS: 'jobmanager'
      DOCKER_MAC_CONTAINER: 1
      FLINK_JM_HEAP: 128
    volumes:
      - jobmanager-data:/data
      - /var/run/docker.sock:/var/run/docker.sock
    ports:
      - target: 8081
        published: 8081
        protocol: tcp
        mode: ingress
    networks:
      - flink
  taskmanager:
    image: tenx/flink:1.8.1
    command: 'taskmanager'
    environment:
      JOB_MANAGER_RPC_ADDRESS: 'jobmanager'
      DOCKER_MAC_CONTAINER: 1
      FLINK_TM_HEAP: 1024
      TASK_MANAGER_NUMBER_OF_TASK_SLOTS: 2
    networks:
      - flink
    volumes:
      - taskmanager-data:/data
      - /var/run/docker.sock:/var/run/docker.sock
      - /var/folders:/var/folders
volumes:
  jobmanager-data:
  taskmanager-data:
networks:
  flink:
    external: true
This is my Python pipeline:
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class LogElements(beam.PTransform):
    """Logs every element and passes it through unchanged."""

    class _LoggingFn(beam.DoFn):
        def __init__(self, prefix=''):
            super(LogElements._LoggingFn, self).__init__()
            self.prefix = prefix

        def process(self, element, **kwargs):
            logging.info(self.prefix + str(element))
            yield element

    def __init__(self, label=None, prefix=''):
        super(LogElements, self).__init__(label)
        self.prefix = prefix

    def expand(self, input):
        # Return the resulting PCollection so downstream transforms can be chained.
        return input | beam.ParDo(self._LoggingFn(self.prefix))


# Submit through the portable runner to the job service listening on port 8099.
options = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099"])
p = beam.Pipeline(options=options)
(p | beam.Create([1, 2, 3, 4, 5]) | LogElements())
p.run()
This is how I run the job service:
gradle :runners:flink:1.8:job-server:runShadow -PflinkMasterUrl=localhost:8081
Docker is automatically selected for executing the Python harness.
I can change the image used to run the Python container:
options = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099", "--environment_type=DOCKER", "--environment_config=beam/python:latest"])
I can disable Docker and enable the ExternalEnvironmentFactory:
options = PipelineOptions(["--runner=PortableRunner", "--job_endpoint=localhost:8099", "--environment_type=EXTERNAL", "--environment_config=server"])
However, I then have to implement a service that answers the callback at http://server:80.
Is there an existing implementation available?
Recommended answer
To answer the question above: add a beam_worker_pool container alongside the Flink task manager container in the same pod. In the YAML file that you use to deploy the Flink task managers, add a new container:
- name: beam-worker-pool
  image: apache/beam_python3.7_sdk:2.22.0
  args: ["--worker_pool"]
  ports:
    - containerPort: 50000
      name: pool
  livenessProbe:
    tcpSocket:
      port: 50000
    initialDelaySeconds: 30
    periodSeconds: 60
  volumeMounts:
    - name: flink-config-volume
      mountPath: /opt/flink/conf/
  securityContext:
    runAsUser: 9999
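With the worker pool running as a sidecar, the pipeline would presumably be submitted with the EXTERNAL environment pointing at the pool's port. Below is a minimal sketch of the option list. The helper name `external_worker_pool_args` is hypothetical, the job endpoint `localhost:8099` is taken from the question, and `localhost:50000` assumes the pool shares a pod (and therefore a network namespace) with the task manager, matching the `containerPort` above.

```python
def external_worker_pool_args(job_endpoint, pool_address="localhost:50000"):
    """Build portable-runner flags that target an already running worker pool.

    Hypothetical helper: pool_address defaults to the sidecar port from the
    manifest above, reachable as localhost from the task manager container.
    """
    return [
        "--runner=PortableRunner",
        "--job_endpoint=" + job_endpoint,
        # EXTERNAL tells the runner not to start a Docker container itself.
        "--environment_type=EXTERNAL",
        "--environment_config=" + pool_address,
    ]


if __name__ == "__main__":
    args = external_worker_pool_args("localhost:8099")
    print(args)
    # With apache_beam installed, these would be passed on as:
    #   options = PipelineOptions(args)
```

Because containers in one pod share the same network namespace, the task manager can reach the pool on localhost without a Kubernetes Service in between.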