优化Flink转换 [英] Optimizing Flink transformation

查看:106
本文介绍了优化Flink转换的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有以下方法来计算DataSet中某个值的概率:

I have the following method that computes the probability of a value in a DataSet:

/**
   * Compute the probabilities of each value on the given [[DataSet]]
   *
   * @param x single colum [[DataSet]]
   * @return Sequence of probabilites for each value
   */
  private[this] def probs(x: DataSet[Double]): Seq[Double] = {
        val counts = x.groupBy(_.doubleValue)
          .reduceGroup(_.size.toDouble)
          .name("X Probs")
          .collect

        val total = counts.sum

        counts.map(_ / total)
  }

问题是,当我提交flink作业时,使用此方法,由于任务TimeOut,flink导致flink取消了该作业.我对只有40.000个实例和9个属性的DataSet上的每个属性执行此方法.

The problem is that when I submit my flink job, that uses this method, its causing flink to kill the job due to a task TimeOut. I am executing this method for each attribute on a DataSet with only 40.000 instances and 9 attributes.

有没有办法使我的代码更高效?

Is there a way I could do this code more efficient?

经过几次尝试,我将其与mapPartition一起使用,该方法是类InformationTheory的一部分,该类进行一些计算以计算熵,互信息等.因此,例如,计算SymmetricalUncertainty这样:

After a few tries, I made it work with mapPartition, this method is part of a class InformationTheory, which does some computations to calculate Entropy, mutual information etc. So, for example, SymmetricalUncertainty is computed as this:

/**
   * Computes 'symmetrical uncertainty' (SU) - a symmetric mutual information measure.
   *
   * It is defined as SU(X, y) = 2 * (IG(X|Y) / (H(X) + H(Y)))
   *
   * @param xy [[DataSet]] with two features
   * @return SU value
   */
  def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = {
    val su = xy.mapPartitionWith {
      case in ⇒
        val x = in map (_._2)
        val y = in map (_._1)

        val mu = mutualInformation(x, y)
        val Hx = entropy(x)
        val Hy = entropy(y)

        Some(2 * mu / (Hx + Hy))
    }

    su.collect.head.head
  }

有了这个,我可以有效地计算entropy,相互信息等.问题是,它仅在并行度为1的情况下工作,问题出在mapPartition中.

With this, I can compute efficiently entropy, mutual information etc. The catch is, it only works with a level of parallelism of 1, the problem resides in mapPartition.

有什么办法可以使我执行与SymmetricalUncertainty类似的操作,但是无论并行度如何?

Is there a way I could do something similar to what I am doing here with SymmetricalUncertainty, but with whatever level of parallelism?

推荐答案

我终于做到了,不知道它是否是最好的解决方案,但是可以在n个并行级别上工作:

I finally did it, don't know if its the best solution, but its working with n levels of parallelism:

def symmetricalUncertainty(xy: DataSet[(Double, Double)]): Double = {
    val su = xy.reduceGroup { in ⇒
        val invec = in.toVector
        val x = invec map (_._2)
        val y = invec map (_._1)

        val mu = mutualInformation(x, y)
        val Hx = entropy(x)
        val Hy = entropy(y)

        2 * mu / (Hx + Hy)
    }

    su.collect.head
  } 

您可以在 InformationTheory.scala 及其测试 查看全文

登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆