Recursive method call in Apache Spark

Question

I'm building a family tree from a database on Apache Spark, using a recursive search to find the ultimate parent (i.e. the person at the top of the family tree) for each person in the DB.

It is assumed that the first person returned when searching for their id is the correct parent.

val peopleById = peopleRDD.keyBy(f => f.id)

def findUltimateParentId(personId: String): String = {

    if ((personId == null) || personId.isEmpty)
        return "-1"

    // lookup is an RDD action, which is only legal on the driver --
    // this is what fails once the method is called inside foreach below
    val personSeq = peopleById.lookup(personId)
    val person = personSeq(0)
    if (person.id == "0" || person.id == person.parentId)
        person.id
    else
        findUltimateParentId(person.parentId)
}

val ultimateParentIds = peopleRDD.foreach(f => findUltimateParentId(f.parentId))

This gives the following error:

"Caused by: org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063."

I understand from reading other similar questions that the problem is that I'm calling findUltimateParentId from within the foreach loop. If I call the method from the shell with a person's id, it returns the correct ultimate parent id.

However, none of the other suggested solutions work for me, or at least I can't see how to implement them in my program. Can anyone help?

Answer

Fixed this by using SparkContext.broadcast:

val peopleById = peopleRDD.keyBy(f => f.id)
// Collect the keyed RDD to the driver and broadcast it as a plain Map,
// so lookups inside tasks no longer touch an RDD
val broadcastedPeople = sc.broadcast(peopleById.collectAsMap())

def findUltimateParentId(personId: String): String = {

    if ((personId == null) || personId.isEmpty)
        return "-1"

    // Ordinary Map lookup on the broadcast value -- safe inside a transformation
    val personOption = broadcastedPeople.value.get(personId)
    if (personOption.isEmpty)
        return "0"

    val person = personOption.get
    if (person.id == "0" || person.id == person.parentId)
        person.id
    else
        findUltimateParentId(person.parentId)
}

// map rather than foreach, so the results come back as an RDD of ids
val ultimateParentIds = peopleRDD.map(f => findUltimateParentId(f.parentId))

Works fine now!
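
For reference, a minimal end-to-end sketch of the broadcast approach that can be pasted into spark-shell. The Person case class and the sample data are illustrative assumptions; the original post never shows the record type:

// Hypothetical record shape -- the original post does not define Person
case class Person(id: String, parentId: String)

val peopleRDD = sc.parallelize(Seq(
    Person("1", "1"),   // root of the tree: its own parent
    Person("2", "1"),
    Person("3", "2")
))

val broadcastedPeople = sc.broadcast(peopleRDD.keyBy(_.id).collectAsMap())

def findUltimateParentId(personId: String): String =
    broadcastedPeople.value.get(personId) match {
        case Some(p) if p.id == p.parentId => p.id                              // reached the top
        case Some(p)                       => findUltimateParentId(p.parentId)  // keep climbing
        case None                          => "0"                               // unknown id
    }

val ultimateParentIds = peopleRDD.map(p => (p.id, findUltimateParentId(p.parentId)))
ultimateParentIds.collect().foreach(println)   // (1,1), (2,1), (3,1)

Note that this works only while the whole people map fits in memory on the driver and on each executor; for a very large tree, the usual alternative is to resolve parents iteratively with repeated joins instead of a broadcast.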
