Understanding Spark Structured Streaming Parallelism

Question

I'm a newbie in the Spark world and struggling with some concepts.

How does parallelism happen when using Spark Structured Streaming sourcing from Kafka?

Let's consider the following code snippet:

SparkSession spark = SparkSession
          .builder()
          .appName("myApp")
          .getOrCreate();   

Dataset<VideoEventData> ds = spark
  .readStream()
  .format("kafka")
  ...

gDataset = ds.groupByKey(...)

pDataset = gDataset.mapGroupsWithState(
      ...
      /* process each key and its values:
         loop over the values;
         if a value is valid, save the key/value result to HDFS */
      ...
)

StreamingQuery query = pDataset.writeStream()
          .outputMode("update")
          .format("console")
          .start();

//await
query.awaitTermination();

I've read that parallelism is related to the number of data partitions, and that the number of partitions for a Dataset is based on the spark.sql.shuffle.partitions parameter.

  1. For every batch (pulled from Kafka), will the pulled items be divided among the number of spark.sql.shuffle.partitions? For example, with spark.sql.shuffle.partitions=5 and Batch1=100 rows, will we end up with 5 partitions of 20 rows each?

  2. Considering the code snippet provided, do we still leverage Spark's parallelism, given that groupByKey is followed by the mapGroups/mapGroupsWithState functions?

Update:

Inside gDataset.mapGroupsWithState is where I process each key and its values and store the result in HDFS, so the output sink is only used to print some stats to the console.
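
For context, a minimal sketch of what the grouping and stateful step can look like in Java. VideoEventData comes from the question, while SessionState, ProcessedEvent, getCameraId() and isValid() are hypothetical names used only for illustration:

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsWithStateFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.KeyValueGroupedDataset;

// Group the stream by a key (here a hypothetical camera id).
KeyValueGroupedDataset<String, VideoEventData> gDataset = ds.groupByKey(
    (MapFunction<VideoEventData, String>) VideoEventData::getCameraId,
    Encoders.STRING());

// All values for a given key in a trigger are handled by a single task.
Dataset<ProcessedEvent> pDataset = gDataset.mapGroupsWithState(
    (MapGroupsWithStateFunction<String, VideoEventData, SessionState, ProcessedEvent>)
        (key, values, state) -> {
          SessionState s = state.exists() ? state.get() : new SessionState();
          while (values.hasNext()) {
            VideoEventData value = values.next();
            if (value.isValid()) {
              s.setValidCount(s.getValidCount() + 1);
              // the question also persists the key/value result to HDFS here
            }
          }
          state.update(s);
          ProcessedEvent out = new ProcessedEvent(); // hypothetical bean output type
          out.setKey(key);
          return out;
        },
    Encoders.bean(SessionState.class),
    Encoders.bean(ProcessedEvent.class));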

Answer

For every batch (pulled from Kafka), will the pulled items be divided among the number of spark.sql.shuffle.partitions?

They will be divided once they reach groupByKey, which is a shuffle boundary. When you first retrieve the data, the number of partitions will be equal to the number of Kafka partitions.
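
If the topic has fewer partitions than you'd like for that initial (pre-shuffle) stage, Spark 2.4+ lets the Kafka source split the offset ranges further via the minPartitions option. A minimal sketch, assuming a hypothetical broker address and topic name:

Dataset<Row> rawEvents = spark
          .readStream()
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092") // assumed broker address
          .option("subscribe", "video-events")              // assumed topic name
          .option("minPartitions", "10") // Spark 2.4+: request at least 10 input partitions
          .load();

Without that option, the input stage simply has one Spark partition per Kafka topic partition, as described above.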

Considering the code snippet provided, do we still leverage Spark's parallelism, given that groupByKey is followed by the mapGroups/mapGroupsWithState functions?

Generally yes, but it also depends on how you set up your Kafka topic. Although not visible to you from the code, Spark will internally split the data of each stage into smaller tasks and distribute them among the available executors in the cluster. If your Kafka topic has only 1 partition, then prior to groupByKey your internal stream will contain a single partition, which won't be parallelized but executed on a single executor. As long as your Kafka partition count is greater than 1, your processing will be parallel. After the shuffle boundary, Spark will repartition the data to contain the number of partitions specified by spark.sql.shuffle.partitions.
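
If you want to change that post-shuffle partition count, it can be set when building the session used in the question's snippet; a minimal sketch, with 5 chosen only to mirror the example above:

SparkSession spark = SparkSession
          .builder()
          .appName("myApp")
          .config("spark.sql.shuffle.partitions", "5") // partitions used after the groupByKey shuffle
          .getOrCreate();

The default is 200, which is often far more than a small streaming job needs.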
