Unable to see messages from Kafka Stream in Spark

Problem description

I have just started testing Kafka streaming into Spark using the PySpark library.

I am running the whole setup in a Jupyter Notebook, and I am trying to get data from the Twitter stream.

Twitter stream code:

import json

import tweepy
from kafka import KafkaProducer

auth = tweepy.OAuthHandler("key", "key")
auth.set_access_token("token", "token")
api = tweepy.API(auth, wait_on_rate_limit=True, retry_count=3, retry_delay=5,
                 retry_errors=set([401, 404, 500, 503]))

# Create the producer before the listener that uses it
producer = KafkaProducer(bootstrap_servers='localhost:9092')


class CustomStreamListener(tweepy.StreamListener):
    def __init__(self, api):
        # Initialise the StreamListener base class, which stores api as self.api
        super(CustomStreamListener, self).__init__(api)

    def on_data(self, tweet):
        print(tweet)
        # Send the tweet to the 'twitter' topic; KafkaProducer expects bytes
        producer.send('twitter', json.dumps(tweet).encode('utf-8'))

    def on_error(self, status_code):
        print(status_code)
        return True  # Don't kill the stream

    def on_timeout(self):
        print('on_timeout')
        return True  # Don't kill the stream


sapi = tweepy.streaming.Stream(auth, CustomStreamListener(api))
sapi.filter(track=["#party"])
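One detail in the producer above may be worth checking. Assuming `on_data` receives the raw JSON text of the tweet (as it does with the `StreamListener` class in tweepy 3.x), calling `json.dumps` on it wraps that text in a second layer of JSON quoting. The sketch below shows the effect under Python 3; `raw_tweet` is a made-up stand-in, not real Twitter data.

```python
import json

# Hypothetical stand-in for the raw JSON text that on_data receives
raw_tweet = '{"id": 1, "text": "hello #party"}'

# What the listener sends: the JSON text encoded once more as a JSON string
double_encoded = json.dumps(raw_tweet)

# A single json.loads only removes the outer layer and returns a str
once = json.loads(double_encoded)
# A second json.loads is needed to reach the dictionary
twice = json.loads(once)

print(type(once).__name__)  # str
print(twice["text"])        # hello #party
```

This does not affect the per-batch count in Spark, but any consumer that parses the messages has to decode them twice. If that is not intended, sending `tweet.encode('utf-8')` directly would probably be the simpler choice.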

Spark Streaming code:

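from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# getOrCreate is a class method; it reuses the active context if the notebook already has one
conf = SparkConf().setAppName("PythonSparkStreamingKafka_RM_01")
sc = SparkContext.getOrCreate(conf)
sc.setLogLevel("WARN")

# Batch interval of 10 seconds
streaming_context = StreamingContext(sc, 10)
# Receiver-based stream: ZooKeeper quorum, consumer group id, {topic: number of partitions to consume}
kafkaStream = KafkaUtils.createStream(streaming_context, 'localhost:2181', 'spark-streaming', {'twitter': 1})
parsed = kafkaStream.map(lambda v: v)
# Count the records in each batch and print the result
parsed.count().map(lambda x: 'Tweets in this batch: %s' % x).pprint()

streaming_context.start()
streaming_context.awaitTermination()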

Printed output:

Which part am I doing wrong?

Recommended answer

You can also use a GUI tool such as Kafdrop, which I found very handy when debugging Kafka messages. It lets you look not only at the message queues but also at the partitions, their offsets, and so on.

It is a good tool, and you should be able to deploy it easily.

Here is the link: https://github.com/HomeAdvisor/Kafdrop
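For a quick check without a GUI, the partition and offset information can also be read from Python. The following is a minimal sketch, not a replacement for Kafdrop. It assumes the `kafka-python` package already used by the producer in the question, a broker on `localhost:9092`, and the `twitter` topic. The helper names `count_messages`, `topic_offsets`, and `print_topic_summary` are hypothetical.

```python
def count_messages(beginning, end):
    """Return {partition: number of offsets between the first and the next offset}."""
    return {p: end[p] - beginning[p] for p in end}


def topic_offsets(topic, servers='localhost:9092'):
    """Fetch the first and the next offset of every partition of a topic."""
    # Imported here so count_messages can be used without kafka-python installed
    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(bootstrap_servers=servers)
    try:
        # partitions_for_topic returns None when the topic is unknown
        partitions = consumer.partitions_for_topic(topic) or set()
        tps = [TopicPartition(topic, p) for p in sorted(partitions)]
        beginning = consumer.beginning_offsets(tps)
        end = consumer.end_offsets(tps)
    finally:
        consumer.close()
    # Key the results by partition number instead of TopicPartition
    return ({tp.partition: o for tp, o in beginning.items()},
            {tp.partition: o for tp, o in end.items()})


def print_topic_summary(topic, servers='localhost:9092'):
    """Print how many messages each partition of the topic currently holds."""
    first, last = topic_offsets(topic, servers)
    for partition, n in sorted(count_messages(first, last).items()):
        print('partition %d: %d messages' % (partition, n))
```

Calling `print_topic_summary('twitter')` a few times while the Twitter listener is running should show whether the counts grow. If they stay at zero, the producer side is the place to look. If they grow while Spark still reports empty batches, the problem is more likely on the Spark side.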
