Spark columnar performance


Problem Description


I'm a relative beginner with Spark. I have a wide dataframe (1000 columns) that I want to add columns to, based on whether the corresponding column has missing values.

so

+----+          
| A  |
+----+
| 1  |
+----+
|null|     
+----+
| 3  |
+----+

becomes

+----+-------+          
| A  | A_MIS |
+----+-------+
| 1  |   0   |
+----+-------+
|null|   1   |
+----+-------+
| 3  |   1   |
+----+-------+

This is part of a custom ML transformer, but the algorithm should be clear.

// uses col and when from org.apache.spark.sql.functions
override def transform(dataset: org.apache.spark.sql.Dataset[_]): org.apache.spark.sql.DataFrame = {
  var ds = dataset
  dataset.columns.foreach(c => {
    // filter + count per column: one pass over the data for every column
    if (dataset.filter(col(c).isNull).count() > 0) {
      ds = ds.withColumn(c + "_MIS", when(col(c).isNull, 1).otherwise(0))
    }
  })
  ds.toDF()
}

Loop over the columns; if a column has more than zero nulls, create a new column.

The dataset passed in is cached (using the .cache method) and the relevant config settings are the defaults. This is running on a single laptop for now, and takes on the order of 40 minutes for the 1000 columns even with a minimal number of rows. I thought the problem was due to hitting a database, so I tried with a parquet file instead, with the same result. Looking at the jobs UI, it appears to be doing file scans in order to do the count.

Is there a way I can improve this algorithm to get better performance, or tune the caching in some way? Increasing spark.sql.inMemoryColumnarStorage.batchSize just got me an OOM error.

Solution

Remove the condition:

if (dataset.filter(col(c).isNull).count() > 0) 

and leave only the internal expression. As written, Spark requires #columns data scans.
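For illustration, here is a minimal Scala sketch of that change (the helper name addMissingIndicators is mine, not part of the original transformer): every column gets its *_MIS indicator unconditionally, so no per-column count is needed, and all of the output columns are built in a single select rather than 1000 chained withColumn calls, which also keeps the query plan small.

import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.{col, when}

// Sketch only: add a *_MIS indicator for every column, no per-column count.
def addMissingIndicators(dataset: Dataset[_]): DataFrame = {
  val df = dataset.toDF()
  val indicators = df.columns.map { c =>
    when(col(c).isNull, 1).otherwise(0).alias(c + "_MIS")
  }
  // One select builds all output columns in a single projection over the data.
  df.select(df.columns.map(col) ++ indicators: _*)
}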

If you want to prune columns, compute the statistics once, as outlined in Count number of non-NaN entries in each column of Spark dataframe with Pyspark, and use a single drop call.
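A hedged sketch of that pruning variant, written in Scala rather than Pyspark: a single aggregation produces the null count of every column, all indicators are added in one projection, and the ones whose source column has no nulls are removed with a single drop call. The helper names nullCounts and addAndPruneIndicators are illustrative only.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, count, when}

// Sketch only: one aggregation yields the null count of every column.
def nullCounts(df: DataFrame): Map[String, Long] = {
  val aggExprs = df.columns.map(c => count(when(col(c).isNull, 1)).alias(c))
  val row = df.agg(aggExprs.head, aggExprs.tail: _*).head()
  df.columns.map(c => c -> row.getAs[Long](c)).toMap
}

def addAndPruneIndicators(df: DataFrame): DataFrame = {
  val counts = nullCounts(df)
  // Add every indicator in one projection ...
  val withAll = df.select(
    df.columns.map(col) ++
      df.columns.map(c => when(col(c).isNull, 1).otherwise(0).alias(c + "_MIS")): _*)
  // ... then drop, in a single call, those whose source column has no nulls.
  val toDrop = df.columns.filter(counts(_) == 0).map(_ + "_MIS")
  withAll.drop(toDrop: _*)
}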
