Is it possible to configure the Beam portable runner with Spark configurations?

Problem description

Is it possible to configure the Beam portable runner with Spark configurations? More precisely, is it possible to configure spark.driver.host in the Portable Runner?

Currently, we have Airflow deployed in a Kubernetes cluster, and to use TensorFlow Extended we need Apache Beam. For our use case, Spark would be the appropriate runner, and since Airflow and TensorFlow are coded in Python, we need to use Apache Beam's Portable Runner (https://beam.apache.org/documentation/runners/spark/#portability).

The portable runner creates the Spark context inside its own container and leaves no room for driver DNS configuration, so the executors on the worker pods cannot communicate with the driver (the job server).

  1. Following the Beam documentation, the job server was deployed in the same pod as Airflow, so the two containers can use the local network. Job server configuration:

- name: beam-spark-job-server
  image: apache/beam_spark_job_server:2.27.0
  args: ["--spark-master-url=spark://spark-master:7077"]

Job server/Airflow service:

apiVersion: v1
kind: Service
metadata:
  name: airflow-scheduler
  labels:
    app: airflow-k8s
spec:
  type: ClusterIP
  selector:
    app: airflow-scheduler
  ports:
    - port: 8793
      protocol: TCP
      targetPort: 8793
      name: scheduler
    - port: 8099
      protocol: TCP
      targetPort: 8099
      name: job-server
    - port: 7077
      protocol: TCP
      targetPort: 7077
      name: spark-master
    - port: 8098
      protocol: TCP
      targetPort: 8098
      name: artifact
    - port: 8097
      protocol: TCP
      targetPort: 8097
      name: java-expansion

Ports 8097, 8098, and 8099 are related to the job server, 8793 to Airflow, and 7077 to the Spark master.

  2. When testing a simple Beam example, python -m apache_beam.examples.wordcount --output ./data_test/ --runner=PortableRunner --job_endpoint=localhost:8099 --environment_type=LOOPBACK, from the Airflow container, I get the following response on the Airflow pod:

Defaulting container name to airflow-scheduler.
Use 'kubectl describe pod/airflow-scheduler-local-f685b5bc7-9d7r6 -n airflow-main-local' to see all of the containers in this pod.
airflow@airflow-scheduler-local-f685b5bc7-9d7r6:/opt/airflow$ python -m apache_beam.examples.wordcount --output ./data_test/ --runner=PortableRunner --job_endpoint=localhost:8099 --environment_type=LOOPBACK
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.client:Timeout attempting to reach GCE metadata service.
WARNING:apache_beam.internal.gcp.auth:Unable to find default credentials to use: The Application Default Credentials are not available. They are available if running in Google Compute Engine. Otherwise, the environment variable GOOGLE_APPLICATION_CREDENTIALS must be defined pointing to a file defining the credentials. See https://developers.google.com/accounts/docs/application-default-credentials for more information.
Connecting anonymously.
INFO:apache_beam.runners.worker.worker_pool_main:Listening for workers at localhost:35837
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:root:Default Python SDK image for environment is apache/beam_python3.7_sdk:2.27.0
INFO:apache_beam.runners.portability.portable_runner:Environment "LOOPBACK" has started a component necessary for the execution. Be sure to run the pipeline using
  with Pipeline() as p:
    p.apply(..)
This ensures that the pipeline finishes before this program exits.
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STARTING
INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING

Worker logs:

21/02/19 19:50:00 INFO Worker: Asked to launch executor app-20210219194804-0000/47 for BeamApp-root-0219194747-7d7938cf_51452c51-dffe-4c61-bcb7-60c7779e3256
21/02/19 19:50:00 INFO SecurityManager: Changing view acls to: root
21/02/19 19:50:00 INFO SecurityManager: Changing modify acls to: root
21/02/19 19:50:00 INFO SecurityManager: Changing view acls groups to: 
21/02/19 19:50:00 INFO SecurityManager: Changing modify acls groups to: 
21/02/19 19:50:00 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 19:50:00 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=44447" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@airflow-scheduler-local-f685b5bc7-9d7r6:44447" "--executor-id" "47" "--hostname" "172.18.0.3" "--cores" "1" "--app-id" "app-20210219194804-0000" "--worker-url" "spark://Worker@172.18.0.3:35837"
21/02/19 19:50:02 INFO Worker: Executor app-20210219194804-0000/47 finished with state EXITED message Command exited with code 1 exitStatus 1
21/02/19 19:50:02 INFO ExternalShuffleBlockResolver: Clean up non-shuffle files associated with the finished executor 47
21/02/19 19:50:02 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20210219194804-0000, execId=47)
21/02/19 19:50:02 INFO Worker: Asked to launch executor app-20210219194804-0000/48 for BeamApp-root-0219194747-7d7938cf_51452c51-dffe-4c61-bcb7-60c7779e3256
21/02/19 19:50:02 INFO SecurityManager: Changing view acls to: root
21/02/19 19:50:02 INFO SecurityManager: Changing modify acls to: root
21/02/19 19:50:02 INFO SecurityManager: Changing view acls groups to: 
21/02/19 19:50:02 INFO SecurityManager: Changing modify acls groups to: 
21/02/19 19:50:02 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 19:50:02 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=44447" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@airflow-scheduler-local-f685b5bc7-9d7r6:44447" "--executor-id" "48" "--hostname" "172.18.0.3" "--cores" "1" "--app-id" "app-20210219194804-0000" "--worker-url" "spark://Worker@172.18.0.3:35837"
21/02/19 19:50:04 INFO Worker: Executor app-20210219194804-0000/48 finished with state EXITED message Command exited with code 1 exitStatus 1
21/02/19 19:50:04 INFO ExternalShuffleBlockResolver: Clean up non-shuffle files associated with the finished executor 48
21/02/19 19:50:04 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20210219194804-0000, execId=48)
21/02/19 19:50:04 INFO Worker: Asked to launch executor app-20210219194804-0000/49 for BeamApp-root-0219194747-7d7938cf_51452c51-dffe-4c61-bcb7-60c7779e3256
21/02/19 19:50:04 INFO SecurityManager: Changing view acls to: root
21/02/19 19:50:04 INFO SecurityManager: Changing modify acls to: root
21/02/19 19:50:04 INFO SecurityManager: Changing view acls groups to: 
21/02/19 19:50:04 INFO SecurityManager: Changing modify acls groups to: 
21/02/19 19:50:04 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 19:50:04 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=44447" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@airflow-scheduler-local-f685b5bc7-9d7r6:44447" "--executor-id" "49" "--hostname" "172.18.0.3" "--cores" "1" "--app-id" "app-20210219194804-0000" "--worker-url" "spark://Worker@172.18.0.3:35837"
.
.
.

As we can see, the executors keep exiting, and as far as I know this is caused by the lack of communication between the executors and the driver (the job server in this case). Also, the "--driver-url" is translated to the driver pod name, using the random port from "-Dspark.driver.port". Since we can't define the name of the service, the workers try to reach the driver under its original hostname and a randomly generated port. And since this configuration comes from the driver, changing the default conf files on the worker/master has no effect. Using this answer as an example, I tried setting the env variable SPARK_PUBLIC_DNS on the job server, but it didn't result in any changes in the worker logs.
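
For reference, an attempt like that would look roughly like the snippet below on the job server container. The hostname value airflow-scheduler is an assumption for illustration; as noted above, this did not change the worker logs.

- name: beam-spark-job-server
  image: apache/beam_spark_job_server:2.27.0
  args: ["--spark-master-url=spark://spark-master:7077"]
  env:
    - name: SPARK_PUBLIC_DNS
      value: airflow-scheduler  # assumed value; did not change the worker behavior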

Running a Spark job directly in Kubernetes with kubectl run spark-base --rm -it --labels="app=spark-client" --image bde2020/spark-base:2.4.5-hadoop2.7 -- bash ./spark/bin/pyspark --master spark://spark-master:7077 --conf spark.driver.host=spark-client, together with the service:

apiVersion: v1
kind: Service
metadata:
  name: spark-client
spec:
  selector:
    app: spark-client
  clusterIP: None

I get a fully working pyspark shell. If I omit the --conf parameter, I get the same behavior as in the first setup (executors exiting indefinitely):

21/02/19 20:21:02 INFO Worker: Executor app-20210219202050-0002/4 finished with state EXITED message Command exited with code 1 exitStatus 1
21/02/19 20:21:02 INFO ExternalShuffleBlockResolver: Clean up non-shuffle files associated with the finished executor 4
21/02/19 20:21:02 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20210219202050-0002, execId=4)
21/02/19 20:21:02 INFO Worker: Asked to launch executor app-20210219202050-0002/5 for Spark shell
21/02/19 20:21:02 INFO SecurityManager: Changing view acls to: root
21/02/19 20:21:02 INFO SecurityManager: Changing modify acls to: root
21/02/19 20:21:02 INFO SecurityManager: Changing view acls groups to: 
21/02/19 20:21:02 INFO SecurityManager: Changing modify acls groups to: 
21/02/19 20:21:02 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 20:21:02 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=46161" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@spark-base:46161" "--executor-id" "5" "--hostname" "172.18.0.20" "--cores" "1" "--app-id" "app-20210219202050-0002" "--worker-url" "spark://Worker@172.18.0.20:45151"

Answer

I have three solutions to choose from, depending on your deployment requirements. In order of difficulty:

  1. Use the Spark "uber jar" job server. This starts an embedded job server inside the Spark master, rather than using a standalone job server in a container. It would simplify your deployment considerably, since you would not need to start the beam_spark_job_server container at all.

python -m apache_beam.examples.wordcount \
--output ./data_test/ \
--runner=SparkRunner \
--spark_submit_uber_jar \
--spark_master_url=spark://spark-master:7077 \
--environment_type=LOOPBACK

  2. You can pass the properties through a Spark configuration file. Create the Spark configuration file and add spark.driver.host along with any other properties you need. In the docker run command for the job server, mount that configuration file into the container and set the SPARK_CONF_DIR environment variable to point to that directory.
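
A minimal sketch of those steps, assuming the job server is launched with docker run; the host directory ./spark-conf, the mount point /opt/spark-conf, and the airflow-scheduler hostname are illustrative assumptions, not part of the original setup:

# write a spark-defaults.conf containing the driver host
mkdir -p ./spark-conf
cat > ./spark-conf/spark-defaults.conf <<'EOF'
spark.driver.host    airflow-scheduler
EOF

# mount that directory into the job server container and point SPARK_CONF_DIR at it
docker run \
  -v "$(pwd)/spark-conf:/opt/spark-conf" \
  -e SPARK_CONF_DIR=/opt/spark-conf \
  apache/beam_spark_job_server:2.27.0 \
  --spark-master-url=spark://spark-master:7077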

If neither of those works for you, you can alternatively build your own customized version of the job server container. Pull the Beam source from Github. Check out the release branch you want to use (e.g. git checkout origin/release-2.28.0). Modify the entrypoint spark-job-server.sh and set -Dspark.driver.host=x there. Then build the container using ./gradlew :runners:spark:job-server:container:docker -Pdocker-repository-root="your-repo" -Pdocker-tag="your-tag".
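
A rough sketch of those steps, with placeholder repository and tag names; check the exact location of spark-job-server.sh in your checkout before editing:

git clone https://github.com/apache/beam.git
cd beam
git checkout origin/release-2.28.0

# edit spark-job-server.sh (the job server container entrypoint) and add
# -Dspark.driver.host=<your-driver-host> to its launch command, then build:
./gradlew :runners:spark:job-server:container:docker \
  -Pdocker-repository-root="your-repo" \
  -Pdocker-tag="your-tag"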
