Iterating an RDD and updating a mutable collection returns an empty collection

Question

I am new to Scala and Spark and would like some help understanding why the code below does not produce my desired outcome.

I am comparing two tables.

My desired output schema is:

case class DiscrepancyData(fieldKey:String, fieldName:String, val1:String, val2:String, valExpected:String)

When I run the code below manually, step by step, I end up with my desired outcome: a List[DiscrepancyData] fully populated with the expected output. However, I must be missing something, because the function below returns an empty list. (Before this code is called, other code reads the tables from Hive and then maps, groups, and filters them.)

val compareCols = Set("year", "nominal", "adjusted_for_inflation", "average_private_nonsupervisory_wage")

val key = "year"

def compare(table:RDD[(String, Iterable[Row])]): List[DiscrepancyData] = {
    var discs: ListBuffer[DiscrepancyData] = ListBuffer()
    def compareFields(fieldOne:String, fieldTwo:String, colName:String, row1:Row, row2:Row): DiscrepancyData = {
        if (fieldOne != fieldTwo){
            DiscrepancyData(
                row1.getAs(key).toString, //fieldKey
                colName, //fieldName
                row1.getAs(colName).toString, //table1Value
                row2.getAs(colName).toString, //table2Value
                row2.getAs(colName).toString) //expectedValue
        }
        else null
    }
    def comparison() {
        for(row <- table){
            var elem1 = row._2.head //gets the first element in the iterable
            var elem2 = row._2.tail.head //gets the second element in the iterable

            for(col <- compareCols){
                var value1 = elem1.getAs(col).toString
                var value2 = elem2.getAs(col).toString

                var disc = compareFields(value1, value2, col, elem1, elem2)

                if (disc != null) discs += disc
            }
        }
    }

    comparison()

    discs.toList
}

I'm calling the above function like this:

var outcome = compare(groupedFiltered)

Here is the data in groupedFiltered:

(1991,CompactBuffer([1991,7.14,5.72,39%], [1991,4.14,5.72,39%]))
(1997,CompactBuffer([1997,4.88,5.86,39%], [1997,3.88,5.86,39%]))
(1999,CompactBuffer([1999,5.15,5.96,39%], [1999,5.15,5.97,38%]))
(1947,CompactBuffer([1947,0.9,2.94,35%], [1947,0.4,2.94,35%]))
(1980,CompactBuffer([1980,3.1,6.88,45%], [1980,3.1,6.88,48%]))
(1981,CompactBuffer([1981,3.15,6.8,45%], [1981,3.35,6.8,45%]))

The table schema for groupedFiltered:

(year String, 
nominal Double,
adjusted_for_inflation Double, 
average_private_nonsupervisory_wage String)

Recommended answer

Spark is a distributed computing engine. In addition to the "what is the code doing?" question of classic single-node computing, with Spark we also need to consider "where is the code running?"

Let's inspect a simplified version of the expression above:

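import org.apache.spark.rdd.RDD
import scala.collection.mutable

val records: RDD[List[String]] = ??? // whatever data
val list: mutable.ListBuffer[String] = mutable.ListBuffer() // lives on the driver
for {record <- records
     entry <- record }
    { list += entry }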

The Scala for-comprehension makes this expression look like a natural local computation, but in reality the RDD operations are serialized and "shipped" to the executors, where the inner operation is executed locally. We can rewrite the above like this:

records.foreach{ record =>     //RDD.foreach => serializes closure and executes remotely
     record.foreach{entry =>   //record.foreach => local operation on the record collection
        list += entry          // this mutable list object is updated in each executor but never sent back to the driver. All updates are lost  
     }
}
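
The effect can be reproduced without a cluster. The following is only a sketch in plain Scala, not Spark code: the hypothetical helper shipCopy round-trips an object through Java serialization, which approximates what happens to state captured by a closure when a task is sent to another JVM. The names ClosureCopyDemo, shipCopy, driverList and executorList are made up for illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable.ListBuffer

object ClosureCopyDemo {
  // Hypothetical helper: serializes a value and reads it back, producing an
  // independent copy, similar to what a remote JVM would receive.
  def shipCopy[T <: java.io.Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val driverList = ListBuffer[String]()   // the list declared on the "driver"
    val executorList = shipCopy(driverList) // the copy the "executor" works on
    executorList += "discrepancy"           // the update only touches the copy
    println(s"executor copy: ${executorList.size}, driver copy: ${driverList.size}")
  }
}
```

The copy ends up with one element while the original stays empty, which mirrors why discs.toList in the question returns an empty list.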

Mutable objects are, in general, a no-go in distributed computing. Imagine that one executor adds a record and another removes it: what is the correct result? Or that each executor arrives at a different value: which one is right?

To implement the operation above, we need to express it as a transformation of the data into the desired result, rather than as a side effect on a shared collection.

I'd start by applying another best practice: do not use null as a return value. I also moved the row operations into the function. Let's rewrite the comparison operation with this in mind:

def compareFields(colName:String, row1:Row, row2:Row): Option[DiscrepancyData] = {
    val key = "year"
    // getAs[Any] gives the compiler an explicit type to cast to before toString is called
    val v1 = row1.getAs[Any](colName).toString
    val v2 = row2.getAs[Any](colName).toString
    if (v1 != v2){
        Some(DiscrepancyData(
            row1.getAs[Any](key).toString, //fieldKey
            colName, //fieldName
            v1, //table1Value
            v2, //table2Value
            v2) //expectedValue
        )
    } else None // no discrepancy for this column
}

Now, we can rewrite the computation of discrepancies as a transformation of the initial table data:

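val discrepancies = table.flatMap{case (_, rows) =>
    val it = rows.iterator
    val (row1, row2) = (it.next(), it.next()) // assumes each group holds at least two rows
    compareCols.flatMap{col => compareFields(col, row1, row2) }
}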

We can also use the for-comprehension notation, now that we understand where things are running:

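val discrepancies = for {
    group <- table // a plain variable avoids relying on withFilter, which RDD does not define
    col <- compareCols
    dis <- compareFields(col, group._2.head, group._2.tail.head) // first and second row of the group
} yield dis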

Note that discrepancies is of type RDD[DiscrepancyData]. To bring the actual values to the driver, we need to collect them:

val materializedDiscrepancies = discrepancies.collect()
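
The shape of this transformation can be tried without a cluster. The sketch below uses stand-ins that are assumptions made for illustration only: a plain Seq replaces the RDD, and a Map[String, String] (aliased as FakeRow) replaces Row. The input is two of the groups shown in the question. With a real RDD, the final toList would be preceded by collect().

```scala
object LocalDiscrepancyDemo {
  case class DiscrepancyData(fieldKey: String, fieldName: String,
                             val1: String, val2: String, valExpected: String)

  // Stand-in for Row (an assumption): column name -> value rendered as a string.
  type FakeRow = Map[String, String]

  val key = "year"
  val compareCols = Seq("nominal", "adjusted_for_inflation", "average_private_nonsupervisory_wage")

  // Returns Some(discrepancy) when the two rows differ on colName, None otherwise.
  def compareFields(colName: String, row1: FakeRow, row2: FakeRow): Option[DiscrepancyData] = {
    val v1 = row1(colName)
    val v2 = row2(colName)
    if (v1 != v2) Some(DiscrepancyData(row1(key), colName, v1, v2, v2)) else None
  }

  // Hypothetical helper that builds a FakeRow from positional values.
  def row(year: String, nominal: String, adjusted: String, wage: String): FakeRow =
    Map("year" -> year, "nominal" -> nominal,
        "adjusted_for_inflation" -> adjusted,
        "average_private_nonsupervisory_wage" -> wage)

  // Stand-in for groupedFiltered: two groups taken from the question's data.
  val table: Seq[(String, Iterable[FakeRow])] = Seq(
    "1991" -> Seq(row("1991", "7.14", "5.72", "39%"), row("1991", "4.14", "5.72", "39%")),
    "1999" -> Seq(row("1999", "5.15", "5.96", "39%"), row("1999", "5.15", "5.97", "38%"))
  )

  // Same flatMap-over-Option structure as the RDD version, with no shared mutable state.
  def compare(groups: Seq[(String, Iterable[FakeRow])]): List[DiscrepancyData] =
    groups.flatMap { case (_, rows) =>
      val it = rows.iterator
      val (row1, row2) = (it.next(), it.next()) // assumes two rows per group
      compareCols.flatMap(col => compareFields(col, row1, row2))
    }.toList

  def main(args: Array[String]): Unit = compare(table).foreach(println)
}
```

For this input the result holds three discrepancies: nominal for 1991, and both adjusted_for_inflation and average_private_nonsupervisory_wage for 1999.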
