structured streaming Kafka 2.1 -> Zeppelin 0.8 -> Spark 2.4: spark does not use jar

Question

I have a Kafka 2.1 message broker and want to do some processing on the message data within Spark 2.4. I want to use Zeppelin 0.8.1 notebooks for rapid prototyping.

I downloaded the spark-streaming-kafka-0-10_2.11.jar that is necessary for structured streaming (http://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html) and added it as a "Dependencies" artifact to Zeppelin's "spark" interpreter (which also handles the %pyspark paragraphs). I restarted this interpreter (and Zeppelin itself).
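For reference, the artifact in the interpreter settings can be given either as a local file path (as I did) or as a Maven coordinate; the coordinate form, assuming version 2.4.0 to match the Spark installation, would be:

org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.0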

I also loaded the jar in a first notebook paragraph (at first I thought this should not be necessary...):

%dep
z.load("/usr/local/analyse/jar/spark-streaming-kafka-0-10_2.11.jar")
res0: org.apache.zeppelin.dep.Dependency = org.apache.zeppelin.dep.Dependency@2b65d5

This gave no error, so the loading seems to have worked. Now I want to do the testing; the Kafka server runs on the same machine using this port, and there is also a topic "test":

%pyspark
# Subscribe to a topic
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "test") \
  .load()

But I get the error:

Fail to execute line 6: .option("subscribe", "test") \
Traceback (most recent call last):
  File "/usr/local/analyse/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/local/analyse/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o120.load.
: org.apache.spark.sql.AnalysisException: Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;
  at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:652)
  at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:161)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
  at py4j.Gateway.invoke(Gateway.java:282)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.GatewayConnection.run(GatewayConnection.java:238)
  at java.lang.Thread.run(Thread.java:748)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/tmp/zeppelin_pyspark-312826888257172599.py", line 380, in <module>
    exec(code, _zcUserQueryNameSpace)
  File "<stdin>", line 6, in <module>
  File "/usr/local/analyse/spark/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 400, in load
    return self._df(self._jreader.load())
  File "/usr/local/analyse/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/analyse/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 69, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'Failed to find data source: kafka. Please deploy the application as per the deployment section of "Structured Streaming + Kafka Integration Guide".;'

This puzzled me, since at least one of the two adjustments (the interpreter configuration or the direct loading) should have worked.

I also tried spark-submit --jars /usr/local/analyse/jar/spark-streaming-kafka-0-10_2.11.jar on the console, but this seems to work only if I also submit a program.
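That is, an invocation of roughly this shape (kafka_test.py stands in for an actual script and is only an illustration):

spark-submit --jars /usr/local/analyse/jar/spark-streaming-kafka-0-10_2.11.jar kafka_test.py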

So I also copied spark-streaming-kafka-0-10_2.11.jar to /usr/local/analyse/spark/jars/, where all the other Spark jars are. But after a restart (of Spark and Zeppelin) I always get the same error.

In the meantime I found out that I can view Spark's environment in the web browser, and there I find spark-streaming-kafka-0-10_2.11.jar under "Classpath Entries", both with the source "System Classpath" and as "Added By User" (the latter seems to be the artifact from the Zeppelin interpreter settings). So it seems that my first two attempts should have worked.

Answer

The first issue is that you downloaded the package for Spark Streaming but are trying to create a Structured Streaming object (with readStream). Keep in mind that Spark Streaming and Spark Structured Streaming are two different things and need to be treated differently.
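To make the distinction concrete, here is a rough sketch of which artifact backs which API in Spark 2.4 / Scala 2.11 (this mapping is my gloss, not part of the original answer):

%pyspark
# The question's code path: the "kafka" data source that spark.readStream
# looks up is registered by spark-sql-kafka-0-10_2.11.
reader = spark.readStream.format("kafka")

# The downloaded spark-streaming-kafka-0-10_2.11 jar provides the older DStream
# integration (org.apache.spark.streaming.kafka010.KafkaUtils) instead; in
# Spark 2.4 that API is Scala/Java only and does not register a "kafka" source,
# which is why .load() fails with "Failed to find data source: kafka".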

For Structured Streaming you need to download the package spark-sql-kafka-0-10_2.11 and its dependencies kafka-clients, slf4j-api, snappy-java, lz4-java and unused. Your dependency section should look like this to load all the required packages:

z.load("/tmp/spark-sql-kafka-0-10_2.11-2.4.0.jar")
z.load("/tmp/kafka-clients-2.0.0.jar")
z.load("/tmp/lz4-java-1.4.0.jar")
z.load("/tmp/snappy-java-1.1.7.1.jar")
z.load("/tmp/unused-1.0.0.jar")
z.load("/tmp/slf4j-api-1.7.16.jar")

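With those jars on the interpreter classpath and the interpreter restarted, the paragraph from the question should get past the lookup error. A minimal smoke test, assuming the broker on localhost:9092 and the topic "test" from the question, could look like this:

%pyspark
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "test") \
    .load()

# Kafka delivers key and value as binary, so cast them before displaying.
query = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .start()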