How to use COGROUP for large datasets


Problem description

I have two RDDs, namely val tab_a: RDD[(String, String)] and val tab_b: RDD[(String, String)]. I'm using cogroup on those datasets like:

val tab_c = tab_a.cogroup(tab_b).collect.toArray

val updated = tab_c.map { x =>
  // some code
}

I'm using the tab_c cogrouped values in a map function. It works fine for small datasets, but for huge datasets it throws an Out Of Memory exception.

I have tried converting the final value to an RDD, but no luck; it fails with the same error:

val newcos = spark.sparkContext.parallelize(tab_c)

1. How to use cogroup for large datasets?

2. Can we persist the cogrouped value?

Code

import org.apache.spark.storage.StorageLevel

// Key each record by its first CSV column.
val source_primary_key = source.map(rec => (rec.split(",")(0), rec))
source_primary_key.persist(StorageLevel.DISK_ONLY)

val destination_primary_key = destination.map(rec => (rec.split(",")(0), rec))
destination_primary_key.persist(StorageLevel.DISK_ONLY)

val cos = source_primary_key.cogroup(destination_primary_key).repartition(10).collect()

var srcmis: Array[String] = new Array[String](0)
var destmis: Array[String] = new Array[String](0)

var extrainsrc: Array[String] = new Array[String](0)
var extraindest: Array[String] = new Array[String](0)

var srcs: String = ""
var destt: String = ""

// Compare the cogrouped source/destination rows for each key.
val updated = cos.map { x =>
  val key = x._1
  val value = x._2

  srcs = value._1.mkString(",")
  destt = value._2.mkString(",")

  if (!srcs.equalsIgnoreCase(destt) && destt != "") {
    srcmis :+= srcs
    destmis :+= destt
  }

  if (srcs == "") {
    extraindest :+= destt
  }

  if (destt == "") {
    extrainsrc :+= srcs
  }
}

Updated code:

val tab_c = tab_a.cogroup(tab_b).filter { case (_, (src, dest)) => src.toSeq != dest.toSeq }
// tab_c = {1, CompactBuffer(1,john,US), CompactBuffer(1,john,UK)}
//         {2, CompactBuffer(2,john,US), CompactBuffer(2,johnson,UK)} ...

Error:

 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(4,3,ResultTask,FetchFailed(null,0,-1,27,org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)


ERROR YarnScheduler: Lost executor 8 on datanode1: Container killed by YARN for exceeding memory limits. 1.0 GB of 1020 MB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

Thanks.

Answer

When you use collect() you are basically telling Spark to move all the resulting data back to the master node, which can easily become a bottleneck. At that point you are no longer using Spark; you are just working with a plain array on a single machine.

To trigger computation, just use something that requires the data at every node; that is why executors live on top of a distributed file system. For instance, saveAsTextFile().

Here is a basic example.
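
A minimal sketch, assuming the same tab_a/tab_b pair RDDs as in the question (the output path is hypothetical): transform the cogrouped RDD and write it out with saveAsTextFile(), instead of pulling everything back to the driver with collect().

val tab_c = tab_a.cogroup(tab_b) // stays distributed: no collect()

// Render each cogrouped record as a line of text on the executors.
val lines = tab_c.map { case (key, (left, right)) =>
  s"$key\t${left.mkString(",")}\t${right.mkString(",")}"
}

// Triggers the computation; results are written by the executors,
// nothing is shipped back to the driver.
lines.saveAsTextFile("hdfs:///tmp/tab_c_out") // hypothetical output path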

Remember, the entire objective here (that is, if you have big data) is to move the code to your data and compute there, not to bring all the data to the computation.
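
As one illustration of that principle, here is a sketch (not the answer author's code; the labels and output path are illustrative) of the question's comparison logic rewritten as distributed transformations. It also persists the cogrouped RDD, which addresses question 2.

import org.apache.spark.storage.StorageLevel

val cogrouped = source_primary_key.cogroup(destination_primary_key)

// Question 2: yes, the cogrouped values can be persisted like any other RDD.
cogrouped.persist(StorageLevel.MEMORY_AND_DISK)

// Classify each key on the executors instead of appending to driver-side arrays.
val classified = cogrouped.map { case (key, (src, dest)) =>
  val s = src.mkString(",")
  val d = dest.mkString(",")
  if (s == "") (key, "extra_in_dest", d)
  else if (d == "") (key, "extra_in_src", s)
  else if (!s.equalsIgnoreCase(d)) (key, "mismatch", s + " | " + d)
  else (key, "match", s)
}

// Keep only the differences and write them out; no collect() anywhere.
classified
  .filter { case (_, label, _) => label != "match" }
  .map { case (key, label, payload) => s"$key,$label,$payload" }
  .saveAsTextFile("hdfs:///tmp/diff_report") // hypothetical output path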

