How does aggregate work in Scala?


Question

I know how a normal aggregate works in Scala and when to use it over fold. I have tried hard to understand how the code below works, but couldn't. Could someone explain how it works and why it gives an output of (10,4)?

val input = List(1, 2, 3, 4)
val result = input.aggregate((0, 0))(
  (acc, value) => (acc._1 + value, acc._2 + 1),
  (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))

Solution

> Could someone explain how it works and why it gives an output of (10,4)?

When using aggregate, you provide three parameters:

  • the initial value from which the elements of each partition are accumulated; it is usually the neutral element of the combine function
  • a function (seqop) that, given a partition, accumulates the result within it
  • a function (combop) that combines the results of two partitions
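Mapping those three parameters onto the question's call, here is the same code with each argument pulled out into a value. The names `zero`, `seqop`, `combop` and `SumCount` are illustrative only. Note that since Scala 2.13, `aggregate` on a sequential collection such as `List` is deprecated, so expect a deprecation warning when compiling this.

```scala
object NamedAggregate {
  // The accumulator type: (running sum, element count).
  type SumCount = (Int, Int)

  // 1. Initial value for each partition: the neutral element of combop.
  val zero: SumCount = (0, 0)

  // 2. seqop: folds one element into a partition's accumulator.
  val seqop: (SumCount, Int) => SumCount =
    (acc, value) => (acc._1 + value, acc._2 + 1)

  // 3. combop: merges the accumulators of two partitions.
  val combop: (SumCount, SumCount) => SumCount =
    (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2)

  val input: List[Int] = List(1, 2, 3, 4)

  // The same call as in the question, with the arguments named.
  val result: SumCount = input.aggregate(zero)(seqop, combop)
}
```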

So in your case, the initial value for a partition is the tuple (0, 0).

Then the accumulator function you defined adds the current element to the first element of the tuple and increments the second element of the tuple by one. In effect, it computes the sum of the elements in a partition and the number of those elements.

The combiner function combines two tuples. As you defined it, it adds up the sums and the element counts of two partitions. It is not used in your case because you traverse the list sequentially. You could call .par on the List to get a parallel collection and see the combiner in action (note that the combiner must be associative). Since Scala 2.13, .par requires the separate scala-parallel-collections module and `import scala.collection.parallel.CollectionConverters._`.
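One way to check that the combiner is never called on a sequential `List` is to pass a combiner that fails loudly. This is an experiment, not something to ship: the throwing combiner is deliberately wrong and would break as soon as the collection were parallel. It relies on the sequential `aggregate` delegating to `foldLeft`, which is how the standard library implements it.

```scala
object CombinerUnused {
  val input: List[Int] = List(1, 2, 3, 4)

  // Deliberately broken combiner: it throws if it is ever invoked.
  val failingCombop: ((Int, Int), (Int, Int)) => (Int, Int) =
    (_, _) => throw new IllegalStateException("combop was called")

  // On a sequential List only seqop runs, so this still succeeds.
  val result: (Int, Int) =
    input.aggregate((0, 0))((acc, value) => (acc._1 + value, acc._2 + 1), failingCombop)
}
```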

Thus you get (10, 4) because 1+2+3+4=10 and there were 4 elements in the list (the accumulator ran 4 times, adding 1 to the count each time).

You could add a print statement to the accumulator function (running on sequential input) to see how it behaves:

Acc: (0,0) - value:1
Acc: (1,1) - value:2
Acc: (3,2) - value:3
Acc: (6,3) - value:4
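A sketch of where such a print statement could go; `TracedAggregate` and `run` are illustrative names. Running `main` prints the four lines above followed by the final result `(10,4)`.

```scala
object TracedAggregate {
  // Runs the question's aggregate, printing the accumulator before each step.
  def run(input: List[Int]): (Int, Int) =
    input.aggregate((0, 0))(
      (acc, value) => {
        println(s"Acc: $acc - value:$value")
        (acc._1 + value, acc._2 + 1)
      },
      (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))

  def main(args: Array[String]): Unit =
    println(run(List(1, 2, 3, 4)))
}
```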

> I know how a normal aggregate works in Scala and when to use it over fold.

For a sequential input, aggregate is a foldLeft:

def aggregate[B](z: =>B)(seqop: (B, A) => B, combop: (B, B) => B): B = foldLeft(z)(seqop)
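Following that definition, the question's code can be written with `foldLeft` directly, dropping the combiner, which is the replacement the Scala 2.13 deprecation of `aggregate` points to. A quick check that both forms agree:

```scala
object AggregateAsFoldLeft {
  val input: List[Int] = List(1, 2, 3, 4)

  val seqop: ((Int, Int), Int) => (Int, Int) =
    (acc, value) => (acc._1 + value, acc._2 + 1)

  // foldLeft only needs the start value and seqop; there is no combiner.
  val viaFoldLeft: (Int, Int) = input.foldLeft((0, 0))(seqop)

  // The original call; on a sequential List it delegates to foldLeft.
  val viaAggregate: (Int, Int) =
    input.aggregate((0, 0))(seqop, (a, b) => (a._1 + b._1, a._2 + b._2))
}
```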

For parallel input, the list is split into chunks so that multiple threads can work on them separately. The accumulator function is run on each chunk, starting from the initial value. When two threads need to merge their results, the combine function is used:

def aggregate[S](z: =>S)(seqop: (S, T) => S, combop: (S, S) => S): S = {
  tasksupport.executeAndWaitResult(new Aggregate(() => z, seqop, combop, splitter))
}
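The parallel behaviour can be imitated sequentially to see where each function is applied. This sketch is not how the library schedules work: it simply splits the list into chunks of two with `grouped`, folds each chunk from the initial value with `seqop`, then merges the partial results with `combop`. The chunk size of 2 is an arbitrary choice for illustration.

```scala
object SimulatedParallelAggregate {
  type SumCount = (Int, Int)

  val zero: SumCount = (0, 0)
  val seqop: (SumCount, Int) => SumCount = (acc, value) => (acc._1 + value, acc._2 + 1)
  val combop: (SumCount, SumCount) => SumCount = (a, b) => (a._1 + b._1, a._2 + b._2)

  val input: List[Int] = List(1, 2, 3, 4)

  // Step 1: each "thread" gets a chunk and folds it from a fresh zero.
  val partials: List[SumCount] = input.grouped(2).map(_.foldLeft(zero)(seqop)).toList

  // Step 2: the partial results are merged with combop.
  val result: SumCount = partials.reduce(combop)
}
```

Because the initial value is used once per chunk, it has to be neutral for the combiner: starting each chunk from (1, 0) instead would add 1 to the sum per chunk, so the answer would depend on how the input happened to be split.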

This is the principle of the fork-join model, but it requires that your task parallelizes well. That is the case here, because a thread does not need to know the result of another thread to do its job.
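To make the fork-join idea concrete without the parallel-collections module, here is a minimal hand-written aggregate on top of `java.util.concurrent.ForkJoinPool` and `RecursiveTask`. `parAggregate`, `Chunk` and the `threshold` parameter are hypothetical names; the library's `Aggregate` task differs in detail (it works with splitters and its own task support). The sketch assumes an `IndexedSeq`, so splitting by index is cheap.

```scala
import java.util.concurrent.{ForkJoinPool, RecursiveTask}

object ForkJoinAggregate {
  // Hypothetical helper: splits xs in halves until a chunk has at most
  // `threshold` elements, folds each chunk with seqop starting from z,
  // and merges sibling results with combop.
  def parAggregate[A, B](xs: IndexedSeq[A], threshold: Int)(z: => B)(
      seqop: (B, A) => B,
      combop: (B, B) => B): B = {
    require(threshold >= 1, "threshold must be at least 1")

    class Chunk(from: Int, until: Int) extends RecursiveTask[B] {
      override def compute(): B =
        if (until - from <= threshold) {
          // Leaf: a sequential fold, like aggregate on a sequential collection.
          xs.slice(from, until).foldLeft(z)(seqop)
        } else {
          val mid = (from + until) / 2
          val left = new Chunk(from, mid)
          left.fork()                                // schedule the left half
          val right = new Chunk(mid, until).compute() // compute the right half here
          combop(left.join(), right)                 // wait for the left half, then merge
        }
    }

    ForkJoinPool.commonPool().invoke(new Chunk(0, xs.length))
  }
}
```

Each leaf starts from `z`, so the result is only well defined when `z` is neutral for `combop` and `combop` is associative, which both hold for the (sum, count) pair in the question.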
