Running a python Apache Beam Pipeline on Spark

Problem description

I am giving apache beam (with python sdk) a try here, so I created a simple pipeline and tried to deploy it on a Spark cluster.

from apache_beam.options.pipeline_options import PipelineOptions
import apache_beam as beam

op = PipelineOptions([
        "--runner=DirectRunner"
    ]
)


with beam.Pipeline(options=op) as p:
    p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x+1) | beam.Map(print)
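
For reference, with the DirectRunner this simply prints each incremented element locally; the output looks something like (element order is not guaranteed):

2
3
4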

This pipeline works well with DirectRunner. So, to deploy the same code on Spark (as portability is a key concept in Beam)...

First I edited the PipelineOptions as mentioned here:

op = PipelineOptions([
        "--runner=PortableRunner",
        "--job_endpoint=localhost:8099",
        "--environment_type=LOOPBACK"
    ]
)

The job_endpoint points to the Beam Spark job server, which I ran using the following command:

docker run --net=host apache/beam_spark_job_server:latest --spark-master-url=spark://SPARK_URL:SPARK_PORT
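
As a side note (the pinned tag below is only an illustration, not something from the original post): instead of latest, the job server image can be pinned to the same release as the installed Python SDK, which keeps the client and job server versions predictable:

# print the installed Beam SDK version, e.g. 2.25.0
python -c "import apache_beam; print(apache_beam.__version__)"

# pin the job server image to that same release (the tag shown here is only an example)
docker run --net=host apache/beam_spark_job_server:2.25.0 --spark-master-url=spark://SPARK_URL:SPARK_PORT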

This is supposed to work well, but the job fails on Spark with this error:

20/10/31 14:35:58 ERROR TransportRequestHandler: Error while invoking RpcHandler#receive() for one-way message.

java.io.InvalidClassException: org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = 6543101073799644159, local class serialVersionUID = 1574364215946805297

Also, I have this WARN in the beam_spark_job_server logs:

WARN org.apache.beam.runners.spark.translation.SparkContextFactory: Creating a new Spark Context.

Any idea where the problem is here? Is there any other way to run python Beam pipelines on Spark without going through a containerized service?

Answer

This could happen due to a version mismatch between the version of the Spark client contained in the job server and the version of Spark to which you are submitting the job.
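
The two serialVersionUID values in the exception come from two different builds of org.apache.spark.deploy.ApplicationDescription, i.e. two different Spark releases talking to each other. As a minimal check (assuming the cluster's spark-submit is on your PATH), compare the version the cluster reports with the Spark version that your Beam release's Spark runner is built against (listed in the Beam Spark runner documentation), then pick a job server image from a Beam release that matches:

# print the Spark version of the cluster the job is being submitted to
spark-submit --version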
