How to find the machine in a cluster that stores a given RDD element and send a message to it?

Question

Given an RDD, for example RDD = {"0", "1", "2", ..., "99999"}, can I find out which machine in the cluster stores a given element (e.g. 100)?

And then, during a shuffle, can I aggregate some data and send it to a specific machine? I know that RDD partitioning is transparent to users, but could I use something like key/value pairs to achieve that?

Answer

Generally speaking, the answer is no, or at least not with the RDD API. If you can express your logic using graphs, you can try the message-based APIs in GraphX or Giraph. If not, using Akka directly instead of Spark could be a better choice.

Still, there are some workarounds, but I wouldn't expect high performance. Let's start with some dummy data:

import org.apache.spark.rdd.RDD

val toPairs = (s: Range) => s.map(_.toChar.toString)

val rdd: RDD[(Int, String)] = sc.parallelize(Seq(
  (0, toPairs(97 to 100)), // a-d
  (1, toPairs(101 to 107)), // e-k
  (2, toPairs(108 to 115)) // l-s
)).flatMap{ case (i, vs) => vs.map(v => (i, v)) }

and partition it using a custom partitioner:

import org.apache.spark.Partitioner

class IdentityPartitioner(n: Int) extends Partitioner {
  def numPartitions: Int = n
  // Use the Int key itself as the partition index (keys must be in [0, n))
  def getPartition(key: Any): Int = key.asInstanceOf[Int]
}

val partitioner = new IdentityPartitioner(4)
val parts = rdd.partitionBy(partitioner)

Now we have an RDD with 4 partitions, including an empty one:

parts.mapPartitionsWithIndex((i, iter) => Iterator((i, iter.size))).collect
// Array[(Int, Int)] = Array((0,4), (1,7), (2,8), (3,0))
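To make the partitioner's role concrete, here is a plain-Scala model that runs without Spark. It is a sketch under stated assumptions, not Spark's implementation: PartitionModel, SimplePartitioner and the partitionBy function below are hypothetical names that only mirror the two methods a Spark Partitioner defines (numPartitions and getPartition), and partitioning is simulated by appending each record to bucket getPartition(key). It reproduces the partition sizes shown above.

```scala
// Plain-Scala model (no Spark) of partitionBy with an identity partitioner.
// All names here are hypothetical stand-ins, not Spark API.
object PartitionModel {
  // Mirrors the two methods of org.apache.spark.Partitioner
  trait SimplePartitioner {
    def numPartitions: Int
    def getPartition(key: Any): Int
  }

  class IdentityPartitioner(n: Int) extends SimplePartitioner {
    def numPartitions: Int = n
    def getPartition(key: Any): Int = {
      val p = key.asInstanceOf[Int]
      // Partition ids must lie in [0, numPartitions); this sketch fails fast
      require(p >= 0 && p < n, s"key $p is outside [0, $n)")
      p
    }
  }

  // Simulated partitionBy: one bucket per partition, input order preserved
  def partitionBy[V](data: Seq[(Int, V)], p: SimplePartitioner): Vector[Vector[(Int, V)]] = {
    val buckets = Vector.fill(p.numPartitions)(Vector.newBuilder[(Int, V)])
    data.foreach { case kv @ (k, _) => buckets(p.getPartition(k)) += kv }
    buckets.map(_.result())
  }

  // Same records as the dummy RDD: key 0 -> a-d, 1 -> e-k, 2 -> l-s
  val data: Seq[(Int, String)] =
    Seq(0 -> (97 to 100), 1 -> (101 to 107), 2 -> (108 to 115))
      .flatMap { case (i, cs) => cs.map(c => (i, c.toChar.toString)) }

  val parts: Vector[Vector[(Int, String)]] = partitionBy(data, new IdentityPartitioner(4))

  // Model of mapPartitionsWithIndex((i, iter) => Iterator((i, iter.size)))
  val sizes: Vector[(Int, Int)] = parts.zipWithIndex.map { case (b, i) => (i, b.size) }
}
```

The require check is a choice of this sketch: Spark's Partitioner contract expects getPartition to return an index in [0, numPartitions), and the IdentityPartitioner in the answer does not verify that, so a key such as 5 with 4 partitions would not be a valid partition index.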

    • The simplest thing you can do is to leverage partitioning itself. First, a dummy function and a helper:

      // Dummy map function
      def transform(s: String) =
        Map("e" -> "x", "k" -> "y", "l" -> "z").withDefault(identity)(s)
      
      // Map String to partition
      def address(curr: Int, s: String) = {
        val m = Map("x" -> 3, "y" -> 3, "z" -> 3).withDefault(x => curr)
        (m(s), s)
      }
      

      and "send" the data:

      val transformed: RDD[(Int, String)] = parts
        // Emit pairs (partition, string)
        .map{case (i, s) => address(i, transform(s))}
        // Repartition
        .partitionBy(partitioner)
      
      transformed
        .mapPartitionsWithIndex((i, iter) => Iterator((i, iter.size)))
        .collect
      // Array[(Int, Int)] = Array((0,4), (1,5), (2,7), (3,3))
      

    • Another approach is to collect "messages":

      val tmp = parts.mapValues(s => transform(s))
      
      val messages: scala.collection.Map[Int, Iterable[String]] = tmp
        .flatMap{case (i, s) => {
           val target = address(i, s)
           if (target != (i, s)) Seq(target) else Seq()
         }}
        .groupByKey
        .collectAsMap
      

      then create a broadcast variable:

      val messagesBD = sc.broadcast(messages)
      

      and use it to send messages:

      val transformed = tmp
        // Keep only the records that stay in their current partition
        .filter{case (i, s) => address(i, s) == (i, s)}
        .mapPartitionsWithIndex((i, iter) => {
          // Drop the old keys, append the messages addressed to partition i,
          // then re-key everything with the partition index
          val combined = iter.map(_._2) ++ messagesBD.value.getOrElse(i, Seq.empty[String])
          combined.map((i, _))
        }, true)
      
      transformed
        .mapPartitionsWithIndex((i, iter) => Iterator((i, iter.size)))
        .collect
      
      // Array[(Int, Int)] = Array((0,4), (1,5), (2,7), (3,3))
      

    • Note the following line:

      val combined = iter.map(_._2) ++ messagesBD.value.getOrElse(i, Seq.empty[String])
      

      messagesBD.value is the entire broadcast data, a Map[Int, Iterable[String]], but getOrElse returns only the messages mapped to i, or an empty sequence if there are none. The iter.map(_._2) call strips the existing keys so that both sides of ++ are plain strings before combined.map((i, _)) re-keys them.
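Both workarounds can be checked on a single machine. The following is a plain-Scala model rather than Spark code, under these assumptions: each partition is a Vector, partitionBy is modeled as regrouping records by their target index, and the broadcast variable is modeled as an ordinary immutable Map that every partition reads. transform and address are copied from the answer; RoutingModel, Partitions, sizes, viaShuffle and viaBroadcast are hypothetical names used only for this illustration.

```scala
// Plain-Scala model of the two "send a message to a partition" workarounds.
// No Spark involved: a partition is a Vector, the RDD is a Vector of them.
object RoutingModel {
  type Partitions = Vector[Vector[(Int, String)]]

  // Dummy map function and routing helper, same logic as in the answer
  def transform(s: String): String =
    Map("e" -> "x", "k" -> "y", "l" -> "z").withDefault(identity[String])(s)

  def address(curr: Int, s: String): (Int, String) = {
    val m = Map("x" -> 3, "y" -> 3, "z" -> 3).withDefault(_ => curr)
    (m(s), s)
  }

  // Same layout as `parts`: a-d, e-k, l-s in partitions 0-2, partition 3 empty
  val parts: Partitions = {
    val filled = Vector(97 to 100, 101 to 107, 108 to 115).zipWithIndex.map {
      case (cs, i) => cs.map(c => (i, c.toChar.toString)).toVector
    }
    filled :+ Vector.empty[(Int, String)]
  }

  // Model of mapPartitionsWithIndex((i, iter) => Iterator((i, iter.size)))
  def sizes(ps: Partitions): Vector[(Int, Int)] =
    ps.zipWithIndex.map { case (records, i) => (i, records.size) }

  // Approach 1: re-key every record with its target partition, then regroup
  // (models .map{case (i, s) => address(i, transform(s))}.partitionBy(partitioner))
  def viaShuffle(ps: Partitions): Partitions = {
    val rekeyed = ps.flatten.map { case (i, s) => address(i, transform(s)) }
    Vector.tabulate(ps.size)(p => rekeyed.filter(_._1 == p))
  }

  // Approach 2: gather the records that must move into a driver-side map
  // (models flatMap + groupByKey + collectAsMap), "broadcast" it, and let
  // each partition keep its own records plus the messages addressed to it
  def viaBroadcast(ps: Partitions): Partitions = {
    val tmp = ps.map(_.map { case (i, s) => (i, transform(s)) })
    val messages: Map[Int, Seq[String]] = tmp.flatten
      .flatMap { case (i, s) =>
        val target = address(i, s)
        if (target._1 != i) Seq(target) else Seq.empty
      }
      .groupBy(_._1)
      .map { case (p, kvs) => (p, kvs.map(_._2)) }

    tmp.zipWithIndex.map { case (records, i) =>
      val stay = records.collect { case (k, s) if address(k, s)._1 == k => s }
      (stay ++ messages.getOrElse(i, Seq.empty[String])).map(s => (i, s))
    }
  }
}
```

Under these assumptions both routes produce the same layout, (0,4), (1,5), (2,7), (3,3), matching the counts reported above. The practical difference in Spark is how the moving records travel: the first approach shuffles them, while the second pulls them to the driver with collectAsMap and ships the whole map to every executor, so it is presumably only practical when the set of messages is small.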
