structured streaming Kafka 2.1 -> Zeppelin 0.8 -> Spark 2.4: spark does not use jar

Problem description

I have a Kafka 2.1 message broker and want to do some processing of the message data in Spark 2.4. I want to use Zeppelin 0.8.1 notebooks for rapid prototyping.

I downloaded the spark-streaming-kafka-0-10_2.11.jar that is necessary for structured streaming (http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html) and added it as a dependency artifact to Zeppelin's "spark" interpreter (which also handles the %pyspark paragraphs). I restarted this interpreter (and Zeppelin as well).

I also loaded the jar in the first notebook paragraph (at first I thought this should not be necessary...):

%dep z.load("/usr/local/analyse/jar/spark-streaming-kafka-0-10_2.11.jar")
res0: org.apache.zeppelin.dep.Dependency = org.apache.zeppelin.dep.Dependency@2b65d5

So I got no error, and the loading seems to work. Now I want to do the testing; the Kafka server runs on the same machine on the port used below, and there is also a topic "test":

%pyspark
# Subscribe to a topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "test") \
  .load()

But I get the following error:

Fail to execute line 6: .option("subscribe", "test") \
Traceback (most recent call last):
  File "/usr/local/analyse/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/local/analyse/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o120.load.
: org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  at py4j.Gateway.invoke(Gateway.java:282)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.GatewayConnection.run(GatewayConnection.java:238)
  at java.lang.Thread.run(Thread.java:748)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-312826888257172599.py", line 380, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<string>", line 6, in <module>
  File "/usr/local/analyse/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 400, in load
    return self._df(self._jreader.load())
  File "/usr/local/analyse/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/analyse/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;'

I am puzzled, since at least one of the two adjustments (the interpreter configuration or the direct loading) should have worked.

I also tried spark-submit --jar /usr/local/analyse/jar/spark-streaming-kafka-0-10_2.11.jar on the console, but that seems to work only if I also submit a program.

So I also copied spark-streaming-kafka-0-10_2.11.jar to /usr/local/analyse/spark/jars/, where all the other Spark jars are. But after a restart (of Spark and Zeppelin) I still get the same error.

In the meantime I found that I can view Spark's environment in the web browser, and there I find spark-streaming-kafka-0-10_2.11.jar under "Classpath Entries", both with the source "System Classpath" and as "Added By User" (the latter seems to be the artifact from Zeppelin's interpreter section). So it seems that my first two attempts should have worked.

Recommended answer

The first issue is that you downloaded the package for Spark Streaming but are trying to create a Structured Streaming object (with readStream()). Keep in mind that Spark Streaming and Spark Structured Streaming are two different things and need to be treated differently.
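
To make the distinction concrete, here is an illustrative sketch (not from the original answer): the downloaded jar targets the older DStream API in pyspark.streaming, while the question's code goes through spark.readStream, the Structured Streaming entry point:

%pyspark
# Spark Streaming: the older DStream API, which spark-streaming-kafka-0-10
# targets (note that its Kafka 0.10 connector has no Python bindings at all).
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sc, 10)  # micro-batches every 10 seconds

# Structured Streaming: the DataFrame-based API, served by spark-sql-kafka-0-10.
reader = spark.readStream  # a DataStreamReader, as used in the question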

For Structured Streaming you need to download the package spark-sql-kafka-0-10_2.11 and its dependencies kafka-clients, slf4j-api, snappy-java, lz4-java, and unused. Your dependency section should look like this to load all the required packages:

z.load("/tmp/spark-sql-kafka-0-10_2.11-2.4.0.jar")
z.load("/tmp/kafka-clients-2.0.0.jar")
z.load("/tmp/lz4-java-1.4.0.jar")
z.load("/tmp/snappy-java-1.1.7.1.jar")
z.load("/tmp/unused-1.0.0.jar")
z.load("/tmp/slf4j-api-1.7.16.jar")

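With the correct packages on the classpath, the readStream paragraph from the question should find the kafka source. As a quick end-to-end check, here is a minimal sketch (not part of the original answer; it assumes messages are being produced to the topic "test" and reuses the df defined in the question) that attaches a console sink and prints each arriving micro-batch (in Zeppelin, the console output may land in the interpreter log rather than the notebook):

%pyspark
# Cast the binary Kafka key/value payloads to strings and stream each
# micro-batch to the console sink.
query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .start()

# The query runs in the background; stop it once the test is done:
# query.stop()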