Persisting Spark Streaming output


Problem description

I'm collecting data from a messaging app. I'm currently using Flume, which sends approximately 50 million records per day.

I want to switch to Kafka, consume from Kafka using Spark Streaming, persist the data to Hadoop, and query it with Impala.

I'm having issues with each approach I've tried.

Approach 1 - Save the RDD as Parquet and point an external Hive Parquet table at the Parquet directory

// scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sparkConf, Seconds(bucketsize.toInt))
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

lines.foreachRDD(rdd => {

    // 1 - Create a SchemaRDD object from the rdd and specify the schema
    val SchemaRDD1 = sqlContext.jsonRDD(rdd, schema)

    // 2 - Register it as a Spark SQL table
    SchemaRDD1.registerTempTable("sparktable")

    // 3 - Query sparktable to produce another SchemaRDD 'finalParquet' with just
    //     the data needed, and persist it as Parquet files
    val finalParquet = sqlContext.sql(sql)
    finalParquet.saveAsParquetFile(dir)
})

The problem is that finalParquet.saveAsParquetFile outputs a huge number of files: the DStream received from Kafka produces over 200 files for a one-minute batch size. The reason it outputs many files is that the computation is distributed, as explained in another post: how to make saveAsTextFile NOT split output into multiple files?

However, the proposed solutions don't seem optimal to me; for example, as one user states, having a single output file is only a good idea if you have very little data.
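For illustration only, here is a minimal sketch of that kind of workaround inside the Approach 1 loop, assuming a Spark version where coalesce is available on the query result; the partition count of 4 is a placeholder, not a recommendation:

// Sketch only: shrink the number of partitions before persisting so each
// micro-batch produces a handful of Parquet files instead of hundreds.
lines.foreachRDD(rdd => {
  val batchDF = sqlContext.jsonRDD(rdd, schema)
  batchDF.registerTempTable("sparktable")
  sqlContext.sql(sql)
    .coalesce(4)               // placeholder value, tune to the batch volume
    .saveAsParquetFile(dir)
})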

Approach 2 - Use HiveContext and insert the RDD data directly into a Hive table

# python
from pyspark import StorageLevel
from pyspark.sql import HiveContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sqlContext = HiveContext(sc)
ssc = StreamingContext(sc, int(batch_interval))
kvs = KafkaUtils.createStream(ssc, zkQuorum, group, {topics: 1})
lines = kvs.map(lambda x: x[1]).persist(StorageLevel.MEMORY_AND_DISK_SER)

def sendRecord(rdd):
    sql = "INSERT INTO TABLE table select * from beacon_sparktable"

    # 1 - Apply the schema to the RDD, creating a DataFrame 'beaconDF'
    beaconDF = sqlContext.jsonRDD(rdd, schema)

    # 2 - Register the DataFrame as a Spark SQL table
    beaconDF.registerTempTable("beacon_sparktable")

    # 3 - Insert into Hive directly from a query on the Spark SQL table
    sqlContext.sql(sql)

lines.foreachRDD(sendRecord)

This works fine; it inserts directly into a Parquet table, but there are scheduling delays for the batches because the processing time exceeds the batch interval. The consumer can't keep up with what's being produced, and the batches to process begin to queue up.

It seems writing to Hive is slow. I've tried adjusting the batch interval size and running more consumer instances.

What is the best way to persist big data from Spark Streaming, given the issues with multiple files and the potential latency of writing to Hive? What are other people doing?

A similar question has been asked here, but that poster has an issue with directories as opposed to too many files: How to make Spark Streaming write its output so that Impala can read it?

Thanks very much for any help.

Recommended answer

I think the small-file problem could be resolved somewhat. You may be getting a large number of files based on the number of Kafka partitions. In my case, I have a 12-partition Kafka topic and I write using spark.write.mode("append").parquet("/location/on/hdfs").
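As a rough sketch only (not taken verbatim from the answer), the per-batch append write might look like the following, assuming a Spark 2.x SparkSession named spark, a DStream of JSON strings named lines, a StructType schema, and a placeholder HDFS path:

// Sketch only: convert each micro-batch to a DataFrame and append it as Parquet.
// `spark`, `schema`, `lines` and the path are assumptions, not from the original post.
lines.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    import spark.implicits._
    val batchDF = spark.read.schema(schema).json(rdd.toDS())   // rdd is an RDD[String] of JSON
    batchDF.write
      .mode("append")                                          // each batch adds new files
      .parquet("/location/on/hdfs")
  }
}

Writing in append mode means each batch only adds files under the same directory, so the external table definition pointing at that directory stays stable.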

Now, depending on your requirements, you can add coalesce(1) or more to reduce the number of files. Another option is to increase the micro-batch duration; for example, if you can accept a 5-minute delay in writing, you can use a micro-batch of 300 seconds.
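Both knobs combined in one illustrative sketch; the 300-second interval and coalesce(1) are example values only, and the surrounding variables are the same assumptions as in the previous sketch:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Illustrative values only: 5-minute micro-batches and one Parquet file per batch.
val ssc = new StreamingContext(sparkConf, Seconds(300))   // accept up to ~5 minutes of write delay
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

lines.foreachRDD { rdd =>
  import spark.implicits._
  spark.read.schema(schema).json(rdd.toDS())
    .coalesce(1)                                          // collapse each micro-batch to one file
    .write.mode("append")
    .parquet("/location/on/hdfs")
}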

As for the second issue, the batches queue up only because you don't have backpressure enabled. First, you should verify the maximum you can process in a single batch. Once you have that number, you can set the spark.streaming.kafka.maxRatePerPartition value and spark.streaming.backpressure.enabled=true to limit the number of records per micro-batch. If you still cannot keep up, the only options are to increase the partitions on the topic or to increase the resources of the Spark application.
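A minimal sketch of how those two settings could be applied when building the SparkConf; the rate of 10000 records per second per partition is purely a placeholder that would have to be derived from the measured per-batch capacity, and the app name is hypothetical:

import org.apache.spark.SparkConf

// Sketch only: enable backpressure and cap the per-partition ingest rate.
// The 10000 value is a placeholder, not a recommendation.
val sparkConf = new SparkConf()
  .setAppName("kafka-to-parquet")                              // hypothetical app name
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.kafka.maxRatePerPartition", "10000")   // records/sec per Kafka partition

The same settings can also be supplied as --conf arguments to spark-submit instead of being hard-coded.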
