Spark to process rdd chunk by chunk from json files and post to Kafka topic


Question

I am new to Spark and Scala. I have a requirement to process a number of JSON files from an S3 location. These are basically batch data that will be kept for reprocessing sometime later. My Spark job should process these files in such a way that it picks 5 raw JSON records at a time and sends them as one message to a Kafka topic. The reason for picking only 5 records is that the Kafka topic handles both real-time and batch data simultaneously on the same topic, so the batch processing should not delay the real-time processing.

I need to process each JSON file sequentially, so I would pick only 5 records at a time, post a message to Kafka, then pick the next 5 records of the file, and so on...

I have written a piece of code which reads from the JSON files and posts the records to the Kafka topic.

    val jsonRDD = sc.textFile(s3Location)

    var count = 0
    val buf = new StringBuilder

    jsonRDD.collect().foreach { line =>
      count += 1
      buf ++= line
      if (count >= 5) {
        println(s"Printing 5 jsons $buf")
        SendMessagetoKakfaTopic(buf) // pseudo code for sending the message to the Kafka topic
        count = 0
        buf.setLength(0)
        Thread.sleep(10000)
      }
    }
    if (buf.nonEmpty) {
      println(s"Printing remaining jsons $buf")
      SendMessagetoKakfaTopic(buf)
    }

I believe there is a more efficient way of processing JSONs in Spark.

I should also be looking at other parameters like memory, resources, etc., since the data might grow beyond hundreds of gigabytes.

Answer

That looks like a case for Spark Streaming or (recommended) Spark Structured Streaming.

In either case, you monitor a directory and process new files every batch interval (configurable).

You could handle it using SparkContext.textFile (with wildcards) or SparkContext.wholeTextFiles. Either way you end up with an RDD of the JSON records: textFile gives an RDD[String] with one element per line, while wholeTextFiles gives one element per file as a (path, content) pair.
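For reference, a minimal sketch of the two read options, reusing sc and s3Location from the question (the wildcard path is an assumption, e.g. s3a://bucket/batch/*.json):

    val linesRDD = sc.textFile(s3Location)        // RDD[String]: one element per line
    val filesRDD = sc.wholeTextFiles(s3Location)  // RDD[(String, String)]: (file path, whole file content)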

If your requirement is to process the files sequentially, 5-line chunk by 5-line chunk, you could make the transformation pipeline slightly more efficient by using RDD.toLocalIterator:

toLocalIterator: Iterator[T]

Return an iterator that contains all of the elements in this RDD. The iterator will consume as much memory as the largest partition in this RDD.

See the RDD API.

With that Iterator of JSON records, you would take them 5 at a time, e.g. with grouped(5) (equivalent to sliding with a window and step of 5).
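As a rough sketch of that idea, reusing sc, s3Location and the pseudo-code SendMessagetoKakfaTopic from the question:

    val jsonRDD = sc.textFile(s3Location)

    jsonRDD.toLocalIterator          // pulls one partition at a time to the driver
      .grouped(5)                    // non-overlapping chunks of at most 5 records
      .foreach { chunk =>
        val buf = new StringBuilder
        chunk.foreach(buf ++= _)
        SendMessagetoKakfaTopic(buf) // pseudo code, as in the question
        Thread.sleep(10000)          // throttle so batch data does not starve the real-time traffic
      }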

That would give you a pretty efficient pipeline.

Once again, I strongly recommend reading up on Structured Streaming in the Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher); it focuses on reading from Kafka, but writing to Kafka is also supported.
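A minimal sketch in that direction, assuming the spark-sql-kafka-0-10 package is on the classpath; the input path, broker address, topic and checkpoint location below are placeholders, not values from the question:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger

    val spark = SparkSession.builder().appName("batch-json-to-kafka").getOrCreate()

    // The text source yields one row per input line in a single "value" column,
    // which is exactly the column the Kafka sink expects.
    val rawJson = spark.readStream
      .option("maxFilesPerTrigger", "1")       // throttle: pick up files gradually
      .text("s3a://my-bucket/batch-json/")     // placeholder input directory

    rawJson.writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")              // placeholder
      .option("topic", "my-topic")                                     // placeholder
      .option("checkpointLocation", "s3a://my-bucket/checkpoints/")    // placeholder
      .trigger(Trigger.ProcessingTime("10 seconds"))                   // configurable batch interval
      .start()
      .awaitTermination()

With maxFilesPerTrigger and the processing-time trigger you can limit how much of the batch backlog enters the topic per micro-batch, which is one way to keep it from delaying the real-time traffic.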
