Spark: Work around nested RDD

Problem Description

There are two tables. The first table has records with two fields, book1 and book2; these are the IDs of books that are usually read together, in pairs. The second table has columns books and readers, where books and readers are book and reader IDs, respectively. For every reader in the second table I need to find the corresponding books in the pairs table. For example, if a reader read books 1, 2, 3 and we have pairs (1,7), (6,2), (4,10), the resulting list for this reader should contain books 7 and 6.

I first group books by reader and then iterate over the pairs. I try to match every book in each pair with all the books in a user's list:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._


object Simple {

  case class Pair(book1: Int, book2: Int)
  case class Book(book: Int, reader: Int, name:String)

  val pairs = Array(
    Pair(1, 2),
    Pair(1, 3),
    Pair(5, 7)
  )

  val testRecs = Array(
    Book(book = 1, reader = 710, name = "book1"),
    Book(book = 2, reader = 710, name = "book2"),
    Book(book = 3, reader = 710, name = "book3"),
    Book(book = 8, reader = 710, name = "book8"),
    Book(book = 1, reader = 720, name = "book1"),
    Book(book = 2, reader = 720, name = "book2"),
    Book(book = 8, reader = 720, name = "book8"),
    Book(book = 3, reader = 730, name = "book3"),
    Book(book = 8, reader = 740, name = "book8")
  )

  def main(args: Array[String]) {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    // set up environment
    val conf = new SparkConf()
      .setMaster("local[5]")
      .setAppName("Simple")
      .set("spark.executor.memory", "2g")
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val pairsDf = sc.parallelize(pairs).toDF()
    val testData = sc.parallelize(testRecs)

    // *** Group test data by reader
    val testByReader = testData.map(r => (r.reader, r.book))
    val testGroups = testByReader.groupByKey()
    val x = testGroups.map(tuple => tuple match {
      case(user, bookIter) => matchList(user,pairsDf, bookIter.toList)
    })
    x.foreach(println)
  }

  def matchList(user:Int, df: DataFrame, toMatch: List[Int]) = {
    //val x = df.map(r => (r(0), r(1))) --- This also fails!!
    //x
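    // Note: this df.map runs inside the closure passed to testGroups.map in main,
    // i.e. a DataFrame operation nested inside another RDD transformation on the
    // executors -- that nesting is what produces the NullPointerException below.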
    val relatedBooks = df.map(r => {
      val book1 = r(0)
      val book2 = r(1)
      val z = toMatch.map(book =>
        if (book == book1)
          List(book2)
        else {
          if (book == book2) List(book1)
          else List()
        } //if
      )
      z.flatMap(identity)
    })
    (user,relatedBooks)
  }
}

This results in a java.lang.NullPointerException (below). As I understand it, Spark does not support nested RDDs. Please suggest another way to solve this task.

...
15/06/09 18:59:25 INFO Server: jetty-8.y.z-SNAPSHOT
15/06/09 18:59:25 INFO AbstractConnector: Started SocketConnector@0.0.0.0:44837
15/06/09 18:59:26 INFO Server: jetty-8.y.z-SNAPSHOT
15/06/09 18:59:26 INFO AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
[Stage 0:>                                                          (0 + 0) / 5]15/06/09 18:59:30 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 5)
java.lang.NullPointerException
    at org.apache.spark.sql.DataFrame.schema(DataFrame.scala:253)
    at org.apache.spark.sql.DataFrame.rdd(DataFrame.scala:961)
    at org.apache.spark.sql.DataFrame.map(DataFrame.scala:848)
    at Simple$.matchList(Simple.scala:60)
    at Simple$$anonfun$2.apply(Simple.scala:52)
    at Simple$$anonfun$2.apply(Simple.scala:51)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:798)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1498)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:744)

Solution

You can create two RDDs, one for the book pairs and one for the reader/book records, and then join the two RDDs by book ID.

val bookpair = Array((1,2),(2,4),(3,4),(5,6),(4,6),(7,3))
val bookpairRdd = sc.parallelize(bookpair)
val readerbook = Array(("foo",1),("bar",2),("user1",3),("user3",4))
val readerRdd = sc.parallelize(readerbook).map(x => x.swap)  // key by book id so it can be joined with bookpairRdd
val joinedRdd = readerRdd.join(bookpairRdd)
joinedRdd.foreach(println)

(4,(user3,6))
(3,(user1,4))
(2,(bar,4))
(1,(foo,2))
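
Note that this join only matches a reader's book against the first element of each pair, while in the question a book may appear on either side of a pair. As a minimal sketch of one way to cover both sides (reusing sc, bookpairRdd and readerRdd from above; symmetricPairs and relatedByReader are illustrative names, not part of the original answer), you could emit each pair in both directions before the join and then group the related books by reader:

// Emit every pair in both directions so a reader's book can match either side.
val symmetricPairs = bookpairRdd.flatMap { case (b1, b2) => Seq((b1, b2), (b2, b1)) }

// readerRdd is keyed by book id: (book, reader).
// Join on book id, re-key by reader, and collect the related books per reader.
val relatedByReader = readerRdd
  .join(symmetricPairs)                                    // (book, (reader, relatedBook))
  .map { case (_, (reader, related)) => (reader, related) }
  .groupByKey()                                            // (reader, Iterable[relatedBook])

relatedByReader.foreach(println)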
