Kafka streaming behaviour for more than one partition


Question

I am consuming from a Kafka topic. This topic has 3 partitions. I am using foreachRDD to process each batch RDD (using a processData method to process each RDD, and ultimately create a DataSet from it).

Now, you can see that I have a count variable, and I am incrementing this count variable in the processData method to check how many actual records I have processed. (I understand that each RDD is a collection of Kafka topic records, and that the number depends on the batch interval size.)

Now, the output looks like this:

1 1 1 2 3 2 4 3 5 ....

This makes me think that it's because I might have 3 consumers (as I have 3 partitions), and each of these will call the foreachRDD method separately, so the same count is being printed more than once, as each consumer might have cached its own copy of count.

But the final output DataSet that I get has all the records.

So, does Spark internally union all the data? How does it work out what to union? I am trying to understand the behaviour so that I can form my logic accordingly.

int count = 0; // incremented inside processData (not shown here)

messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<K, V>>>() {
    public void call(JavaRDD<ConsumerRecord<K, V>> rdd) {
        System.out.println("Number of elements in RDD : " + rdd.count());

        List<Row> rows = rdd.map(record -> processData(record))
                .reduce((rows1, rows2) -> {
                    rows1.addAll(rows2);
                    return rows1;
                });

        StructType schema = DataTypes.createStructType(fields);
        Dataset<Row> ds = ss.createDataFrame(rows, schema);
        ds.createOrReplaceTempView("trades");
        ds.show();
    }
});

Answer

The assumptions are not completely accurate. foreachRDD is one of the so-called output operations in Spark Streaming. The function of output operations is to schedule the provided closure at the interval dictated by the batch interval. The code in that closure executes once every batch interval on the Spark driver; it is not distributed across the cluster.

In particular, foreachRDD is a general-purpose output operation that provides access to the underlying RDD within the DStream. Operations applied to that RDD will execute on the Spark cluster.

So, coming back to the code of the original question, code in the foreachRDD closure such as System.out.println("Number of elements in RDD : " + rdd.count()); executes on the driver. That is also the reason why we can see the output in the console. Note that rdd.count() in this print will trigger a count of the RDD on the cluster, so count is a distributed operation that returns a value to the driver; then, on the driver, the print operation takes place.
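
As a minimal sketch of that split, assuming messages is the Kafka direct stream from the question (with String keys and values for simplicity), the comments mark where each statement runs:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.api.java.JavaInputDStream;

public class DriverVsClusterSketch {
    static void attach(JavaInputDStream<ConsumerRecord<String, String>> messages) {
        messages.foreachRDD((JavaRDD<ConsumerRecord<String, String>> rdd) -> {
            // This closure is scheduled once per batch interval and runs on the driver.
            long total = rdd.count();                    // count() launches a distributed job on the executors
            System.out.println("Batch size: " + total);  // the single resulting number is printed on the driver

            // Transformations applied to 'rdd' run on the executors, not here.
            long withKeys = rdd.filter(record -> record.key() != null).count();
            System.out.println("Records with a non-null key: " + withKeys);
        });
    }
}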

Now comes a transformation of the RDD:

rdd.map(record -> processData(record))

As mentioned, operations applied to the RDD will execute on the cluster, and that execution follows the Spark execution model; that is, transformations are assembled into stages and applied to each partition of the underlying dataset. Given that the topic has 3 Kafka partitions, there will be 3 corresponding partitions in Spark, and processData will be applied to the records of each of those partitions in parallel.
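
To make that visible, one could add an illustrative snippet like the following inside the same foreachRDD closure (this is not part of the original answer); it tags each record with the id of the Spark partition that processed it:

// Executor-side: getPartitionId() reveals which Spark partition is handling the record.
rdd.map(record -> "partition " + org.apache.spark.TaskContext.getPartitionId()
                + " -> offset " + record.offset())
   .take(10)                      // action: a small sample is returned to the driver
   .forEach(System.out::println); // driver-side: print the sample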

So, does Spark internally union all the data? How does it work out what to union?

In the same way that Spark Streaming has output operations, Spark has actions. Actions potentially apply an operation to the data and bring the results to the driver. The simplest one, collect, brings the complete dataset to the driver, with the risk that it might not fit in memory. Another common action, count, summarizes the number of records in the dataset and returns a single number to the driver.
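
For example, given some JavaRDD<Row> produced on the cluster (rowRdd here is just an illustrative name), the two actions differ only in how much data travels back to the driver:

long total = rowRdd.count();             // a distributed job; only one number is returned to the driver
List<Row> everything = rowRdd.collect(); // the entire dataset is shipped to the driver and may not fit in its memory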

In the code above we are using reduce, which is also an action: it applies the provided function and brings the resulting data to the driver. It is the use of that action that "internally unions all the data", as expressed in the question. In the reduce expression we are actually collecting all the data that was distributed into a single local collection. It would be equivalent to doing: rdd.map(record -> processData(record)).collect()
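
To spell out that equivalence (assuming, as the question's reduce implies, that processData returns a List<Row>), both forms below end with every row materialized in the driver's memory:

// Form used in the question: per-record lists are merged pairwise and the final list lands on the driver.
List<Row> viaReduce = rdd.map(record -> processData(record))
        .reduce((left, right) -> { left.addAll(right); return left; });

// Equivalent in terms of data movement: collect the per-record lists, then flatten them on the driver.
List<Row> viaCollect = new java.util.ArrayList<>();
rdd.map(record -> processData(record)).collect().forEach(viaCollect::addAll);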

If the intention is to create a Dataset, we should avoid "moving" all the data to the driver first.

A better approach would be:

// Assuming processData returns a List<Row> (as the reduce in the question implies), flatten it per record:
JavaRDD<Row> rows = rdd.flatMap(record -> processData(record).iterator());
Dataset<Row> df = ss.createDataFrame(rows, schema);
...

In this case, the data of each partition remains local to the executor where it resides.
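
A possible continuation of that sketch, staying close to the question's createOrReplaceTempView("trades") call: the aggregation runs on the cluster and only the small query result is brought back to the driver for display.

df.createOrReplaceTempView("trades");
ss.sql("SELECT COUNT(*) AS processed FROM trades").show(); // the count executes on the executors; show() prints on the driver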

Note that moving data to the driver should be avoided: it is slow, and in the case of large datasets it will probably crash the job, since the driver typically cannot hold all the data available in the cluster.

