Persisting Spark Streaming output


Problem Description

I'm collecting data from a messaging app. I'm currently using Flume, and it sends approximately 50 million records per day.

I'd like to use Kafka instead, consume from Kafka with Spark Streaming, persist the data to Hadoop, and query it with Impala.

I'm having issues with each approach I've tried.

Approach 1 - Save the RDD as Parquet and point an external Hive Parquet table at the Parquet directory

// scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sparkConf, Seconds(bucketsize.toInt))
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
lines.foreachRDD(rdd => {

    // 1 - Create a SchemaRDD object from the rdd and specify the schema
    val SchemaRDD1 = sqlContext.jsonRDD(rdd, schema)

    // 2 - Register it as a Spark SQL table
    SchemaRDD1.registerTempTable("sparktable")

    // 3 - Query sparktable to produce another SchemaRDD 'finalParquet' with just the data needed, and persist it as Parquet files
    val finalParquet = sqlContext.sql(sql)
    finalParquet.saveAsParquetFile(dir)
})

The problem is that finalParquet.saveAsParquetFile outputs a huge number of files: the DStream received from Kafka produces over 200 files for a 1-minute batch size. The reason it outputs so many files is that the computation is distributed, as explained in another post: "how to make saveAsTextFile NOT split output into multiple file?". The proposed solutions don't seem optimal to me; for example, as one user states, having a single output file is only a good idea if you have very little data.
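
As an aside, here is a minimal sketch of the coalesce idea from that post, assuming the same lines, sqlContext, schema, sql and dir as in Approach 1 and a Spark version whose DataFrame API has coalesce (1.4 or later); the partition count of 4 is purely illustrative:

// scala - illustrative sketch, not the original code
lines.foreachRDD(rdd => {
    val batchDF = sqlContext.jsonRDD(rdd, schema)
    batchDF.registerTempTable("sparktable")
    val finalParquet = sqlContext.sql(sql)

    // shrink to a few partitions so each 1-minute batch writes a few
    // Parquet files instead of 200+
    finalParquet.coalesce(4).saveAsParquetFile(dir)
})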

Approach 2 - Use HiveContext to insert the RDD data directly into a Hive table

# python
from pyspark import StorageLevel
from pyspark.sql import HiveContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sqlContext = HiveContext(sc)
ssc = StreamingContext(sc, int(batch_interval))
kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topics: 1})
lines = kvs.map(lambda x: x[1]).persist(StorageLevel.MEMORY_AND_DISK_SER)

def sendRecord(rdd):

    sql = "INSERT INTO TABLE table select * from beacon_sparktable"

    # 1 - Apply the schema to the RDD, creating a data frame 'beaconDF'
    beaconDF = sqlContext.jsonRDD(rdd, schema)

    # 2 - Register the DataFrame as a Spark SQL table
    beaconDF.registerTempTable("beacon_sparktable")

    # 3 - Insert into Hive directly from a query on the Spark SQL table
    sqlContext.sql(sql)

lines.foreachRDD(sendRecord)

This works fine and inserts directly into a Parquet table, but there are scheduling delays for the batches because the processing time exceeds the batch interval. The consumer can't keep up with what is being produced, and the batches to process begin to queue up.

It seems that writing to Hive is slow. I've tried adjusting the batch interval size and running more consumer instances.
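
As a side note, and purely as an assumption rather than something from the original question: on Spark 1.5 or later, the streaming backpressure and receiver rate-limit settings can keep batches from queueing up without bound while the write path is being tuned. A minimal sketch, with illustrative values:

// scala - illustrative configuration sketch
import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("kafka-to-hive")   // hypothetical app name
  // let Spark adapt the ingestion rate to the observed processing rate (Spark 1.5+)
  .set("spark.streaming.backpressure.enabled", "true")
  // hard cap on records per second per receiver; 10000 is purely illustrative
  .set("spark.streaming.receiver.maxRate", "10000")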

In summary

What is the best way to persist big data from Spark Streaming, given that there are issues with multiple files and potential latency when writing to Hive? What are other people doing?

A similar question has been asked here, but he has an issue with directories as opposed to too many files: How to make Spark Streaming write its output so that Impala can read it?

Many thanks for any help.

Solution

In solution #2, the number of files created can be controlled via the number of partitions of each RDD.

See this example:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// create the Hive table (skip this step if it already exists)
sqlContext.sql("CREATE TABLE test (id int, txt string) STORED AS PARQUET")

// create an RDD with 2 records and only 1 partition
val rdd = sc.parallelize(List( List(1, "hello"), List(2, "world") ), 1)

// create a DataFrame from the RDD
val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = false),
  StructField("txt", StringType, nullable = false)
))
val df = sqlContext.createDataFrame(rdd.map( Row(_:_*) ), schema)

// this creates a single file, because the RDD has 1 partition
df.write.mode("append").saveAsTable("test")

Now, I guess you can play with the frequency at which you pull data from Kafka and with the number of partitions of each RDD (by default, the number of partitions of your Kafka topic, which you can reduce by repartitioning).
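
Tying that back to the streaming job, here is a minimal sketch of that advice, assuming the lines DStream of JSON strings and the schema from the question, the test table created above, and a DataFrame API with coalesce (Spark 1.4+); the partition count of 2 is purely illustrative:

// scala - illustrative sketch only
lines.foreachRDD(rdd => {
  if (!rdd.isEmpty()) {
    val batchDF = sqlContext.jsonRDD(rdd, schema)
    // fewer partitions per batch means fewer files appended to the table
    batchDF.coalesce(2).write.mode("append").saveAsTable("test")
  }
})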

I'm using Spark 1.5 from CDH 5.5.1, and I get the same result using either df.write.mode("append").saveAsTable("test") or your SQL string.
