PySpark Processing Stream Data and Saving Processed Data to File


Problem description

I am trying to replicate a device that is streaming its location coordinates, then process the data and save it to a text file. I am using Kafka and Spark Streaming (on PySpark); this is my architecture:

1- The Kafka producer emits data to a topic named test in the following string format:

"LG float LT float" example : LG 8100.25191107 LT 8406.43141483

Producer code:

from kafka import KafkaProducer
import random

producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Send 10,000 random coordinate pairs to the "test" topic.
for i in range(0, 10000):
    lg_value = str(random.uniform(5000, 10000))
    lt_value = str(random.uniform(5000, 10000))
    # kafka-python expects bytes values unless a value_serializer is configured.
    producer.send('test', ('LG ' + lg_value + ' LT ' + lt_value).encode('utf-8'))

producer.flush()

The producer works fine and I get the streamed data in the consumer (and even in Spark).
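
As a sanity check, a minimal kafka-python consumer along the lines below can be used to confirm that messages arrive on the topic (a sketch only; the consumer options shown are assumptions, not the exact consumer from my setup):

from kafka import KafkaConsumer

# Read the "test" topic from the beginning and print every message value.
consumer = KafkaConsumer('test',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest')
for message in consumer:
    print(message.value.decode('utf-8'))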

2- Spark Streaming is receiving this stream; I can even pprint() it.

Spark Streaming processing code:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# sc is the SparkContext provided by the pyspark shell; batch interval of 1 second.
ssc = StreamingContext(sc, 1)
kvs = KafkaUtils.createDirectStream(ssc, ["test"], {"bootstrap.servers": "localhost:9092"})

# Each Kafka record is a (key, value) pair; keep only the value string.
lines = kvs.map(lambda x: x[1])

words      = lines.flatMap(lambda line: line.split(" "))
words.pprint()
word_pairs = words.map(lambda word: (word, 1))
counts     = word_pairs.reduceByKey(lambda a, b: a + b)
counts.foreachRDD(lambda rdd: rdd.saveAsTextFile(r"C:\path\spark_test.txt"))
# I tried kvs.saveAsTextFiles(r'C:\path\spark_test.txt')
# to copy the whole stream instead, and that works fine.
ssc.start()
ssc.awaitTermination()

The error I get is:

16/12/26 00:51:53 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
org.apache.spark.SparkException: Python worker did not connect back in time

Along with other exceptions.

What I actually want is to save each entry "LG float LT float" in JSON format in a file, but first I want to simply save the coordinates in a file, and I can't seem to make that happen. Any ideas?

I can provide the full stack trace if needed.

Answer

I solved this by writing a function that saves each RDD to the file. This is the code that solved my problem:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# sc is the SparkContext provided by the pyspark shell; batch interval of 1 second.
ssc = StreamingContext(sc, 1)
kvs = KafkaUtils.createDirectStream(ssc, ["test"], {"bootstrap.servers": "localhost:9092"})

# Keep only the value string of each Kafka (key, value) record.
lines = kvs.map(lambda x: x[1])

coords = lines.map(lambda line: line)

def saveCoord(rdd):
    # Append each record "LG <float> LT <float>" to the file as a JSON-like line.
    rdd.foreach(lambda rec: open(r"C:\path\spark_test.txt", "a").write(
        "{" + rec.split(" ")[0] + ":" + rec.split(" ")[1] + "," + rec.split(" ")[2] + ":" + rec.split(" ")[3] + "},\n"))

coords.foreachRDD(saveCoord)
coords.pprint()

ssc.start()
ssc.awaitTermination()
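
To eventually get valid JSON rather than the hand-built "{LG:...,LT:...}" strings above, the same foreachRDD hook can build each line with json.dumps. The following is only a sketch of that variation (the function name saveCoordJson and the float conversion are my own additions, not part of the code above):

import json

def saveCoordJson(rdd):
    # Turn "LG <float> LT <float>" into {"LG": <float>, "LT": <float>} and append it as one JSON line.
    def write_record(rec):
        tokens = rec.split(" ")
        entry = {tokens[0]: float(tokens[1]), tokens[2]: float(tokens[3])}
        with open(r"C:\path\spark_test.txt", "a") as f:
            f.write(json.dumps(entry) + "\n")
    rdd.foreach(write_record)

coords.foreachRDD(saveCoordJson)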
