Effective way to groupby without using pivot in PySpark
Question
I have a query where I need to calculate memory utilization using PySpark. I achieved this with pandas using pivot, but now I need to do it in PySpark, and pivoting would be an expensive operation, so I would like to know whether there is any alternative in PySpark for this solution.
time_stamp Hostname kpi kpi_subtype value_current
2019/08/17 10:01:05 Server1 memory Total 100
2019/08/17 10:01:06 Server1 memory used 35
2019/08/17 10:01:09 Server1 memory buffer 8
2019/08/17 10:02:04 Server1 memory cached 10
2019/08/17 10:01:05 Server2 memory Total 100
2019/08/17 10:01:06 Server2 memory used 42
2019/08/17 10:01:09 Server2 memory buffer 7
2019/08/17 10:02:04 Server2 memory cached 9
2019/08/17 10:07:05 Server1 memory Total 100
2019/08/17 10:07:06 Server1 memory used 35
2019/08/17 10:07:09 Server1 memory buffer 8
2019/08/17 10:07:04 Server1 memory cached 10
2019/08/17 10:08:05 Server2 memory Total 100
2019/08/17 10:08:06 Server2 memory used 35
2019/08/17 10:08:09 Server2 memory buffer 8
2019/08/17 10:08:04 Server2 memory cached 10
which needs to be transformed into:
time_stamp Hostname kpi Percentage
2019-08-17 10:05:00 Server1 memory 17
2019-08-17 10:05:00 Server2 memory 26
2019-08-17 10:10:00 Server1 memory 17
2019-08-17 10:10:00 Server2 memory 17
The Python code I used:
import pandas as pd
import numpy as np

df3 = pd.read_csv('/home/yasin/Documents/IMI/Data/memorry sample.csv')
df3['time_stamp'] = pd.to_datetime(df3['time_stamp'])

# Round each timestamp up to the next 5-minute boundary
ns5min = 5 * 60 * 1000000000
df3['time_stamp'] = pd.to_datetime((df3['time_stamp'].astype(np.int64) // ns5min + 1) * ns5min)

# Pivot the kpi_subtype values (Total/used/buffer/cached) into columns
df4 = df3.pivot_table('value_current', ['time_stamp', 'Hostname', 'kpi'], 'kpi_subtype')
df4 = df4.reset_index()
df4['Percentage'] = ((df4['Total'] - (df4['Total'] - df4['used'] + df4['buffer'] + df4['cached'])) / df4['Total']) * 100
Looking for a way to replicate this in PySpark, and for a more efficient approach than the Python one, since pivot is an expensive operation and I need to run this every 5 minutes on a really large dataset.
Answer
Pivoting is expensive when the list of values that are translated to columns is unknown. Spark has an overloaded pivot method that takes them as an argument:
def pivot(pivotColumn: String, values: Seq[Any])
If they aren't known, Spark must sort and collect the distinct values from your dataset. Otherwise, the logic is pretty straightforward and described here.
The implementation adds a new logical operator (o.a.s.sql.catalyst.plans.logical.Pivot). That logical operator is translated by a new analyzer rule (o.a.s.sql.catalyst.analysis.Analyzer.ResolvePivot) that currently translates it into an aggregation with lots of if statements, one expression per pivot value.
For example, df.groupBy("A", "B").pivot("C", Seq("small", "large")).sum("D") would be translated into the equivalent of df.groupBy("A", "B").agg(expr("sum(if(C = 'small', D, null))"), expr("sum(if(C = 'large', D, null))")). You could do this yourself, but it would get long and possibly error-prone quickly.
Without pivoting, I would do something like this:
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import spark.implicits._

val in = spark.read.option("header", "true").csv("input.csv")
  // parse the string column into a proper timestamp
  .withColumn("timestamp", unix_timestamp($"time_stamp", "yyyy/MM/dd HH:mm:ss").cast(TimestampType))
  .drop("time_stamp")
Now we can group our dataset by the time window and hostname, and collect the KPI metrics into a map. There is an excellent answer describing just that.
val joinMap = udf { values: Seq[Map[String, Double]] => values.flatten.toMap }
val grouped = in.groupBy(window($"timestamp", "5 minutes"), $"Hostname")
.agg(joinMap(collect_list(map($"kpi_subtype", $"value_current".cast(DoubleType)))).as("metrics"))
Output:
+------------------------------------------+--------+-------------------------------------------------------------+
|window |Hostname|metrics |
+------------------------------------------+--------+-------------------------------------------------------------+
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server1 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server2 |[Total -> 100.0, used -> 42.0, buffer -> 7.0, cached -> 9.0] |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server1 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server2 |[Total -> 100.0, used -> 35.0, buffer -> 8.0, cached -> 10.0]|
+------------------------------------------+--------+-------------------------------------------------------------+
Now we define some aliases and a simple select statement:
val total = col("metrics")("Total")
val used = col("metrics")("used")
val buffer = col("metrics")("buffer")
val cached = col("metrics")("cached")
val result = grouped.select($"window", $"Hostname",
  // (Total - (Total - used + buffer + cached)) / Total * 100, matching the pandas formula
  ((total - (total - used + buffer + cached)) / total * 100).as("percentage"))
And here we go:
+------------------------------------------+--------+----------+
|window |Hostname|percentage|
+------------------------------------------+--------+----------+
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server1 |17.0 |
|[2019-08-17 10:00:00, 2019-08-17 10:05:00]|Server2 |26.0 |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server1 |17.0 |
|[2019-08-17 10:05:00, 2019-08-17 10:10:00]|Server2 |17.0 |
+------------------------------------------+--------+----------+