如何在Kubernetes环境中使用Spark配置Beam python sdk [英] How to configure beam python sdk with spark in a kubernetes environment

查看:97
本文介绍了如何在Kubernetes环境中使用Spark配置Beam python sdk的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如何使用"environment_type"配置Apache Beam管道选项.=外部还是过程?

How to configure Apache Beam pipelines options with "environment_type" = EXTERNAL or PROCESS?

目前,我们在 4种不同的方式运行python SDK :

  • "DOCKER"-默认并且在Kubernetes集群中不可能(将使用``容器内的容器'')
  • 回送"-仅用于测试,不能使用超过1个工作人员吊舱
  • 外部"-理想的设置,正好"创建一个Sidecar容器,使其与Spark Workers在同一吊舱中运行
  • 过程"-在spark worker中执行一个过程,虽然不理想,但也可以.
  1. 使用外部"-在同一容器上使用python sdk实现spark worker:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: spark-worker
  labels:
    app: spark-worker
spec:
  selector:
    matchLabels:
      app: spark-worker
  template:
    metadata:
      labels:
        app: spark-worker
    spec:
      containers:
      - name: spark-worker
        image: spark-py-custom:latest
        imagePullPolicy: Never
        ports:
        - containerPort: 8081
          protocol: TCP
        command: ['/bin/bash',"-c","--"]
        args: ["/start-worker.sh" ]
        resources :
          requests :
            cpu : 4
            memory : "5Gi"
          limits :
             cpu : 4
             memory : "5Gi"
        volumeMounts:
          - name: spark-jars
            mountPath: "/tmp"
      - name: python-beam-sdk
        image: apachebeam/python3.7_sdk:latest
        command: ["/opt/apache/beam/boot", "--worker_pool"]
        ports:
        - containerPort: 50000
        resources:
          limits:
            cpu: "1"
            memory: "1Gi"
      volumes:
        - name: spark-jars
          persistentVolumeClaim:
            claimName: spark-jars

还有,如果我们执行命令

And them, if we execute the command

python3 wordcount.py \
--output ./data_test/counts \
--runner=SparkRunner \
--spark_submit_uber_jar \
--spark_job_server_jar=beam-runners-spark-job-server-2.28.0.jar \
--spark_master_url=spark://spark-master:7077 \
--spark_rest_url=http://spark-master:6066 \
--environment_type=EXTERNAL \
--environment_config=localhost:50000

我们的终端处于运行中"状态:

We get a stuck terminal in the state of "RUNNING":

INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.client:Timeout attempting to reach GCE metadata service.
WARNING:apache_beam.internal.gcp.auth:Unable to find default credentials to use: The Application Default Credentials are not available. They are available if running in Google Compute Engine. Otherwise, the environment variable GOOGLE_APPLICATION_CREDENTIALS must be defined pointing to a file defining the credentials. See https://developers.google.com/accounts/docs/application-default-credentials for more information.
Connecting anonymously.
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:root:Default Python SDK image for environment is apache/beam_python3.7_sdk:2.28.0
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function lift_combiners at 0x7fc360c0b8c8> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:==================== <function sort_stages at 0x7fc360c0f048> ====================
INFO:apache_beam.runners.portability.abstract_job_service:Artifact server started on port 36369
INFO:apache_beam.runners.portability.abstract_job_service:Running job 'job-2448721e-e686-41d4-b924-5f8c5ae73ac2'
INFO:apache_beam.runners.portability.spark_uber_jar_job_server:Submitted Spark job with ID driver-20210305172421-0000
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED
INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING

在spark worker日志中:

And in the spark worker log:

21/03/05 17:24:25 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=45203" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@spark-worker-64fd4ddd6-tqdrs:45203" "--executor-id" "0" "--hostname" "172.18.0.20" "--cores" "3" "--app-id" "app-20210305172425-0000" "--worker-url" "spark://Worker@172.18.0.20:44365"

在python sdk上:

And on the python sdk:

2021/03/05 17:19:52 Starting worker pool 1: python -m apache_beam.runners.worker.worker_pool_main --service_port=50000 --container_executable=/opt/apache/beam/boot
Starting worker with command ['/opt/apache/beam/boot', '--id=1-1', '--logging_endpoint=', '--artifact_endpoint=', '--provision_endpoint=', '--control_endpoint=']
2021/03/05 17:24:32 No logging endpoint provided.

检查spark worker stderr(在localhost 8081上):

Checking the spark worker stderr (on localhost 8081):

Spark Executor Command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=45203" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@spark-worker-64fd4ddd6-tqdrs:45203" "--executor-id" "0" "--hostname" "172.18.0.20" "--cores" "3" "--app-id" "app-20210305172425-0000" "--worker-url" "spark://Worker@172.18.0.20:44365"
========================================

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
21/03/05 17:24:26 INFO CoarseGrainedExecutorBackend: Started daemon with process name: 230@spark-worker-64fd4ddd6-tqdrs
21/03/05 17:24:26 INFO SignalUtils: Registered signal handler for TERM
21/03/05 17:24:26 INFO SignalUtils: Registered signal handler for HUP
21/03/05 17:24:26 INFO SignalUtils: Registered signal handler for INT
21/03/05 17:24:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/03/05 17:24:27 INFO SecurityManager: Changing view acls to: root
21/03/05 17:24:27 INFO SecurityManager: Changing modify acls to: root
21/03/05 17:24:27 INFO SecurityManager: Changing view acls groups to: 
21/03/05 17:24:27 INFO SecurityManager: Changing modify acls groups to: 
21/03/05 17:24:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/03/05 17:24:27 INFO TransportClientFactory: Successfully created connection to spark-worker-64fd4ddd6-tqdrs/172.18.0.20:45203 after 50 ms (0 ms spent in bootstraps)
21/03/05 17:24:27 INFO SecurityManager: Changing view acls to: root
21/03/05 17:24:27 INFO SecurityManager: Changing modify acls to: root
21/03/05 17:24:27 INFO SecurityManager: Changing view acls groups to: 
21/03/05 17:24:27 INFO SecurityManager: Changing modify acls groups to: 
21/03/05 17:24:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/03/05 17:24:28 INFO TransportClientFactory: Successfully created connection to spark-worker-64fd4ddd6-tqdrs/172.18.0.20:45203 after 1 ms (0 ms spent in bootstraps)
21/03/05 17:24:28 INFO DiskBlockManager: Created local directory at /tmp/spark-bdffc2b3-f57a-42fa-a720-e22274b86b67/executor-f1eff7ca-d2cd-4ff4-b18b-c8d6a520f590/blockmgr-c61fb65f-ea97-4bd5-bf15-e0025845a251
21/03/05 17:24:28 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
21/03/05 17:24:28 INFO CoarseGrainedExecutorBackend: Connecting to driver: spark://CoarseGrainedScheduler@spark-worker-64fd4ddd6-tqdrs:45203
21/03/05 17:24:28 INFO WorkerWatcher: Connecting to worker spark://Worker@172.18.0.20:44365
21/03/05 17:24:28 INFO TransportClientFactory: Successfully created connection to /172.18.0.20:44365 after 1 ms (0 ms spent in bootstraps)
21/03/05 17:24:28 INFO WorkerWatcher: Successfully connected to spark://Worker@172.18.0.20:44365
21/03/05 17:24:28 INFO CoarseGrainedExecutorBackend: Successfully registered with driver
21/03/05 17:24:28 INFO Executor: Starting executor ID 0 on host 172.18.0.20
21/03/05 17:24:28 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 42561.
21/03/05 17:24:28 INFO NettyBlockTransferService: Server created on 172.18.0.20:42561
21/03/05 17:24:28 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
21/03/05 17:24:28 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(0, 172.18.0.20, 42561, None)
21/03/05 17:24:28 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(0, 172.18.0.20, 42561, None)
21/03/05 17:24:28 INFO BlockManager: Initialized BlockManager: BlockManagerId(0, 172.18.0.20, 42561, None)

永远卡住的地方.检查python SDK的源代码我们可以看到未提供日志记录端点"是致命的,这是因为缺少发送给他的配置(没有日志记录/工件/设置/控制端点).如果我尝试添加"--artifact_endpoint"到python命令时,由于作业服务器创建了自己的工件端点,因此我收到了通信失败的grcp错误.在此设置中,有必要使用固定端口配置所有这些终结点(可能将localhost与SDK和worker放在同一个容器中),但是我找不到如何配置它.进行检查,我可以找到相关的问题,但在他的情况下,他得到了python SDK配置自动(可能是火花运行程序问题?)

Where it gets stuck forever. Checking the source code of the python SDK we can see that "no logging endpoint provided" is fatal and it comes from the lack of configuration sent to him (no logging/artifact/provision/control endpoints). If I try to add the "--artifact_endpoint" to the python command I get grcp error of failed communication because the jobserver creates its own artifact endpoint. In this setup would be necessary to configure all these endpoints (probably as localhost as the SDK and the worker are in the same pod) with fixed ports but I can't find how to configure it. Checking SO I can find a related issue but in his case he gets the python SDK configurations automatically (maybe a spark runner issue?)

  1. 使用过程"-尝试在一个进程中运行python SDK,我用 ./gradlew:sdks:python:container:py37:docker 构建了python SDK,复制了sdks/python/container/build/target/launcher/linux_amd64/boot可执行文件到spark worker容器中的/python_sdk/boot并使用以下命令:
  1. Using "PROCESS" - Trying to run the python SDK within a process, I built the python SDK with ./gradlew :sdks:python:container:py37:docker, copied the sdks/python/container/build/target/launcher/linux_amd64/boot executable to /python_sdk/boot inside the spark worker container and used the command:

python3 wordcount.py \
--output ./data_test/counts \
--runner=SparkRunner \
--spark_submit_uber_jar \
--spark_master_url=spark://spark-master:7077 \
--spark_rest_url=http://spark-master:6066 \
--environment_type=PROCESS \
--spark_job_server_jar=beam-runners-spark-job-server-2.28.0.jar \
--environment_config='{"os":"linux","arch":"x84_64","command":"/python_sdk/boot"}'

导致运行时异常";在终端中:

Resulting in "run time exception" in the terminal:

INFO:apache_beam.runners.portability.portable_runner:Job state changed to FAILED
Traceback (most recent call last):
  File "wordcount.py", line 91, in <module>
    run()
  File "wordcount.py", line 86, in run
    output | "Write" >> WriteToText(known_args.output)
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/pipeline.py", line 581, in __exit__
    self.result.wait_until_finish()
  File "/usr/local/lib/python3.7/dist-packages/apache_beam/runners/portability/portable_runner.py", line 608, in wait_until_finish
    raise self._runtime_exception
RuntimeError: Pipeline job-95c13aa5-96ab-4d1d-bc68-7f9d203c8251 failed in state FAILED: unknown error

并再次检查spark stderr worker日志,我可以看到问题是 java.lang.IllegalArgumentException:找不到方案类路径的文件系统,我不知道原因.

and checking again the spark stderr worker log I can see that the problem is java.lang.IllegalArgumentException: No filesystem found for scheme classpath which I don't know the reason.

21/03/05 18:33:12 INFO Executor: Adding file:/opt/spark/work/app-20210305183309-0000/0/./javax.servlet-api-3.1.0.jar to class loader
21/03/05 18:33:12 INFO TorrentBroadcast: Started reading broadcast variable 0
21/03/05 18:33:12 INFO TransportClientFactory: Successfully created connection to spark-worker-89c5c4c87-5q45s/172.18.0.20:34783 after 1 ms (0 ms spent in bootstraps)
21/03/05 18:33:12 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 12.3 KB, free 366.3 MB)
21/03/05 18:33:12 INFO TorrentBroadcast: Reading broadcast variable 0 took 63 ms
21/03/05 18:33:12 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 32.5 KB, free 366.3 MB)
21/03/05 18:33:13 INFO MemoryStore: Block rdd_13_0 stored as values in memory (estimated size 16.0 B, free 366.3 MB)
21/03/05 18:33:13 INFO MemoryStore: Block rdd_17_0 stored as values in memory (estimated size 16.0 B, free 366.3 MB)
21/03/05 18:33:13 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 5427 bytes result sent to driver
21/03/05 18:33:14 ERROR SerializingExecutor: Exception while executing runnable org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@5f917914
java.lang.IllegalArgumentException: No filesystem found for scheme classpath
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:467)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:537)
    at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:125)
    at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:99)
    at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:327)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
21/03/05 18:33:16 ERROR SerializingExecutor: Exception while executing runnable org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@67fb2b2c
java.lang.IllegalArgumentException: No filesystem found for scheme classpath
    at org.apache.beam.sdk.io.FileSystems.getFileSystemInternal(FileSystems.java:467)
    at org.apache.beam.sdk.io.FileSystems.matchNewResource(FileSystems.java:537)
    at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:125)
    at org.apache.beam.runners.fnexecution.artifact.ArtifactRetrievalService.getArtifact(ArtifactRetrievalService.java:99)
    at org.apache.beam.model.jobmanagement.v1.ArtifactRetrievalServiceGrpc$MethodHandlers.invoke(ArtifactRetrievalServiceGrpc.java:327)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:817)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p26p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
21/03/05 18:33:19 INFO ProcessEnvironmentFactory: Still waiting for startup of environment '/python_sdk/boot' for worker id 1-1

可能缺少某些配置参数.

Probably is missing some configuration parameters.

如果我执行命令

python3 wordcount.py \
--output ./data_test/counts \
--runner=SparkRunner \
--spark_submit_uber_jar \
--spark_job_server_jar=beam-runners-spark-job-server-2.28.0.jar \
--spark_master_url=spark://spark-master:7077 \
--spark_rest_url=http://spark-master:6066 \
--environment_type=LOOPBACK

在我们的火花工人内部(火花集群中只有一个工人),我们拥有完整的工作光束管道,其中这些日志.

inside our spark worker (having only one worker in the spark cluster) we have a full working beam pipeline with these logs.

推荐答案

  1. 使用外部"-这绝对像是Beam中的bug.应该将工作程序端点设置为使用本地主机;我认为无法对其进行配置.我不确定为什么会丢失它们;一种有根据的猜测是,服务器在无提示的情况下无法启动,从而使端点为空.我为此问题提交了错误报告( BEAM-11957 ).
  2. 使用处理"-方案 classpath 对应于
  1. Using "External" - this definitely seems like a bug in Beam. The worker endpoints are supposed to be set up to use localhost; I don't think it is possible to configure them. I'm not sure why they would be missing; one educated guess is that the servers silently fail to start, leaving the endpoints empty. I filed a bug report (BEAM-11957) for this issue.
  2. Using "Process" - The scheme classpath corresponds to ClassLoaderFileSystem. This file system is usually loaded using AutoService, which depends on ClassLoaderFileSystemRegistrar being present on the classpath (no relation to the name of the file system itself). The classpath of the job jar is based on spark_job_server_jar. Where are you getting your beam-runners-spark-job-server-2.28.0.jar from?

这篇关于如何在Kubernetes环境中使用Spark配置Beam python sdk的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆