Is it possible to configure the Beam portable runner with Spark configurations?


Problem description

Is it possible to configure the Beam portable runner with Spark configurations? More precisely, is it possible to configure spark.driver.host in the Portable Runner?

Currently, we have Airflow deployed in a Kubernetes cluster, and since we aim to use TensorFlow Extended we need Apache Beam. For our use case Spark would be the appropriate runner, and because Airflow and TensorFlow are coded in Python we need to use Apache Beam's Portable Runner (https://beam.apache.org/documentation/runners/spark/#portability).

The portable runner creates the Spark context inside its own container and leaves no room for configuring the driver DNS, making the executors inside the worker pods unable to communicate with the driver (the job server).

  1. Following the Beam documentation, the job server was deployed in the same pod as Airflow so that the two containers share the local network. Job server container configuration:

- name: beam-spark-job-server
  image: apache/beam_spark_job_server:2.27.0
  args: ["--spark-master-url=spark://spark-master:7077"]

Job server/Airflow service:

apiVersion: v1
kind: Service
metadata:
  name: airflow-scheduler
  labels:
    app: airflow-k8s
spec:
  type: ClusterIP
  selector:
    app: airflow-scheduler
  ports:
    - port: 8793
      protocol: TCP
      targetPort: 8793
      name: scheduler
    - port: 8099
      protocol: TCP
      targetPort: 8099
      name: job-server
    - port: 7077
      protocol: TCP
      targetPort: 7077
      name: spark-master
    - port: 8098
      protocol: TCP
      targetPort: 8098
      name: artifact
    - port: 8097
      protocol: TCP
      targetPort: 8097
      name: java-expansion

Ports 8097, 8098, and 8099 belong to the job server, 8793 to Airflow, and 7077 to the Spark master.

  2. When testing a simple Beam example python -m apache_beam.examples.wordcount --output ./data_test/ --runner=PortableRunner --job_endpoint=localhost:8099 --environment_type=LOOPBACK from the Airflow container, I get the following response on the Airflow pod:

Defaulting container name to airflow-scheduler.
Use 'kubectl describe pod/airflow-scheduler-local-f685b5bc7-9d7r6 -n airflow-main-local' to see all of the containers in this pod.
airflow@airflow-scheduler-local-f685b5bc7-9d7r6:/opt/airflow$ python -m apache_beam.examples.wordcount --output ./data_test/ --runner=PortableRunner --job_endpoint=localhost:8099 --environment_type=LOOPBACK
INFO:apache_beam.internal.gcp.auth:Setting socket default timeout to 60 seconds.
INFO:apache_beam.internal.gcp.auth:socket default timeout is 60.0 seconds.
INFO:oauth2client.client:Timeout attempting to reach GCE metadata service.
WARNING:apache_beam.internal.gcp.auth:Unable to find default credentials to use: The Application Default Credentials are not available. They are available if running in Google Compute Engine. Otherwise, the environment variable GOOGLE_APPLICATION_CREDENTIALS must be defined pointing to a file defining the credentials. See https://developers.google.com/accounts/docs/application-default-credentials for more information.
Connecting anonymously.
INFO:apache_beam.runners.worker.worker_pool_main:Listening for workers at localhost:35837
WARNING:root:Make sure that locally built Python SDK docker image has Python 3.7 interpreter.
INFO:root:Default Python SDK image for environment is apache/beam_python3.7_sdk:2.27.0
INFO:apache_beam.runners.portability.portable_runner:Environment "LOOPBACK" has started a component necessary for the execution. Be sure to run the pipeline using
  with Pipeline() as p:
    p.apply(..)
This ensures that the pipeline finishes before this program exits.
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STOPPED
INFO:apache_beam.runners.portability.portable_runner:Job state changed to STARTING
INFO:apache_beam.runners.portability.portable_runner:Job state changed to RUNNING

And the worker logs:

21/02/19 19:50:00 INFO Worker: Asked to launch executor app-20210219194804-0000/47 for BeamApp-root-0219194747-7d7938cf_51452c51-dffe-4c61-bcb7-60c7779e3256
21/02/19 19:50:00 INFO SecurityManager: Changing view acls to: root
21/02/19 19:50:00 INFO SecurityManager: Changing modify acls to: root
21/02/19 19:50:00 INFO SecurityManager: Changing view acls groups to: 
21/02/19 19:50:00 INFO SecurityManager: Changing modify acls groups to: 
21/02/19 19:50:00 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 19:50:00 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=44447" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@airflow-scheduler-local-f685b5bc7-9d7r6:44447" "--executor-id" "47" "--hostname" "172.18.0.3" "--cores" "1" "--app-id" "app-20210219194804-0000" "--worker-url" "spark://Worker@172.18.0.3:35837"
21/02/19 19:50:02 INFO Worker: Executor app-20210219194804-0000/47 finished with state EXITED message Command exited with code 1 exitStatus 1
21/02/19 19:50:02 INFO ExternalShuffleBlockResolver: Clean up non-shuffle files associated with the finished executor 47
21/02/19 19:50:02 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20210219194804-0000, execId=47)
21/02/19 19:50:02 INFO Worker: Asked to launch executor app-20210219194804-0000/48 for BeamApp-root-0219194747-7d7938cf_51452c51-dffe-4c61-bcb7-60c7779e3256
21/02/19 19:50:02 INFO SecurityManager: Changing view acls to: root
21/02/19 19:50:02 INFO SecurityManager: Changing modify acls to: root
21/02/19 19:50:02 INFO SecurityManager: Changing view acls groups to: 
21/02/19 19:50:02 INFO SecurityManager: Changing modify acls groups to: 
21/02/19 19:50:02 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 19:50:02 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=44447" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@airflow-scheduler-local-f685b5bc7-9d7r6:44447" "--executor-id" "48" "--hostname" "172.18.0.3" "--cores" "1" "--app-id" "app-20210219194804-0000" "--worker-url" "spark://Worker@172.18.0.3:35837"
21/02/19 19:50:04 INFO Worker: Executor app-20210219194804-0000/48 finished with state EXITED message Command exited with code 1 exitStatus 1
21/02/19 19:50:04 INFO ExternalShuffleBlockResolver: Clean up non-shuffle files associated with the finished executor 48
21/02/19 19:50:04 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20210219194804-0000, execId=48)
21/02/19 19:50:04 INFO Worker: Asked to launch executor app-20210219194804-0000/49 for BeamApp-root-0219194747-7d7938cf_51452c51-dffe-4c61-bcb7-60c7779e3256
21/02/19 19:50:04 INFO SecurityManager: Changing view acls to: root
21/02/19 19:50:04 INFO SecurityManager: Changing modify acls to: root
21/02/19 19:50:04 INFO SecurityManager: Changing view acls groups to: 
21/02/19 19:50:04 INFO SecurityManager: Changing modify acls groups to: 
21/02/19 19:50:04 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 19:50:04 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=44447" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@airflow-scheduler-local-f685b5bc7-9d7r6:44447" "--executor-id" "49" "--hostname" "172.18.0.3" "--cores" "1" "--app-id" "app-20210219194804-0000" "--worker-url" "spark://Worker@172.18.0.3:35837"
.
.
.

As we can see, the executors keep exiting, and as far as I know this is caused by the missing communication between the executors and the driver (the job server in this case). Also, the "--driver-url" is translated to the driver pod name together with the random port "-Dspark.driver.port". Since we cannot define the name of the service, the workers try to use the original hostname from the driver along with a randomly generated port. Because this configuration comes from the driver, changing the default conf files on the worker/master has no effect. Using this answer as an example, I tried setting the environment variable SPARK_PUBLIC_DNS on the job server, but that did not result in any changes in the worker logs.
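For reference, that attempt amounted to roughly the following addition to the job server container shown earlier (the hostname value here is an assumption for illustration; the change made no difference in the worker logs):

# Job server container from above, with the attempted SPARK_PUBLIC_DNS override
- name: beam-spark-job-server
  image: apache/beam_spark_job_server:2.27.0
  args: ["--spark-master-url=spark://spark-master:7077"]
  env:
    - name: SPARK_PUBLIC_DNS   # attempted override of the address the driver advertises
      value: airflow-scheduler # assumed value; matches the service name in this setup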

Running a Spark job directly in Kubernetes with kubectl run spark-base --rm -it --labels="app=spark-client" --image bde2020/spark-base:2.4.5-hadoop2.7 -- bash ./spark/bin/pyspark --master spark://spark-master:7077 --conf spark.driver.host=spark-client, together with the service:

apiVersion: v1
kind: Service
metadata:
  name: spark-client
spec:
  selector:
    app: spark-client
  clusterIP: None

I get a fully working pyspark shell. If I omit the --conf parameter, I get the same behavior as in the first setup (executors exiting indefinitely):

21/02/19 20:21:02 INFO Worker: Executor app-20210219202050-0002/4 finished with state EXITED message Command exited with code 1 exitStatus 1
21/02/19 20:21:02 INFO ExternalShuffleBlockResolver: Clean up non-shuffle files associated with the finished executor 4
21/02/19 20:21:02 INFO ExternalShuffleBlockResolver: Executor is not registered (appId=app-20210219202050-0002, execId=4)
21/02/19 20:21:02 INFO Worker: Asked to launch executor app-20210219202050-0002/5 for Spark shell
21/02/19 20:21:02 INFO SecurityManager: Changing view acls to: root
21/02/19 20:21:02 INFO SecurityManager: Changing modify acls to: root
21/02/19 20:21:02 INFO SecurityManager: Changing view acls groups to: 
21/02/19 20:21:02 INFO SecurityManager: Changing modify acls groups to: 
21/02/19 20:21:02 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(root); groups with view permissions: Set(); users  with modify permissions: Set(root); groups with modify permissions: Set()
21/02/19 20:21:02 INFO ExecutorRunner: Launch command: "/usr/local/openjdk-8/bin/java" "-cp" "/opt/spark/conf/:/opt/spark/jars/*" "-Xmx1024M" "-Dspark.driver.port=46161" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "--driver-url" "spark://CoarseGrainedScheduler@spark-base:46161" "--executor-id" "5" "--hostname" "172.18.0.20" "--cores" "1" "--app-id" "app-20210219202050-0002" "--worker-url" "spark://Worker@172.18.0.20:45151"

Answer

I have three solutions to choose from, depending on your deployment requirements. In order of difficulty:

  1. Use the Spark "uber jar" job server. This starts an embedded job server inside the Spark master, instead of using a standalone job server in a container. This would simplify your deployment considerably, since you would not need to start the beam_spark_job_server container at all.

python -m apache_beam.examples.wordcount \
--output ./data_test/ \
--runner=SparkRunner \
--spark_submit_uber_jar \
--spark_master_url=spark://spark-master:7077 \
--environment_type=LOOPBACK

  2. You can pass the properties through a Spark configuration file. Create the Spark configuration file and add spark.driver.host and whatever other properties you need. In the docker run command for the job server, mount that configuration file into the container, and set the SPARK_CONF_DIR environment variable to point to that directory, as sketched below.
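A minimal sketch of what that could look like in the Kubernetes setup above, assuming the properties are delivered through a ConfigMap rather than a docker run volume mount (the ConfigMap name, mount path, and hostname value are assumptions for illustration, not part of the original answer):

# Hypothetical ConfigMap holding the Spark properties the job server should pick up
apiVersion: v1
kind: ConfigMap
metadata:
  name: spark-conf
data:
  spark-defaults.conf: |
    spark.driver.host  airflow-scheduler

# Job server container from the question, extended to mount the conf directory
# and point Spark at it via SPARK_CONF_DIR
- name: beam-spark-job-server
  image: apache/beam_spark_job_server:2.27.0
  args: ["--spark-master-url=spark://spark-master:7077"]
  env:
    - name: SPARK_CONF_DIR
      value: /opt/spark/custom-conf
  volumeMounts:
    - name: spark-conf
      mountPath: /opt/spark/custom-conf

# Pod-level volume backing the mount above
volumes:
  - name: spark-conf
    configMap:
      name: spark-conf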

If neither of those works for you, you can alternatively build your own customized version of the job server container. Pull the Beam source from GitHub. Check out the release branch you want to use (e.g. git checkout origin/release-2.28.0). Modify the entrypoint spark-job-server.sh and set -Dspark.driver.host=x there. Then build the container using ./gradlew :runners:spark:job-server:container:docker -Pdocker-repository-root="your-repo" -Pdocker-tag="your-tag".
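Purely as an illustration of where that flag goes (the actual contents of spark-job-server.sh differ between releases, and the jar path below is hypothetical), the change amounts to adding the system property to the java invocation in the entrypoint:

#!/bin/bash
# Illustrative sketch only; not the actual Beam entrypoint script.
# The idea is to pass -Dspark.driver.host to the JVM running the Spark job server,
# so the driver advertises a hostname the workers can resolve.
# The hostname and jar path below are assumptions for illustration.
exec java \
  -Dspark.driver.host=airflow-scheduler \
  -jar /opt/apache/beam/jars/beam-runners-spark-job-server.jar "$@"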
