如何正确使用pyspark将数据发送到kafka经纪人? [英] how to properly use pyspark to send data to kafka broker?

查看:1029
本文介绍了如何正确使用pyspark将数据发送到kafka经纪人?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试编写一个简单的pyspark作业,该作业将从kafka经纪人主题中接收数据,对该数据进行一些转换,然后将转换后的数据放在另一个kafka经纪人主题中.

I'm trying to write a simple pyspark job, which would receive data from a kafka broker topic, did some transformation on that data, and put the transformed data on a different kafka broker topic.

我有以下代码,该代码从kafka主题读取数据,但是运行sendkafka函数无效:

I have the following code, which reads data from a kafka topic, but has no effect running sendkafka function:

from pyspark import SparkConf, SparkContext

from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from kafka import SimpleProducer, KafkaClient

def sendkafka(messages):
    kafka = KafkaClient("localhost:9092")
    producer = SimpleProducer(kafka)
    for message in messages:
        yield producer.send_messages('spark.out', message)

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 5)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    parsed = kvs.map(lambda (key, value): json.loads(value))
    parsed.pprint()

    sentRDD = kvs.mapPartitions(sendkafka)
    sentRDD.count()

    ssc.start()
    ssc.awaitTermination()
if __name__ == "__main__":

   main()

为了使我的sendkafka函数实际将数据发送到spark.out kafka主题,我应该更改什么?

What should I change, in order to make my sendkafka function to actually send data to the spark.out kafka topic?

推荐答案

这是正确的代码,它从Kafka读入Spark,然后将spark数据写回到另一个kafka主题:

Here is the correct code, which reads from Kafka into Spark, and writes spark data back to a different kafka topic:

from pyspark import SparkConf, SparkContext
from operator import add
import sys
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
import json
from kafka import SimpleProducer, KafkaClient
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

def handler(message):
    records = message.collect()
    for record in records:
        producer.send('spark.out', str(record))
        producer.flush()

def main():
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    ssc = StreamingContext(sc, 10)

    brokers, topic = sys.argv[1:]
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
    kvs.foreachRDD(handler)

    ssc.start()
    ssc.awaitTermination()
if __name__ == "__main__":

   main()

运行此方法的方法是:

spark-submit --jars spark-streaming-kafka-assembly_2.10-1.6.1.jar s.py localhost:9092 test

这篇关于如何正确使用pyspark将数据发送到kafka经纪人?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆