Unable to see messages from Kafka Stream in Spark
Problem description
I just started testing streaming from Kafka to Spark using the PySpark library.
I have been running the whole setup in a Jupyter Notebook. I am trying to get data from the Twitter Streaming API.
Twitter streaming code:
import json
import tweepy
from kafka import KafkaProducer

auth = tweepy.OAuthHandler("key", "key")
auth.set_access_token("token", "token")
api = tweepy.API(auth, wait_on_rate_limit=True, retry_count=3, retry_delay=5,
                 retry_errors=set([401, 404, 500, 503]))

class CustomStreamListener(tweepy.StreamListener):
    def __init__(self, api):
        # Pass this class (not tweepy.StreamListener) to super() so that
        # StreamListener.__init__ actually runs and stores the API object.
        super(CustomStreamListener, self).__init__(api)

    def on_data(self, tweet):
        print(tweet)
        # Kafka producer sends the data to the 'twitter' topic; encode to bytes
        # because KafkaProducer without a serializer expects bytes values.
        producer.send('twitter', json.dumps(tweet).encode('utf-8'))
        return True  # Keep the stream open

    def on_error(self, status_code):
        print(status_code)
        return True  # Don't kill the stream

    def on_timeout(self):
        print('on_timeout')
        return True  # Don't kill the stream

producer = KafkaProducer(bootstrap_servers='localhost:9092')
sapi = tweepy.streaming.Stream(auth, CustomStreamListener(api))
sapi.filter(track=["#party"])
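If both snippets run in the same notebook kernel, `sapi.filter(track=["#party"])` does not return while the stream is open, so a Spark cell after it never starts. A minimal sketch, assuming that single-kernel setup, runs the blocking call in a daemon thread instead. `run_in_background` is a hypothetical helper, not part of tweepy or kafka-python.

```python
import threading
import time


def run_in_background(target, *args, **kwargs):
    """Run a blocking callable in a daemon thread and return the started thread.

    Hypothetical helper: e.g. run_in_background(sapi.filter, track=["#party"])
    keeps the notebook kernel free for the Spark cell.
    """
    thread = threading.Thread(target=target, args=args, kwargs=kwargs, daemon=True)
    thread.start()
    return thread


if __name__ == "__main__":
    results = []

    # Stand-in for a blocking call such as sapi.filter(track=["#party"]).
    def fake_stream(track):
        for tag in track:
            time.sleep(0.1)
            results.append(tag)

    t = run_in_background(fake_stream, track=["#party"])
    print("kernel is free while the stream runs:", t.is_alive())
    t.join()
    print(results)
```

A daemon thread does not keep the interpreter alive on its own, so stopping the kernel also stops the stream.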
Spark Streaming code:
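from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
# Kafka 0.8 DStream API; it was removed from PySpark in Spark 3.0.
from pyspark.streaming.kafka import KafkaUtils

# Constructing SparkContext(...) directly fails if a context already exists
# (e.g. when the notebook cell is re-run); getOrCreate() reuses it instead.
conf = SparkConf().setAppName("PythonSparkStreamingKafka_RM_01")
sc = SparkContext.getOrCreate(conf)
sc.setLogLevel("WARN")
# When running locally, the master needs at least two threads (e.g. "local[2]"):
# the Kafka receiver occupies one and batch processing needs another.
streaming_context = StreamingContext(sc, 10)  # 10-second batches
# Receiver-based stream through ZooKeeper: consumer group 'spark-streaming',
# one receiver thread for the 'twitter' topic.
kafkaStream = KafkaUtils.createStream(streaming_context, 'localhost:2181', 'spark-streaming', {'twitter': 1})
# Each record is a (key, value) pair; keep only the message value.
parsed = kafkaStream.map(lambda kv: kv[1])
parsed.count().map(lambda x: 'Tweets in this batch: %s' % x).pprint()
streaming_context.start()
streaming_context.awaitTermination()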
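`KafkaUtils.createStream` delivers each record as a `(key, value)` pair. Because the producer wraps the raw tweet JSON in `json.dumps`, the value is JSON-encoded twice. The map function below is a sketch, assuming that encoding, that pulls the tweet text out of one record. `tweet_text` is a hypothetical name. It is plain Python, so it can be checked without Spark and then passed to `kafkaStream.map(tweet_text)`.

```python
import json


def tweet_text(record):
    """Extract the tweet text from one (key, value) record of a Kafka DStream.

    Sketch assuming the producer sent json.dumps(raw_tweet_json), so the value
    may be JSON-encoded twice.
    """
    _key, value = record
    payload = json.loads(value)
    if isinstance(payload, str):
        # The first decode only removed the outer json.dumps() wrapper.
        payload = json.loads(payload)
    # Non-tweet messages (e.g. rate-limit notices) have no "text" field.
    return payload.get("text", "")


if __name__ == "__main__":
    raw = json.dumps({"id": 1, "text": "#party tonight"})
    record = (None, json.dumps(raw))  # value as the producer sends it
    print(tweet_text(record))
```

The function also accepts values encoded only once, so it keeps working if the producer later sends the raw tweet string directly.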
Printed output:
Which part am I doing wrong?
Recommended answer
You can also use a GUI tool such as Kafdrop. It came in very handy when I was debugging Kafka messages: you can inspect not only the messages in each topic, but also the partitions, their offsets, and so on.
It's a good tool, and you should be able to deploy it easily.
Here is the link: https://github.com/HomeAdvisor/Kafdrop
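You can also check from code instead of deploying a GUI. The sketch below uses kafka-python's `KafkaConsumer`, the same library the producer uses. It reads the topic from the earliest retained offset and prints each message's partition and offset. `peek` and `describe` are hypothetical helper names. The broker address `localhost:9092` and the topic `twitter` are taken from the question's code.

```python
def describe(record):
    """Format one consumed record as 'partition/offset: value' (value truncated)."""
    value = record.value.decode("utf-8", errors="replace")
    return "%d/%d: %s" % (record.partition, record.offset, value[:80])


def peek(topic, bootstrap_servers="localhost:9092", timeout_ms=5000):
    """Print every message currently in `topic`, then stop once no new
    message arrives within `timeout_ms` milliseconds."""
    from kafka import KafkaConsumer  # kafka-python, imported only when needed

    # group_id defaults to None, so no consumer-group offsets are committed.
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        auto_offset_reset="earliest",    # start from the oldest retained message
        consumer_timeout_ms=timeout_ms,  # end iteration after this idle period
    )
    try:
        for record in consumer:
            print(describe(record))
    finally:
        consumer.close()
```

Usage: `peek("twitter")`. Suppose this prints messages but the Spark job still reports zero tweets. Then the producer is working and the problem is on the Spark side, for example a receiver that only sees messages produced after it started.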