Sort RDD in Spark before publishing it to Kafka?


Problem Description

In my code, I first subscribe to a Kafka stream, process each RDD to create instances of my class People, and then publish the result set (Dataset[People]) to a specific Kafka topic. It is important to note that not every incoming message received from Kafka maps to an instance of People. Moreover, instances of People should be sent to Kafka in exactly the same order as they were received from Kafka.

However, I am not sure whether sorting is really necessary, or whether the instances of People maintain the same order when the respective code runs on the executors (in which case I could publish my Dataset to Kafka directly). As far as I understand, sorting is necessary because the code inside foreachRDD can be executed on different nodes in the cluster. Is this correct?

Here is my code:

val myStream = KafkaUtils.createDirectStream[K, V](
  streamingContext, PreferConsistent, Subscribe[K, V](topics, consumerConfig))

def process(record: (RDD[ConsumerRecord[String, String]], Time)): Unit = record match {
  case (rdd, time) if !rdd.isEmpty =>
    // More Code...
    // In the end, I have: Dataset[People]
  case _ =>
}

myStream.foreachRDD((x, y) => process((x, y))) // Do I have to replace this call with map, sort the RDD and then publish it to Kafka?

Answer

Moreover, instances of people should be sent to Kafka in exactly the same order as received from Kafka.

Unless you have a single partition (and then you wouldn't be using Spark, would you?), the order in which data is received is not deterministic, and similarly the order in which data is sent won't be. Sorting doesn't make any difference here.
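To make the partition point concrete, here is a minimal sketch in plain Scala (no Spark or Kafka dependencies; Rec and its fields are illustrative stand-ins for ConsumerRecord metadata): records carry a (partition, offset) pair, and sorting by offset only recovers order within one partition, never a global order across partitions.

```scala
// Illustrative stand-in for a Kafka record's (partition, offset, value) metadata.
case class Rec(partition: Int, offset: Long, value: String)

// Records as they might arrive interleaved from two partitions.
val received = Seq(
  Rec(0, 2L, "b"), Rec(1, 1L, "x"), Rec(0, 1L, "a"), Rec(1, 2L, "y"))

// A per-partition sort by offset recovers each partition's internal order...
val perPartition = received.groupBy(_.partition)
  .map { case (p, rs) => p -> rs.sortBy(_.offset).map(_.value) }

// ...but it says nothing about how the two partitions interleave globally.
println(perPartition(0)) // Seq(a, b)
println(perPartition(1)) // Seq(x, y)
```

Kafka itself only guarantees ordering per partition, which is why a global sort of the received data cannot reconstruct a single "true" arrival order across partitions.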

If you need a very specific processing order (typically a design mistake in data-intensive applications), you need a sequential application, or a system with much more fine-grained control than Spark.
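For completeness, a minimal sketch of what the sequential alternative looks like in plain Scala (parsePerson is a hypothetical stand-in for the People-mapping step; messages that don't map are skipped): processing an ordered sequence one element at a time preserves input order exactly, which is the guarantee distributed execution gives up.

```scala
// Hypothetical stand-in for the step that maps raw messages to People;
// messages that don't match are dropped, as in the question.
def parsePerson(msg: String): Option[String] =
  if (msg.startsWith("person:")) Some(msg.stripPrefix("person:")) else None

val incoming = Seq("person:alice", "noise", "person:bob")

// flatMap over the ordered sequence keeps output order identical to input order.
val published = incoming.flatMap(parsePerson)
println(published) // Seq(alice, bob)
```

The price is that all processing happens on a single consumer, so this only scales as far as one machine can keep up with the topic.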
