Histogram - Doing it in a parallel way


Question

+----+----+--------+
| Id | M1 |  trx   |
+----+----+--------+
| 1  | M1 | 11.35  |
| 2  | M1 | 3.4    |
| 3  | M1 | 10.45  |
| 2  | M1 | 3.95   |
| 3  | M1 | 20.95  |
| 2  | M2 | 25.55  |
| 1  | M2 |  9.95  |
| 2  | M2 | 11.95  |
| 1  | M2 |  9.65  |
| 1  | M2 | 14.54  |
+----+----+--------+


With the above dataframe, I should be able to generate a histogram as below using the following code. A similar question is here.

// a lowercase name is needed here: a capitalized `Range` in a pattern
// would be matched against the existing scala.Range instead of being bound
val (ranges, counts) = df
    .select(col("trx"))
    .rdd.map(r => r.getDouble(0))
    .histogram(10)
// ranges: Array[Double] = Array(3.4, 5.615, 7.83, 10.045, 12.26, 14.475, 16.69, 18.905, 21.12, 23.335, 25.55)
// counts: Array[Long] = Array(2, 0, 2, 3, 0, 1, 0, 1, 0, 1)


But the issue here is: how can I create the histograms in parallel, grouped by column 'M1'? That is, I need two histogram outputs, one for each of the column values M1 and M2.

Answer


First, you need to know that histogram generates two separate sequential jobs. One to detect the minimum and maximum of your data, one to compute the actual histogram. You can check this using the Spark UI.


We can follow the same scheme to build histograms on as many columns as you wish, with only two jobs. Yet, we cannot use the histogram function which is only meant to handle one collection of doubles. We need to implement it by ourselves. The first job is dead simple.

val Row(min_trx : Double, max_trx : Double) = df.select(min('trx), max('trx)).head


Then we compute the ranges of the histogram locally. Note that I use the same ranges for all the columns; this makes it easy to compare the results between columns (by plotting them on the same figure). Having different ranges per column would only be a small modification of this code, though.

val hist_size = 10
val hist_step = (max_trx - min_trx) / hist_size
val hist_ranges = (1 until hist_size)
    .scanLeft(min_trx)((a, _) => a + hist_step) :+ max_trx
// I add max_trx manually to avoid rounding errors that would exclude the value


That was the first part. Then we can use a UDF to determine which range each value falls into, and compute all the histograms in parallel with Spark.

val range_index = udf((x : Double) => hist_ranges.lastIndexWhere(x >= _))
val hist_df = df
    .withColumn("rangeIndex", range_index('trx))
    .groupBy("M1", "rangeIndex")
    .count()
// And voilà, all the data you need is there.
hist_df.show()
+---+----------+-----+
| M1|rangeIndex|count|
+---+----------+-----+
| M2|         2|    2|
| M1|         0|    2|
| M2|         5|    1|
| M1|         3|    2|
| M2|         3|    1|
| M1|         7|    1|
| M2|        10|    1|
+---+----------+-----+


As a bonus, you can shape the data to use it locally (within the driver), either using the RDD API or by collecting the dataframe and modifying it in Scala.
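As a sketch of the collect-and-reshape variant (not the answer's original code): assume the rows of hist_df have been collected to the driver as plain (M1, rangeIndex, count) tuples; the sample values below are copied from the output table shown earlier.

```scala
// Hypothetical sample: hist_df.collect() reshaped on the driver into
// (M1, rangeIndex, count) tuples, taken from the output table above.
val collected = Seq(
  ("M2", 2, 2L), ("M1", 0, 2L), ("M2", 5, 1L),
  ("M1", 3, 2L), ("M2", 3, 1L), ("M1", 7, 1L), ("M2", 10, 1L)
)
val histSize = 10

// Group the sparse (index -> count) pairs by key and densify them into
// arrays of length histSize + 1 (index histSize holds values equal to the max).
val histMap: Map[String, Array[Long]] = collected
  .groupBy(_._1)
  .map { case (key, rows) =>
    val sparse = rows.map { case (_, i, c) => i -> c }.toMap
    key -> (0 to histSize).map(i => sparse.getOrElse(i, 0L)).toArray
  }
```

Each dense array can then be plotted directly; for this sample, histMap("M1") has counts 2 and 2 at indices 0 and 3, and 1 at index 7.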


Here is one way to do it with Spark, since this is a question about Spark ;-)

val hist_map = hist_df.rdd
    .map(row => row.getAs[String]("M1") ->
             (row.getAs[Int]("rangeIndex"), row.getAs[Long]("count")))
    .groupByKey
    .mapValues( _.toMap)
    // use 0 to hist_size: the UDF can yield index 0 (values in the first bucket)
    // as well as index hist_size (values equal to the max), i.e. hist_size + 1 bins
    .mapValues( hists => (0 to hist_size)
                    .map(i => hists.getOrElse(i, 0L)).toArray )
    .collectAsMap

How to build one range per column value:


Instead of computing the min and max of trx over the whole dataframe, we compute them for each value of column M1 with groupBy.

val min_max_map = df.groupBy("M1")
    .agg(min('trx), max('trx))
    .rdd.map(row => row.getAs[String]("M1") ->
      (row.getAs[Double]("min(trx)"), row.getAs[Double]("max(trx)")))
    .collectAsMap // maps each column value to a tuple (min, max)


Then we adapt the UDF so that it uses this map and we are done.

// for clarity, let's define a function that generates histogram ranges
def generate_ranges(min_trx : Double, max_trx : Double, hist_size : Int) = {
    val hist_step = (max_trx - min_trx) / hist_size
    (1 until hist_size).scanLeft(min_trx)((a, _) => a + hist_step) :+ max_trx
}
// and use it to generate one range per column value
val range_map = min_max_map.keys
    .map(key => key ->
        generate_ranges(min_max_map(key)._1, min_max_map(key)._2, hist_size))
    .toMap

val range_index = udf((x : Double, m1 : String) =>
                       range_map(m1).lastIndexWhere(x >= _))


Finally, just replace range_index('trx) with range_index('trx, 'M1) and you will have one range per column value.
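To see how the per-key bucketing behaves, here is a plain-Scala sketch (no Spark session needed) that reuses the range-generation logic above with per-key (min, max) pairs computed by hand from the sample data:

```scala
// Same logic as generate_ranges above, in plain Scala
def generateRanges(minTrx: Double, maxTrx: Double, histSize: Int): Seq[Double] = {
  val step = (maxTrx - minTrx) / histSize
  (1 until histSize).scanLeft(minTrx)((a, _) => a + step) :+ maxTrx
}

val histSize = 10
// (min, max) of trx per value of M1, taken by hand from the sample dataframe
val minMaxMap = Map("M1" -> (3.4, 20.95), "M2" -> (9.65, 25.55))
val rangeMap = minMaxMap.map { case (k, (lo, hi)) =>
  k -> generateRanges(lo, hi, histSize)
}

// The same rule the UDF applies: index of the last boundary that x still reaches
def rangeIndex(x: Double, key: String): Int =
  rangeMap(key).lastIndexWhere(x >= _)
```

Each key gets histSize + 1 boundaries, so the minimum of a group lands in bucket 0 and its maximum in bucket histSize; any other value falls strictly between two boundaries of its own group's range.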
