Counting filtered items on dataframe SPARK


Problem Description

I have the following dataframe: df

At some point I need to filter out items based on timestamps (milliseconds). However, it is important for me to keep track of how many records were filtered out (if there are too many, I want to fail the job). Naively I can do:

======Lots of calculations on df ======
val df_filtered = df.filter($"ts" >= startDay && $"ts" <= endDay)
val filtered_count = df.count - df_filtered.count

However, this feels like complete overkill, since Spark will execute the whole execution tree three times (the filter and two counts). This task is really easy in Hadoop MapReduce, where I can maintain a counter for each filtered row. Is there a more efficient way? I could only find accumulators, but I can't connect them to the filter.

A suggested approach was to cache df before the filter; however, I would prefer to keep that option as a last resort due to the size of the DF.
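
For reference, a minimal sketch of that cache-based workaround (assuming the same df, startDay, and endDay as above); caching avoids recomputing the upstream calculations for the two counts, but it still materializes the whole DataFrame:

df.cache()                 // keep df materialized so the filter and both counts reuse it
val df_filtered = df.filter($"ts" >= startDay && $"ts" <= endDay)
val filtered_count = df.count - df_filtered.count
df.unpersist()             // release the cached data once the counts are done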

Recommended Answer

Spark 1.6.0 code:

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object Main {

  val conf = new SparkConf().setAppName("myapp").setMaster("local[*]")
  val sc = new SparkContext(conf)
  val sqlContext = new SQLContext(sc)

  case class xxx(a: Int, b: Int)

  def main(args: Array[String]): Unit = {

    val df = sqlContext.createDataFrame(sc.parallelize(Seq(xxx(1, 1), xxx(2, 2), xxx(3, 3))))

    // Accumulator that counts the rows rejected by the filter
    val acc = sc.accumulator[Long](0)

    // Drop to the underlying RDD so the accumulator can be updated inside the filter predicate
    val filteredRdd = df.rdd.filter(r => {
      if (r.getAs[Int]("a") > 2) {
        true
      } else {
        acc.add(1)   // count every row that is filtered out
        false
      }
    })

    // Rebuild a DataFrame from the filtered RDD, reusing the original schema
    val filteredRddDf = sqlContext.createDataFrame(filteredRdd, df.schema)

    filteredRddDf.show()

    // The accumulator value is only reliable after an action (show) has run
    println(acc.value)
  }
}

Spark 2.x.x code:

import org.apache.spark.sql.SparkSession

object Main {

  val ss = SparkSession.builder().master("local[*]").getOrCreate()
  val sc = ss.sparkContext

  case class xxx(a: Int, b: Int)

  def main(args: Array[String]): Unit = {

    val df = ss.createDataFrame(sc.parallelize(Seq(xxx(1, 1), xxx(2, 2), xxx(3, 3))))

    // Built-in long accumulator that counts the rows rejected by the filter
    val acc = sc.longAccumulator

    // Dataset.filter accepts a function, so the accumulator can be updated inside the predicate
    val filteredDf = df.filter(r => {
      if (r.getAs[Int]("a") > 2) {
        true
      } else {
        acc.add(1)   // count every row that is filtered out
        false
      }
    }).toDF()

    filteredDf.show()

    // Read the accumulator after the action (show) has run
    println(acc.value)
  }
}
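
Since the original goal is to fail the job when too many records are filtered out, a minimal follow-up sketch that would sit right after acc.value is read inside main (maxFiltered is a hypothetical threshold, not part of the original answer):

val maxFiltered = 1000L    // hypothetical threshold: whatever "too many" means for this job
if (acc.value > maxFiltered) {
  throw new IllegalStateException(s"Too many rows filtered out: ${acc.value}")
}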
