How to use COGROUP for large datasets

Question

I have two RDDs, namely val tab_a: RDD[(String, String)] and val tab_b: RDD[(String, String)]. I'm using cogroup for those datasets like:

val tab_c = tab_a.cogroup(tab_b).collect.toArray

val updated = tab_c.map { x =>
  // some code
}

I'm using the cogrouped values from tab_c in a map function. It works fine for small datasets, but with huge datasets it throws an Out Of Memory exception.

I have tried converting the final value back to an RDD, but no luck; I get the same error.

val newcos = spark.sparkContext.parallelize(tab_c)

1. How to use cogroup for large datasets?

2. Can we persist the cogrouped values?

Code

import org.apache.spark.storage.StorageLevel

// Key both datasets by the first CSV column
val source_primary_key = source.map(rec => (rec.split(",")(0), rec))
source_primary_key.persist(StorageLevel.DISK_ONLY)

val destination_primary_key = destination.map(rec => (rec.split(",")(0), rec))
destination_primary_key.persist(StorageLevel.DISK_ONLY)

// Cogroup, then pull everything back to the driver
val cos = source_primary_key.cogroup(destination_primary_key).repartition(10).collect()

var srcmis: Array[String] = new Array[String](0)
var destmis: Array[String] = new Array[String](0)

var extrainsrc: Array[String] = new Array[String](0)
var extraindest: Array[String] = new Array[String](0)

var srcs: String = ""
var destt: String = ""

val updated = cos.map { x =>
  val key = x._1
  val value = x._2

  srcs = value._1.mkString(",")
  destt = value._2.mkString(",")

  // Present in both but with different values
  if (!srcs.equalsIgnoreCase(destt) && destt != "") {
    srcmis :+= srcs
    destmis :+= destt
  }

  // Present only in destination
  if (srcs == "") {
    extraindest :+= destt
  }

  // Present only in source
  if (destt == "") {
    extrainsrc :+= srcs
  }
}

Code update:

val tab_c = tab_a.cogroup(tab_b).filter(x => x._2._1 != x._2._2)
// tab_c = {1, CompactBuffer(1,john,US), CompactBuffer(1,john,UK)}
//         {2, CompactBuffer(2,john,US), CompactBuffer(2,johnson,UK)} ...

Error:

 ERROR LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskEnd(4,3,ResultTask,FetchFailed(null,0,-1,27,org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:697)
at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:693)


ERROR YarnScheduler: Lost executor 8 on datanode1: Container killed by YARN for exceeding memory limits. 1.0 GB of 1020 MB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

Thanks

Answer

When you use collect() you are basically telling Spark to move all the resulting data back to the master node, which can easily produce a bottleneck. At that point you are no longer using Spark; you are just working with a plain array on a single machine.

To trigger computation, just use something that requires the data on every node; that's why executors live on top of a distributed file system. For instance, saveAsTextFile().

Here are some basic examples.
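
For instance, a minimal sketch of the same comparison done without collect(), assuming the tab_a and tab_b from the question: the cogrouped RDD is filtered on the executors and the result is written straight to a distributed file system. The output path and the record format here are made up for illustration.

val cogrouped = tab_a.cogroup(tab_b)

// Keep only keys where source and destination differ; nothing is pulled to the driver
val mismatches = cogrouped.flatMap { case (key, (srcVals, destVals)) =>
  val srcs  = srcVals.mkString(",")
  val destt = destVals.mkString(",")
  if (srcs.nonEmpty && destt.nonEmpty && !srcs.equalsIgnoreCase(destt))
    Some(s"$key|$srcs|$destt")
  else
    None
}

// Write the result from the executors instead of collecting it
// (the HDFS path is just an example)
mismatches.saveAsTextFile("hdfs:///tmp/cogroup_mismatches")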

请记住,这里的整个目标(即,如果您有大数据)是将代码移动到您的数据并在那里进行计算,而不是将所有数据都用于计算.

Remember, the entire objective here (that is, if you have big data) is to move the code to your data and compute there, not to bring all the data to the computation.
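
On the second question: the cogrouped result can be persisted like any other RDD, as long as it stays an RDD and is not collected first. A minimal sketch, again assuming the tab_a and tab_b from the question (the storage level is just one possible choice):

import org.apache.spark.storage.StorageLevel

val cogrouped = tab_a.cogroup(tab_b)
cogrouped.persist(StorageLevel.MEMORY_AND_DISK)  // spill to disk when it does not fit in memory

// Reuse the persisted RDD in several actions without recomputing the cogroup
val totalKeys     = cogrouped.count()
val missingInDest = cogrouped.filter { case (_, (src, dest)) => dest.isEmpty }.count()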
