Why Mutable map becomes immutable automatically in UserDefinedAggregateFunction (UDAF) in Spark
Problem Description
I am trying to define a UserDefinedAggregateFunction (UDAF) in Spark that counts the number of occurrences of each unique value in a column of a group.
Here is an example. Suppose I have a dataframe df like this:
+----+----+
|col1|col2|
+----+----+
|   a|  a1|
|   a|  a1|
|   a|  a2|
|   b|  b1|
|   b|  b2|
|   b|  b3|
|   b|  b1|
|   b|  b1|
+----+----+
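For reproducibility, one minimal way to build such a dataframe (a sketch, assuming spark.implicits._, or sqlContext.implicits._ on Spark 1.x, is in scope):
// Hypothetical construction of the example dataframe from the question.
val df = Seq(
  ("a", "a1"), ("a", "a1"), ("a", "a2"),
  ("b", "b1"), ("b", "b2"), ("b", "b3"), ("b", "b1"), ("b", "b1")
).toDF("col1", "col2")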
I will have a UDAF DistinctValues:
val func = new DistinctValues
Then I apply it to the dataframe df:
val agg_value = df.groupBy("col1").agg(func(col("col2")).as("DV"))
I expect to get something like this:
+----+------------------------+
|col1|DV                      |
+----+------------------------+
|a   |Map(a1->2, a2->1)       |
|b   |Map(b1->3, b2->1, b3->1)|
+----+------------------------+
So I came up with a UDAF like this:
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.DataType
import org.apache.spark.sql.types.ArrayType
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.types.MapType
import org.apache.spark.sql.types.LongType
import Array._
class DistinctValues extends UserDefinedAggregateFunction {
  def inputSchema: org.apache.spark.sql.types.StructType = StructType(StructField("value", StringType) :: Nil)

  def bufferSchema: StructType = StructType(StructField("values", MapType(StringType, LongType)) :: Nil)

  def dataType: DataType = MapType(StringType, LongType)

  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = scala.collection.mutable.Map()
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val str = input.getAs[String](0)
    var mp = buffer.getAs[scala.collection.mutable.Map[String, Long]](0)
    var c: Long = mp.getOrElse(str, 0)
    c = c + 1
    mp.put(str, c)
    buffer(0) = mp
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    var mp1 = buffer1.getAs[scala.collection.mutable.Map[String, Long]](0)
    var mp2 = buffer2.getAs[scala.collection.mutable.Map[String, Long]](0)
    mp2 foreach {
      case (k, v) =>
        var c: Long = mp1.getOrElse(k, 0)
        c = c + v
        mp1.put(k, c)
    }
    buffer1(0) = mp1
  }

  def evaluate(buffer: Row): Any = {
    buffer.getAs[scala.collection.mutable.Map[String, LongType]](0)
  }
}
Then I apply this function to my dataframe:
val func = new DistinctValues
val agg_values = df.groupBy("col1").agg(func(col("col2")).as("DV"))
And I got an error like this:
func: DistinctValues = $iwC$$iwC$DistinctValues@17f48a25
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 32.0 failed 4 times, most recent failure: Lost task 1.3 in stage 32.0 (TID 884, ip-172-31-22-166.ec2.internal): java.lang.ClassCastException: scala.collection.immutable.Map$EmptyMap$ cannot be cast to scala.collection.mutable.Map
at $iwC$$iwC$DistinctValues.update(<console>:39)
at org.apache.spark.sql.execution.aggregate.ScalaUDAF.update(udaf.scala:431)
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$12.apply(AggregationIterator.scala:187)
at org.apache.spark.sql.execution.aggregate.AggregationIterator$$anonfun$12.apply(AggregationIterator.scala:180)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.processCurrentSortedGroup(SortBasedAggregationIterator.scala:116)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:152)
at org.apache.spark.sql.execution.aggregate.SortBasedAggregationIterator.next(SortBasedAggregationIterator.scala:29)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:149)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:89)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
It looks like in the update(buffer: MutableAggregationBuffer, input: Row) method, the value in buffer is an immutable.Map, and the program tried to cast it to a mutable.Map.
But I used a mutable.Map to initialize the buffer variable in the initialize(buffer: MutableAggregationBuffer) method. Is it the same variable that is passed to the update method? And buffer is a MutableAggregationBuffer, so it should be mutable, right?
Why did my mutable.Map become immutable? Does anyone know what happened?
I really need a mutable Map in this function to complete the task. I know there is a workaround: create a mutable map from the immutable one, update it, and write it back. But I really want to know why the mutable map automatically turns into an immutable one in the program; it doesn't make sense to me.
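For reference, the workaround mentioned above might look roughly like this inside update (a hedged sketch, not code from the question):
// Hypothetical sketch of the workaround: read the (immutable) map out of the
// buffer, copy it into a mutable one, update it, then write it back.
def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  val str = input.getAs[String](0)
  val mp = scala.collection.mutable.Map(buffer.getAs[Map[String, Long]](0).toSeq: _*)
  mp(str) = mp.getOrElse(str, 0L) + 1
  buffer(0) = mp.toMap
}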
Recommended Answer
I believe it is the MapType in your StructType. buffer therefore holds a Map, which would be immutable. Most likely Spark round-trips buffer values through its internal representation between calls, so whatever you read back out with getAs comes back as a plain immutable Map, regardless of what kind of Map you stored.
You can convert it, but why not just leave it immutable and do this:
mp = mp + (k -> c)
to add an entry to the immutable Map?
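As a quick standalone illustration of that idiom (plain Scala, no Spark needed):
// `+` on an immutable Map returns a new map with the entry added/replaced,
// so rebind the var instead of mutating in place.
var counts: Map[String, Long] = Map("a1" -> 1L)
counts = counts + ("a1" -> (counts.getOrElse("a1", 0L) + 1))  // Map(a1 -> 2)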
Working example below:
// (Uses the same imports as the class above. The input schema here is adjusted
// to a StringType column to match the question's data; the original answer's
// example used a different test column.)
class DistinctValues extends UserDefinedAggregateFunction {
  def inputSchema: org.apache.spark.sql.types.StructType = StructType(StructField("value", StringType) :: Nil)

  def bufferSchema: StructType = StructType(StructField("values", MapType(StringType, LongType)) :: Nil)

  def dataType: DataType = MapType(StringType, LongType)

  def deterministic: Boolean = true

  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Map[String, Long]()
  }

  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val str = input.getAs[String](0)
    var mp = buffer.getAs[Map[String, Long]](0)
    var c: Long = mp.getOrElse(str, 0)
    c = c + 1
    mp = mp + (str -> c)   // immutable update: build a new map and rebind
    buffer(0) = mp
  }

  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    var mp1 = buffer1.getAs[Map[String, Long]](0)
    var mp2 = buffer2.getAs[Map[String, Long]](0)
    mp2 foreach {
      case (k, v) =>
        var c: Long = mp1.getOrElse(k, 0)
        c = c + v
        mp1 = mp1 + (k -> c)
    }
    buffer1(0) = mp1
  }

  def evaluate(buffer: Row): Any = {
    buffer.getAs[Map[String, Long]](0)
  }
}
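Applying it exactly as in the question should then yield the expected counts (a sketch, assuming org.apache.spark.sql.functions.col is imported; the output below is the expected shape, not captured from a run):
// Hypothetical usage, mirroring the question's example.
val func = new DistinctValues
val agg_values = df.groupBy("col1").agg(func(col("col2")).as("DV"))
agg_values.show(false)
// Expected output:
// +----+------------------------+
// |col1|DV                      |
// +----+------------------------+
// |a   |Map(a1->2, a2->1)       |
// |b   |Map(b1->3, b2->1, b3->1)|
// +----+------------------------+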