Unable to see messages from Kafka Stream in Spark

Question

I just started testing a Kafka stream into Spark using the PySpark library.

I have been running the whole setup in a Jupyter Notebook. I am trying to get data from the Twitter streaming API.

Twitter streaming code:

import json
import tweepy
from uuid import uuid4
import time
from kafka import KafkaConsumer
from kafka import KafkaProducer

auth = tweepy.OAuthHandler("key", "key")
auth.set_access_token("token", "token")
api = tweepy.API(auth, wait_on_rate_limit=True, retry_count=3, retry_delay=5,
                 retry_errors=set([401, 404, 500, 503]))

# tweepy 3.x listener (StreamListener was removed in tweepy 4.0)
class CustomStreamListener(tweepy.StreamListener):
    def __init__(self, api):
        # Name the subclass in super() so StreamListener.__init__ actually runs
        super(CustomStreamListener, self).__init__(api)

    def on_data(self, tweet):
        print(tweet)
        # Kafka Producer to send data to twitter topic;
        # kafka-python expects bytes when no value_serializer is configured
        producer.send('twitter', json.dumps(tweet).encode('utf-8'))

    def on_error(self, status_code):
        print(status_code)
        return True  # Don't kill the stream

    def on_timeout(self):
        print('on_timeout')
        return True  # Don't kill the stream

producer = KafkaProducer(bootstrap_servers='localhost:9092')
sapi = tweepy.streaming.Stream(auth, CustomStreamListener(api))
sapi.filter(track=["#party"])
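
tweepy 3.x hands on_data the tweet as a raw string that already contains JSON, so running it through json.dumps wraps the whole tweet in a second layer of JSON string quoting. The stdlib-only sketch below uses a made-up sample tweet (an assumption, not real stream data) to show what ends up in the topic with and without that extra json.dumps.

```python
import json

# Hypothetical sample, shaped like what tweepy 3.x passes to on_data:
# the tweet arrives as a string that already holds JSON.
raw_tweet = '{"id": 1, "text": "#party tonight"}'

# Mirrors the producer's json.dumps(tweet), encoded to bytes for kafka-python
wrapped = json.dumps(raw_tweet).encode("utf-8")
# What sending the raw string directly would put in the topic
direct = raw_tweet.encode("utf-8")

print(wrapped)  # the whole tweet is now a single quoted JSON string
print(direct)   # plain JSON object

# Reading the wrapped form back takes two json.loads calls to reach a dict
assert json.loads(json.loads(wrapped.decode("utf-8"))) == json.loads(direct.decode("utf-8"))
```

Either form is valid to send; the choice only decides how many json.loads calls whatever reads the topic has to make.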

Spark Streaming code:

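from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
# Receiver-based Kafka 0.8 API: needs the spark-streaming-kafka-0-8 package
# on the classpath, and was removed in Spark 3.0
from pyspark.streaming.kafka import KafkaUtils

# getOrCreate is a class method; it reuses the notebook's context if one exists
sc = SparkContext.getOrCreate(SparkConf().setAppName("PythonSparkStreamingKafka_RM_01"))
sc.setLogLevel("WARN")

# 10-second micro-batches
streaming_context = StreamingContext(sc, 10)
# Connect through ZooKeeper (localhost:2181) as consumer group 'spark-streaming'
# and read topic 'twitter' with one receiver thread
kafkaStream = KafkaUtils.createStream(streaming_context, 'localhost:2181', 'spark-streaming', {'twitter': 1})
# Each record is a (key, value) tuple; keep only the message value
parsed = kafkaStream.map(lambda kv: kv[1])
parsed.count().map(lambda x: 'Tweets in this batch: %s' % x).pprint()

streaming_context.start()
streaming_context.awaitTermination()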
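
With the receiver-based KafkaUtils.createStream, each DStream element is a (key, value) tuple, and by default both are decoded from UTF-8 into strings (a missing key stays None). A minimal sketch of a function that could be handed to kafkaStream.map to get tweet dicts; parse_tweet is a hypothetical name, Python 3 is assumed, and it assumes values were produced with json.dumps(tweet) as in the producer above, so it decodes a second time when the first json.loads returns a string.

```python
import json


def parse_tweet(record):
    """Turn one (key, value) record from KafkaUtils.createStream into a dict.

    Hypothetical helper: assumes the value may be a tweet that the producer
    wrapped in json.dumps a second time, as in the question's code.
    """
    _key, value = record  # key is None because the producer sends no key
    obj = json.loads(value)
    # A double-encoded message decodes to a str first; decode once more.
    if isinstance(obj, str):
        obj = json.loads(obj)
    return obj


# Usage inside the Spark job (not executed here):
#   tweets = kafkaStream.map(parse_tweet)
#   tweets.map(lambda t: t.get("text")).pprint()

# Hypothetical sample record, built the same way as the producer's payload
sample = (None, json.dumps('{"id": 1, "text": "#party tonight"}'))
print(parse_tweet(sample)["text"])
```

Checking the type after the first json.loads lets the same function cope with both double-encoded and plain JSON values.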

Printed output:

Which part am I doing wrong?

Recommended Answer

You can also use a GUI tool such as Kafdrop. It came in very handy while I was debugging Kafka messages: you can look not only at the messages in each topic, but also at the partitions, their offsets, and so on.

It's a good tool, and you should be able to deploy it easily.

Here is the link: https://github.com/HomeAdvisor/Kafdrop
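
If you would rather check from the notebook itself, a rough alternative is to read the topic with kafka-python, which the question already imports. This sketch assumes kafka-python is installed, a broker at localhost:9092, and the twitter topic from the question; peek_topic and its defaults are hypothetical. It is not how Kafdrop works, just a quick way to see whether messages, with their partitions and offsets, are reaching the topic at all.

```python
def peek_topic(topic="twitter", servers="localhost:9092", max_messages=5, timeout_ms=5000):
    """Return up to max_messages (partition, offset, value) tuples from a topic.

    Hypothetical helper for debugging; assumes a reachable Kafka broker.
    """
    # Imported here so the function can be defined without kafka-python installed
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=servers,
        auto_offset_reset="earliest",   # start from the oldest retained message
        consumer_timeout_ms=timeout_ms,  # stop iterating after this long with no messages
    )  # group_id defaults to None, so no offsets are committed
    seen = []
    try:
        for msg in consumer:
            seen.append((msg.partition, msg.offset, msg.value))
            if len(seen) >= max_messages:
                break
    finally:
        consumer.close()
    return seen


# Usage with the broker running:
#   for partition, offset, value in peek_topic():
#       print(partition, offset, value[:80])
```

An empty result after the timeout suggests nothing is reaching the topic, which points at the producer side rather than Spark.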