Spark Streaming not reading from Kafka topics

Problem description

I have set up Kafka and Spark on Ubuntu. I am trying to read Kafka topics through Spark Streaming using PySpark (Jupyter notebook). Spark neither reads the data nor throws any error.

The Kafka producer and consumer are communicating with each other on the terminal. Kafka is configured with 3 partitions on ports 9092, 9093, 9094, and messages are being stored in the Kafka topic. Now I want to read it through Spark Streaming. I am not sure what I am missing; I have searched the internet but couldn't find any working solution. Please help me understand the missing part.

  • Topic name: new_topic
  • Spark - 2.3.2
  • Kafka - 2.11-2.1.0
  • Python 3
  • Java - 1.8.0_201
  • ZooKeeper port: 2181

Kafka Producer : bin/kafka-console-producer.sh --broker-list localhost:9092 --topic new_topic

Kafka Consumer: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic new_topic --from-beginning
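
As a sanity check (this command is not in the original post), the topic's partition layout can be confirmed with the stock Kafka CLI; on Kafka 2.1 the topics tool still takes the ZooKeeper address:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic new_topic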

PySpark code (Jupyter Notebook):

#!/usr/bin/env python
# coding: utf-8
import os

os.environ['PYSPARK_PYTHON'] = '/usr/bin/python3'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.2 pyspark-shell'

import findspark
findspark.init('/home/shekhar/spark-2.3.2-bin-hadoop2.7')
import pyspark
import sys

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

from uuid import uuid1

if __name__ == "__main__":
    #sconf = SparkConf().setAppName("SparkStr").setMaster("local")
    sc = SparkContext(appName="SparkStreamingReceiverKafkaWordCount")
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, 2)
    broker, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, "localhost:9092",
                                  "raw-event-streaming-consumer", {topic: 1})
    lines = kvs.map(lambda x: x[1])
    lines.pprint()
    ssc.start()
    ssc.awaitTermination()

Output shown in the Jupyter notebook:

-------------------------------------------
Time: 2019-01-30 00:52:18
-------------------------------------------

-------------------------------------------
Time: 2019-01-30 00:52:20
-------------------------------------------

Spark-submit command:

bin/spark-submit \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.2 SparkKafka-Copy1.py localhost:9092 new_topic \
  --master spark://localhost:4040

Output of spark-submit on the terminal:

Ivy Default Cache set to: /home/shekhar/.ivy2/cache
The jars for the packages stored in: /home/shekhar/.ivy2/jars
:: loading settings :: url = jar:file:/home/shekhar/spark-2.3.2-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-streaming-kafka-0-8_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0698f154-2d3f-4d56-b2c5-099190b947df;1.0
    confs: [default]
    found org.apache.spark#spark-streaming-kafka-0-8_2.11;2.3.2 in central
    found org.apache.kafka#kafka_2.11;0.8.2.1 in central
    found org.scala-lang.modules#scala-xml_2.11;1.0.2 in central
    found com.yammer.metrics#metrics-core;2.2.0 in central
    found org.slf4j#slf4j-api;1.7.16 in central
    found org.scala-lang.modules#scala-parser-combinators_2.11;1.0.4 in central
    found com.101tec#zkclient;0.3 in central
    found log4j#log4j;1.2.17 in central
    found org.apache.kafka#kafka-clients;0.8.2.1 in central
    found net.jpountz.lz4#lz4;1.2.0 in central
    found org.xerial.snappy#snappy-java;1.1.2.6 in central
    found org.spark-project.spark#unused;1.0.0 in central
:: resolution report :: resolve 617ms :: artifacts dl 19ms
    :: modules in use:
    com.101tec#zkclient;0.3 from central in [default]
    com.yammer.metrics#metrics-core;2.2.0 from central in [default]
    log4j#log4j;1.2.17 from central in [default]
    net.jpountz.lz4#lz4;1.2.0 from central in [default]
    org.apache.kafka#kafka-clients;0.8.2.1 from central in [default]
    org.apache.kafka#kafka_2.11;0.8.2.1 from central in [default]
    org.apache.spark#spark-streaming-kafka-0-8_2.11;2.3.2 from central in [default]
    org.scala-lang.modules#scala-parser-combinators_2.11;1.0.4 from central in [default]
    org.scala-lang.modules#scala-xml_2.11;1.0.2 from central in [default]
    org.slf4j#slf4j-api;1.7.16 from central in [default]
    org.spark-project.spark#unused;1.0.0 from central in [default]
    org.xerial.snappy#snappy-java;1.1.2.6 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   12  |   0   |   0   |   0   ||   12  |   0   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-0698f154-2d3f-4d56-b2c5-099190b947df
    confs: [default]
    0 artifacts copied, 12 already retrieved (0kB/25ms)
2019-01-30 18:40:19 WARN  Utils:66 - Your hostname, shekhar-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
2019-01-30 18:40:19 WARN  Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
2019-01-30 18:40:19 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Exception in thread "main" java.io.IOException: Cannot run program "python": error=2, No such file or directory
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
    at org.apache.spark.deploy.PythonRunner$.main(PythonRunner.scala:100)
    at org.apache.spark.deploy.PythonRunner.main(PythonRunner.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:894)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:198)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:228)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:137)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.IOException: error=2, No such file or directory
    at java.lang.UNIXProcess.forkAndExec(Native Method)
    at java.lang.UNIXProcess.<init>(UNIXProcess.java:247)
    at java.lang.ProcessImpl.start(ProcessImpl.java:134)
    at java.lang.ProcessBuilder.start(ProcessBuilder.java:1029)
    ... 12 more
2019-01-30 18:40:19 INFO  ShutdownHookManager:54 - Shutdown hook called
2019-01-30 18:40:19 INFO  ShutdownHookManager:54 - Deleting directory /tmp/spark-e6d0532c-3593-4c28-8bb6-6d48aedb12f3

Answer

It is resolved now. I had to set PYTHONPATH and export it in PATH in the .bashrc file:

PYTHONPATH=/usr/bin/python3
export PATH=$PATH:$PYTHONPATH/bin
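
As an aside, instead of relying on .bashrc, the interpreter path can also be handed to Spark directly. A minimal sketch, assuming the standard spark.pyspark.python option available since Spark 2.1:

bin/spark-submit \
  --conf spark.pyspark.python=/usr/bin/python3 \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.2 \
  SparkKafka-Copy1.py localhost:9092 new_topic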

Along with that, in the main function, the ZooKeeper port passed to createStream was changed to 2181; it had wrongly been given as 9092 (the Kafka broker port).
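
For clarity, the corrected call would look like this; the second argument of KafkaUtils.createStream is the ZooKeeper quorum, not a Kafka broker address:

# Second argument is the ZooKeeper quorum (host:port), not the Kafka broker list.
kvs = KafkaUtils.createStream(ssc, "localhost:2181",
                              "raw-event-streaming-consumer", {topic: 1})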
