effective way to groupby without using pivot in pyspark

Question

I have a query where I need to calculate memory utilization using pyspark. I achieved this with Python pandas using pivot, but now I need to do it in pyspark, and pivoting would be an expensive operation, so I would like to know whether there is any alternative in pyspark for this:

time_stamp          Hostname    kpi kpi_subtype value_current
2019/08/17 10:01:05 Server1     memory  Total       100
2019/08/17 10:01:06 Server1     memory  used        35
2019/08/17 10:01:09 Server1     memory  buffer      8
2019/08/17 10:02:04 Server1     memory  cached      10
2019/08/17 10:01:05 Server2     memory  Total       100
2019/08/17 10:01:06 Server2     memory  used        42
2019/08/17 10:01:09 Server2     memory  buffer      7
2019/08/17 10:02:04 Server2     memory  cached      9
2019/08/17 10:07:05 Server1     memory  Total       100
2019/08/17 10:07:06 Server1     memory  used        35
2019/08/17 10:07:09 Server1     memory  buffer      8
2019/08/17 10:07:04 Server1     memory  cached      10
2019/08/17 10:08:05 Server2     memory  Total       100
2019/08/17 10:08:06 Server2     memory  used        35
2019/08/17 10:08:09 Server2     memory  buffer      8
2019/08/17 10:08:04 Server2     memory  cached      10

Which needs to be transformed into:

time_stamp      Hostname    kpi Percentage
2019-08-17 10:05:00 Server1     memory  17
2019-08-17 10:05:00 Server2     memory  26
2019-08-17 10:10:00 Server1     memory  17
2019-08-17 10:10:00 Server2     memory  17

Python code I used:

import numpy as np
import pandas as pd

df3 = pd.read_csv('/home/yasin/Documents/IMI/Data/memorry sample.csv')
df3['time_stamp'] = pd.to_datetime(df3['time_stamp'])
# Round each timestamp up to the next 5-minute boundary (5 minutes in nanoseconds).
ns5min = 5 * 60 * 1000000000
df3['time_stamp'] = pd.to_datetime((df3['time_stamp'].astype(np.int64) // ns5min + 1) * ns5min)
# Pivot kpi_subtype values into columns, then compute the utilization percentage.
df4 = df3.pivot_table('value_current', ['time_stamp', 'Hostname', 'kpi'], 'kpi_subtype')
df4 = df4.reset_index()
df4['Percentage'] = ((df4['Total'] - (df4['Total'] - df4['used'] + df4['buffer'] + df4['cached'])) / df4['Total']) * 100

I am looking for a way to replicate this in pyspark, and for a more efficient approach in Python, as pivot is an expensive operation and I need to perform this every 5 minutes on a really large dataset.

Answer

Pivoting is expensive when the list of values that are translated to columns is unknown. Spark has an overloaded pivot method that takes them as an argument.

def pivot(pivotColumn: String, values: Seq[Any])
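PySpark exposes the same overload as GroupedData.pivot(pivot_col, values), so passing the known subtypes up front avoids the extra job that computes the distinct values. A minimal sketch, assuming df is a DataFrame with the columns shown in the sample data:

from pyspark.sql import functions as F

# Supplying the value list skips the extra pass that collects distinct kpi_subtype values.
pivoted = (df.groupBy("time_stamp", "Hostname", "kpi")
             .pivot("kpi_subtype", ["Total", "used", "buffer", "cached"])
             .agg(F.first("value_current")))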

In case they aren't known, Spark must sort and collect the distinct values from your dataset. Otherwise, the logic is pretty straightforward and described here.

The implementation adds a new logical operator (o.a.s.sql.catalyst.plans.logical.Pivot). That logical operator is translated by a new analyzer rule (o.a.s.sql.catalyst.analysis.Analyzer.ResolvePivot) that currently translates it into an aggregation with lots of if statements, one expression per pivot value.

For example, df.groupBy("A", "B").pivot("C", Seq("small", "large")).sum("D") would be translated into the equivalent of df.groupBy("A", "B").agg(expr("sum(if(C = 'small', D, null))"), expr("sum(if(C = 'large', D, null))")). You could do this yourself, but it would get long and possibly error-prone quickly.
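The hand-rolled version reads much the same in PySpark. A sketch for the four subtypes in the sample, again assuming df is the source DataFrame:

from pyspark.sql import functions as F

# One conditional sum per pivot value, roughly what the analyzer rule generates internally.
manual = df.groupBy("time_stamp", "Hostname", "kpi").agg(
    F.sum(F.when(F.col("kpi_subtype") == "Total", F.col("value_current").cast("double"))).alias("Total"),
    F.sum(F.when(F.col("kpi_subtype") == "used", F.col("value_current").cast("double"))).alias("used"),
    F.sum(F.when(F.col("kpi_subtype") == "buffer", F.col("value_current").cast("double"))).alias("buffer"),
    F.sum(F.when(F.col("kpi_subtype") == "cached", F.col("value_current").cast("double"))).alias("cached"))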

Without pivoting, I would do something like this:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

val in = spark.read.option("header", "true").csv("input.csv")
  // parse the time_stamp string and cast it to a proper timestamp
  .withColumn("timestamp", unix_timestamp($"time_stamp", "yyyy/MM/dd HH:mm:ss").cast(TimestampType))
  .drop($"time_stamp")

Now we can group our dataset by the time window and hostname, and collect the KPI metrics into a map. There is an excellent answer describing just that.

// merge the single-entry maps produced for each row into one map per group
val joinMap = udf { values: Seq[Map[String, Double]] => values.flatten.toMap }

val grouped = in.groupBy(window($"timestamp", "5 minutes"), $"Hostname")
  .agg(joinMap(collect_list(map($"kpi_subtype", $"value_current".cast(DoubleType)))).as("metrics"))

Output:

+------------------------------------------+--------+-------------------------------------------------------------+
|window                                    |Hostname|metrics                                                      |
+------------------------------------------+--------+-------------------------------------------------------------+
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server1 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server2 |[Total -> 100.0, used -> 42.0, buffer -> 7.0, cached -> 9.0] |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server1 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server2 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
+------------------------------------------+--------+-------------------------------------------------------------+
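In PySpark, the same grouping can be done without a UDF. A sketch assuming Spark 2.4+ for map_from_entries, reusing in_df from the sketch above:

from pyspark.sql import functions as F

# Group into 5-minute windows per host and fold the kpi_subtype/value pairs into a single map column.
grouped_py = (in_df.groupBy(F.window("timestamp", "5 minutes"), "Hostname")
              .agg(F.map_from_entries(
                  F.collect_list(F.struct(F.col("kpi_subtype"),
                                          F.col("value_current").cast("double")))).alias("metrics")))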

Now we define some aliases and a simple select statement:

val total = col("metrics")("Total")
val used = col("metrics")("used")
val buffer = col("metrics")("buffer")
val cached = col("metrics")("cached")

val result = grouped.select($"window", $"Hostname",
          (total - ((total - used + buffer + cached) / total) * 100).as("percentage"))

And here we go:

+------------------------------------------+--------+----------+
|window                                    |Hostname|percentage|
+------------------------------------------+--------+----------+
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server1 |17.0      |
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server2 |26.0      |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server1 |17.0      |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server2 |17.0      |
+------------------------------------------+--------+----------+
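And for completeness, the final selection in PySpark, using the same map lookups and the same expression as the Scala select above (grouped_py refers to the frame from the previous sketch):

from pyspark.sql import functions as F

total = F.col("metrics")["Total"]
used = F.col("metrics")["used"]
buffer = F.col("metrics")["buffer"]
cached = F.col("metrics")["cached"]

# Same arithmetic as the Scala version; with Total = 100 this evaluates to used - buffer - cached.
result = grouped_py.select(
    "window", "Hostname",
    (total - ((total - used + buffer + cached) / total) * 100).alias("percentage"))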
