Unable to see messages from Kafka Stream in Spark
Problem Description
I have just started testing a Kafka stream into Spark using the Pyspark library.
I have been running the whole setup in a Jupyter Notebook. I am trying to get data from the Twitter stream.
Twitter stream code:
import json

import tweepy
from kafka import KafkaProducer

auth = tweepy.OAuthHandler("key", "key")
auth.set_access_token("token", "token")
api = tweepy.API(auth, wait_on_rate_limit=True, retry_count=3, retry_delay=5,
                 retry_errors=set([401, 404, 500, 503]))

class CustomStreamListener(tweepy.StreamListener):
    def __init__(self, api):
        self.api = api
        super(CustomStreamListener, self).__init__()

    def on_data(self, tweet):
        print(tweet)
        # Kafka producer to send data to the 'twitter' topic
        producer.send('twitter', json.dumps(tweet))

    def on_error(self, status_code):
        print(status_code)
        return True  # Don't kill the stream

    def on_timeout(self):
        print('on_timeout')
        return True  # Don't kill the stream

producer = KafkaProducer(bootstrap_servers='localhost:9092')
sapi = tweepy.streaming.Stream(auth, CustomStreamListener(api))
sapi.filter(track=["#party"])
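One thing worth noting here (my reading, not something the original post confirms): Tweepy's on_data already receives the raw tweet as a JSON string, so wrapping it in json.dumps a second time double-encodes it, and KafkaProducer.send expects bytes in any case. A minimal helper sketch, with the helper name being hypothetical:

```python
import json

def encode_tweet(tweet):
    """Serialize a tweet payload to bytes for KafkaProducer.send.

    Tweepy's on_data hands over the tweet as a JSON string, so it only
    needs to be encoded; a dict payload is dumped to JSON first.
    """
    if isinstance(tweet, str):
        return tweet.encode("utf-8")  # already JSON text
    return json.dumps(tweet).encode("utf-8")  # e.g. a dict payload
```

Usage (hypothetical): producer.send('twitter', encode_tweet(tweet)) inside on_data.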
Spark streaming code:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
sc = SparkContext(appName="PythonSparkStreamingKafka_RM_01").getOrCreate()
sc.setLogLevel("WARN")
streaming_context = StreamingContext(sc, 10)
kafkaStream = KafkaUtils.createStream(streaming_context, 'localhost:2181', 'spark-streaming', {'twitter': 1})
parsed = kafkaStream.map(lambda v: v)
parsed.count().map(lambda x:'Tweets in this batch: %s' % x).pprint()
streaming_context.start()
streaming_context.awaitTermination()
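The identity map (lambda v: v) keeps the records as the (key, value) tuples that KafkaUtils.createStream emits; when debugging, it can help to actually decode the value instead of only counting records. A sketch of such a parser (the "text" field name is an assumption about the payload, not something from the original post):

```python
import json

def parse_message(kv_pair):
    """Decode one Kafka record from KafkaUtils.createStream.

    Records arrive as (key, value) tuples; the value is the message
    payload as a string, and the key is often None.
    """
    _key, value = kv_pair
    try:
        tweet = json.loads(value)
    except (TypeError, ValueError):  # missing or invalid JSON
        return None
    return tweet.get("text") if isinstance(tweet, dict) else None
```

Usage (hypothetical): parsed = kafkaStream.map(parse_message), followed by parsed.pprint() to see the decoded values per batch.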
Print output:
Which part am I doing wrong?
Recommended Answer
You can also use a GUI tool like Kafdrop. It came in very handy for me while debugging Kafka messages. You can not only look into the message queues, but also inspect the partitions, their offsets, etc.
It's a good tool and you should be able to deploy it easily.
Here is the link: https://github.com/HomeAdvisor/Kafdrop