spark sql: How to achieve parallel processing of a dataframe at group level, but within each group we require sequential processing of rows


Problem description

  1. Apply grouping on the data frame. Let's say this results in 100 groups with 10 rows each.
  2. I have a function that has to be applied to each group. It can happen in parallel and in any order (i.e., any group can be picked for execution in any order).
  3. But within a group, I need the guarantee of sequential processing of the rows, because after processing each row in the group, I use its output while processing any of the remaining rows in the group.

We took the approach below, where everything was running on the driver and could not utilize the Spark cluster's parallel processing across nodes (as expected, performance was really bad).

1) Break the main DF into multiple dataframes, placed in an array:

// Collect the distinct group keys to the driver (this runs on the driver, not across the cluster)
val securityIds = allocStage1DF.select("ALLOCONEGROUP").distinct.collect.flatMap(_.toSeq)
// Build one filtered DataFrame per group key
val bySecurityArray = securityIds.map(securityId => allocStage1DF.where($"ALLOCONEGROUP" <=> securityId))

2) Loop through the array and pass each dataframe's rows, one by one, to a method for processing:

// Everything below runs on the driver: coalesce(1) plus collect() pulls the whole
// group back as a local array, which is then processed row by row in PRIORITY order.
df.coalesce(1).sort($"PRIORITY".asc).collect().foreach(
  row => AllocOneOutput.allocOneOutput(row)
)


What we are looking for is a combination of parallel and sequential processing.

  1. Parallel processing at group level, because these are all independent groups and can be parallelized.
  2. Within each group, rows have to be processed one after the other in a sequence, which is very important for our use case (a minimal sketch of this shape is shown right after this list).
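
For illustration only, here is a minimal sketch of the shape we are after, using groupByKey/flatMapGroups. PositionRow, processGroupSequentially and the field names are hypothetical placeholders, not our actual code:

import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical case class mirroring the columns we care about; names are placeholders.
case class PositionRow(allocOneGroup: String, priority: Int, account: String, position: Long)

// Placeholder for the order-dependent logic applied to the rows of one group.
def processGroupSequentially(rows: Seq[PositionRow]): Seq[PositionRow] = rows

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

val positions: Dataset[PositionRow] = allocStage1DF.as[PositionRow]

// groupByKey gives parallelism across groups; inside flatMapGroups each group's rows
// arrive as a local iterator on a single task, so they can be sorted by priority and
// walked one after the other.
val result = positions
  .groupByKey(_.allocOneGroup)
  .flatMapGroups { (_, rows) =>
    processGroupSequentially(rows.toSeq.sortBy(_.priority)).iterator
  }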

Sample data

Apply grouping on SECURITY_ID, CC, BU, MPU, which gives us 2 groups from the above (SECID_1, CC_A, BU_A, MPU_A and SECID_2, CC_A, BU_A, MPU_A).

With the help of a priority matrix (nothing but a reference table for assigning a rank to rows), we transpose each group into the below:

Transposed data

Each row in the above group has a priority, and the rows are sorted in that order. Now I want to process each row one after the other by passing them to a function and get an output like below:

Output

Detailed explanation of the use case:

  1. The base data frame has all trading positions data of a financial firm. Some customers buy (go long) a given financial product (uniquely identified by securityId) and some sell (go short) it.
  2. The idea of our application is to identify/pair the long positions and short positions within a given securityId.
  3. Since this pairing happens within a securityId, we said that the base data frame is divided into groups based on this securityId, and each group can be processed independently.
  4. Why are we looking for sequential processing within a group? Because when there are many long positions and many short positions in a given group (as in the example data), the reference table (priority matrix) decides which long position has to be paired against which short position; basically, it gives the order of processing.
  5. The second reason is that, when a given long quantity and short quantity are not equal, the residual quantity is eligible for pairing; i.e., if long quantity is left, then it can be paired with the next short quantity available in the group as per the priority, or vice versa.
  6. Because of the reasons mentioned in 4 & 5, we are looking to process the rows of a group one after another.

The above points are illustrated using the dataset below.

Base data frame

+------+-----------+--------+---------+
|ROW_NO|SECURITY_ID| ACCOUNT| POSITION|
+------+-----------+--------+---------+
|     1|      secId|    Acc1|     +100|
|     2|      secId|    Acc2|     -150|
|     3|      secId|    Acc3|      -25|
|     4|     secId2|    Acc3|      -25|
|     5|     secId2|    Acc3|      -25|
+------+-----------+--------+---------+

The base data frame is divided into groups by SECURITY_ID. Let us use the secId group, as below:

+------+-----------+--------+---------+
|ROW_NO|SECURITY_ID| ACCOUNT| POSITION|
+------+-----------+--------+---------+
|     1|      secId|    Acc1|     +100|
|     2|      secId|    Acc2|     -150|
|     3|      secId|    Acc3|      -25|
+------+-----------+--------+---------+

In the above case, the positive position of 100 can be paired with either the -150 or the -25. In order to break the tie, the following reference table, called the priority matrix, helps by defining the order.

+------------------+------------------+----+
|+vePositionAccount|-vePositionAccount|RANK|
+------------------+------------------+----+
|              Acc1|              Acc3|   1|
|              Acc1|              Acc2|   2|
+------------------+------------------+----+

So, from the above matrix we know that rows 1 and 3 will be paired first, and then rows 1 and 2. This is the order (sequential processing) we are talking about. Let's pair them now as below:

+---------+--------------+----------+-----------+---------+--------------+----------+-----------+
|+veROW_NO|+veSECURITY_ID|+veACCOUNT|+vePOSITION|-veROW_NO|-veSECURITY_ID|-veACCOUNT|-vePOSITION|
+---------+--------------+----------+-----------+---------+--------------+----------+-----------+
|        1|         secId|      Acc1|       +100|        3|         secId|      Acc3|        -25|
|        1|         secId|      Acc1|       +100|        2|         secId|      Acc2|       -150|
+---------+--------------+----------+-----------+---------+--------------+----------+-----------+

What happens when row 1 is processed first and then row 2? (this is what we need)

1. After processing row 1: the position in Acc1 will be (100 - 25) = 75 and the position in Acc3 will be 0. The updated position of 75 in Acc1 will now be used in processing the second row.

2. After processing row 2: the position in Acc1 will be 0 and the position in Acc2 will be (75 - 150) = -75.

Resulting data frame:

+------+-----------+--------+---------+
|ROW_NO|SECURITY_ID| ACCOUNT| POSITION|
+------+-----------+--------+---------+
|     1|      secId|    Acc1|        0|
|     2|      secId|    Acc2|      -75|
|     3|      secId|    Acc3|        0|
+------+-----------+--------+---------+

What happens when row 2 is processed first and then row 1? (we don't want this)

  1. After processing row 2: the position in Acc1 will be 0 and the position in Acc2 will be (100 - 150) = -50. The updated position of 0 in Acc1 will now be used in processing the first row.
  2. After processing row 1: the position in Acc1 will be 0 and the position in Acc3 will remain unchanged at -25.

Resulting data frame:

+------+-----------+--------+---------+
|ROW_NO|SECURITY_ID| ACCOUNT| POSITION|
+------+-----------+--------+---------+
|     1|      secId|    Acc1|        0|
|     2|      secId|    Acc2|      -50|
|     3|      secId|    Acc3|      -25|
+------+-----------+--------+---------+

As you can see above, the order of processing within a group determines our output.
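
To make the order dependency concrete, here is a minimal, plain-Scala sketch of the fold we have in mind over one group's pairings. PairingRow, pairSequentially and the field names are hypothetical placeholders, not our production code:

// One row of the transposed/paired data for a single group, already ranked.
case class PairingRow(rank: Int, longAccount: String, shortAccount: String)

def pairSequentially(
    initialPositions: Map[String, Long],  // account -> starting position
    pairings: Seq[PairingRow]             // pairings of one group
): Map[String, Long] = {
  pairings.sortBy(_.rank).foldLeft(initialPositions) { (positions, p) =>
    val longQty  = positions.getOrElse(p.longAccount, 0L)
    val shortQty = positions.getOrElse(p.shortAccount, 0L)
    val matched  = math.min(longQty, -shortQty).max(0L)  // quantity that can be paired off
    // The residual carried into the next iteration is exactly why the row order matters.
    positions
      .updated(p.longAccount,  longQty  - matched)
      .updated(p.shortAccount, shortQty + matched)
  }
}

// For the example group above:
// pairSequentially(Map("Acc1" -> 100L, "Acc2" -> -150L, "Acc3" -> -25L),
//                  Seq(PairingRow(1, "Acc1", "Acc3"), PairingRow(2, "Acc1", "Acc2")))
// returns Map(Acc1 -> 0, Acc2 -> -75, Acc3 -> 0), matching the desired resulting data frame.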

I also wanted to ask: why does Spark not support sequential processing within a section of a dataframe? We are saying that we need the parallel processing capability of the cluster. That is why we are dividing the data frame into groups and asking the cluster to apply the logic to these groups in parallel. All we are saying is that if a group has, say, 100 rows, then let these 100 rows be processed one after the other in order. Is this not supported by Spark?

If it is not, then what other technology in big data can help achieve that?

Alternative implementation:

  1. Divide the data frame into as many partitions as there are groups (50,000 in our case; more groups, but not more than a few hundred rows in any group).
  2. Run a foreachPartition operation on the data frame, with the logic inside it executed independently across partitions.
  3. Write the result of processing each partition to the cluster.
  4. After the whole data frame has been processed, a separate job reads these individual files from step 3 and writes them to a single file/data frame.

I doubt whether thousands of partitions are any good, but I would still like to know whether the approach sounds reasonable. (A rough sketch of steps 1 and 2 follows below.)
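
For completeness, a rough sketch of what steps 1 and 2 of this alternative might look like; the partition count, column names and the executor-side call are assumptions based on the code above, not a tested implementation:

import org.apache.spark.sql.functions.col

// Assumption: ALLOCONEGROUP is the group key and PRIORITY gives the within-group order.
val numPartitions = 50000 // placeholder: roughly one partition per group
val partitioned = allocStage1DF
  .repartition(numPartitions, col("ALLOCONEGROUP"))            // co-locate each group's rows
  .sortWithinPartitions(col("ALLOCONEGROUP"), col("PRIORITY")) // order rows inside the partition

// Partitions are processed in parallel on the executors; within a partition the iterator
// is walked sequentially in the sorted order (hash collisions may place several groups in
// one partition, hence the secondary sort on the group key).
partitioned.rdd.foreachPartition { rows =>
  rows.foreach(row => AllocOneOutput.allocOneOutput(row)) // must be safe to run on executors
}

Any state carried between the rows of a group (such as the residual positions) would have to live inside this per-partition loop, and each partition would write out its own results as in step 3.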

Recommended answer


The concept works well enough until this rule:

  1. The second reason is that, when a given long quantity and short quantity are not equal, the residual quantity is eligible for pairing; i.e., if long quantity is left, then it can be paired with the next short quantity available in the group as per the priority, or vice versa.

This is because you want iteration, looping with dependency logic, which is difficult to code in Spark, since Spark is more flow-oriented.

I also worked on a project where everything was stated up front: do it in Big Data with Spark, Scala or pyspark. Being an architect as well as a coder, I looked at an algorithm similar to your area, but not quite the same, in which for commodities all the periods for a set of data points needed to be classified as bull, bear, or neither. Like your algorithm, but still different, I did not know the amount of looping to do up front. In fact I needed to do something, then decide whether to repeat that something to the left and to the right of a period I had marked as either bull, bear, or nothing. Termination conditions were required. See the picture below. It is sort of like a 'flat' binary tree traversal until all paths are exhausted. Not that Spark-ish.

I actually, for academic purposes, solved my specific situation in Spark, but it was an academic exercise. The point of the matter is that this type of processing - my example and your example - is a poor fit for Spark. We did these calculations in ORACLE and simply Sqooped the results to the Hadoop datastore.

My advice is therefore that you do not try this in Spark, as it does not fit the use case well enough. Trust me, it gets messy. To be honest, it was soon apparent to me that this type of processing was an issue, but when starting out it is a common thing to ask about.

