RDD transformations and actions can only be invoked by the driver


Problem description

Error:

org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

def computeRatio(model: MatrixFactorizationModel, test_data: org.apache.spark.rdd.RDD[Rating]): Double = {
  val numDistinctUsers = test_data.map(x => x.user).distinct().count()
  val userRecs: RDD[(Int, Set[Int], Set[Int])] = test_data.groupBy(testUser => testUser.user).map(u => {
    // recommendProducts is invoked inside this map, i.e. inside a transformation
    (u._1, u._2.map(p => p.product).toSet, model.recommendProducts(u._1, 20).map(prec => prec.product).toSet)
  })
  val hitsAndMiss: RDD[(Int, Double)] = userRecs.map(x => (x._1, x._2.intersect(x._3).size.toDouble))

  val hits = hitsAndMiss.map(x => x._2).sum() / numDistinctUsers

  return hits
}

I am using the method from MatrixFactorizationModel.scala, so I have to map over the users and then call the method to get the results for each user. Doing that introduces nested mapping, which I believe causes the issue:

I know that the issue actually takes place at:

val userRecs: RDD[(Int, Set[Int], Set[Int])] = test_data.groupBy(testUser => testUser.user).map(u => {
  (u._1, u._2.map(p => p.product).toSet, model.recommendProducts(u._1, 20).map(prec => prec.product).toSet)
})

because while mapping I am calling model.recommendProducts.

Answer

MatrixFactorizationModel is a distributed model, so you cannot simply call it from an action or a transformation. The closest thing to what you do here is something like this:

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}

def computeRatio(model: MatrixFactorizationModel, testUsers: RDD[Rating]): Double = {
  // Project down to (user, product) pairs first, then group by user
  val testData = testUsers.map(r => (r.user, r.product)).groupByKey
  val n = testData.count

  // Distributed equivalent of calling recommendProducts once per user
  val recommendations = model
    .recommendProductsForUsers(20)
    .mapValues(_.map(r => r.product))

  // Join each user's test products with their recommendations
  // and count the overlap
  val hits = testData
    .join(recommendations)
    .values
    .map { case (xs, ys) => xs.toSet.intersect(ys.toSet).size }
    .sum

  hits / n
}
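For intuition, the join-and-intersect step above can be mirrored on plain Scala collections, with no Spark cluster involved. The user ids and product sets here are made-up example data, not from the question:

```scala
// Hypothetical test interactions: user -> products that user actually rated
val testData: Map[Int, Set[Int]] = Map(1 -> Set(10, 11), 2 -> Set(12))
// Hypothetical top-N recommendations per user
val recommendations: Map[Int, Set[Int]] = Map(1 -> Set(10, 13), 2 -> Set(12, 14))

// Same logic as the join + intersect in computeRatio, on local maps
val hits = testData
  .map { case (user, rated) =>
    rated.intersect(recommendations.getOrElse(user, Set.empty[Int])).size
  }
  .sum
  .toDouble

val ratio = hits / testData.size
```

Here each user contributes one hit, so the ratio comes out to 1.0; on real data it measures how many test products, on average, show up in a user's top 20 recommendations.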

A couple of notes:

  • distinct is an expensive operation and completely redundant here, since you can obtain the same information from the grouped data.
  • instead of groupBy followed by a projection (map), project first and group later. There is no reason to shuffle full ratings if you only need the product ids.
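The second note (project first, then group) can be sketched with plain Scala collections; the (user, product, rating) triples below are illustrative:

```scala
// Ratings as (user, product, rating) triples; values are made up
val ratings = Seq((1, 10, 5.0), (1, 11, 3.0), (2, 12, 4.0))

// group-then-project: groups the full triples, then drops the rating
val grouped1 = ratings.groupBy(_._1)
  .map { case (u, rs) => (u, rs.map(_._2).toSet) }

// project-then-group: keep only (user, product) before grouping; in Spark
// this means only the small pairs are shuffled, not whole Rating objects
val grouped2 = ratings.map(r => (r._1, r._2)).groupBy(_._1)
  .map { case (u, ps) => (u, ps.map(_._2).toSet) }

// Both yield the same user -> product-set mapping
```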
