How to get count of invalid data during parse

Problem Description

We are using Spark to parse a big CSV file, which may contain invalid data. We want to save the valid records into the data store, and also report how many records we imported and how many were invalid.

I am wondering how we can do this in Spark. What's the standard approach when reading data?

My current approach uses an Accumulator, but it isn't accurate because of how accumulators work in Spark: updates made inside a transformation can be applied more than once when a task is retried or a stage is recomputed.

// CSVInputData is a case class whose fields are all strings
import java.util.{Date, UUID}
import scala.util.{Success, Try}

val csvInput = spark.read.option("header", "true").csv(csvFile).as[CSVInputData]

// accumulator used to count rows that fail to parse
val errorAcc = spark.sparkContext.longAccumulator("parseErrors")

val newDS = csvInput
  .flatMap { row =>
    Try {
      val data = new DomainData()
      data.setScore(row.score.trim.toDouble)
      data.setId(UUID.randomUUID().toString)
      data.setDate(Util.parseDate(row.eventTime.trim))
      data.setUpdateDate(new Date())
      data
    } match {
      case Success(data) => Seq(data)  // valid row: keep it
      case _ =>
        errorAcc.add(1)                // invalid row: count it and drop it
        Seq.empty
    }
  }
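
A simpler alternative sketch (not from the original post): accumulator updates are only guaranteed to be applied exactly once inside actions, so the invalid count can instead be derived from two counting actions, with newDS cached so the file is not parsed twice.

newDS.cache()                         // avoid re-running the flatMap per action
val totalCount   = csvInput.count()   // every row read from the CSV
val validCount   = newDS.count()      // rows that parsed successfully
val invalidCount = totalCount - validCount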

I tried to use Either, but it failed with the exception:

java.lang.NoClassDefFoundError: no Java class corresponding to Product with Serializable with scala.util.Either[xx.CSVInputData,xx.DomainData] found

Update

I think Either doesn't work with the Spark 2.0 Dataset API:

spark.read.option("header", "true").csv("any.csv").map { row =>
  try {
    Right("")
  } catch { case e: Throwable => Left("") }
}

If we switch to sc (the RDD API), it works:

sc.parallelize('a' to 'z').map { row =>
  try {
    Right("")
  } catch { case e: Throwable => Left("") }
}.collect()

In the current latest Scala (http://www.scala-lang.org/api/2.11.x/index.html#scala.util.Either), Either doesn't implement the Serializable trait:

sealed abstract class Either[+A, +B] extends AnyRef

In the upcoming 2.12 (http://www.scala-lang.org/api/2.12.x/scala/util/Either.html), it does:

sealed abstract class Either[+A, +B] extends Product with Serializable
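
Before dropping down to the RDD API, one possible workaround (an assumption here, not something the original post tried) is to give the Dataset an explicit Kryo encoder, since the failure above comes from Spark being unable to derive an encoder for Either. A minimal sketch; toDomainData stands in for the real parsing code:

import org.apache.spark.sql.{Encoder, Encoders}

// A Kryo encoder stores Either values as opaque binary blobs, so Spark
// no longer needs to derive a schema for them.
implicit val eitherEncoder: Encoder[Either[CSVInputData, DomainData]] =
  Encoders.kryo[Either[CSVInputData, DomainData]]

val tagged = csvInput.map { row =>
  try Right(toDomainData(row))   // toDomainData: hypothetical parse helper
  catch { case e: Throwable => Left(row) }
}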

Update 2 with workaround

More details in Spark ETL: dealing with invalid data using Either.

Since the Spark Dataset doesn't work with Either, the workaround is to call ds.rdd and then use the try/Left/Right pattern to capture both valid and invalid data:

spark.read.option("header", "true").csv("/Users/yyuan/jyuan/1.csv").rdd.map { row =>
  try {
    Right("")
  } catch { case e: Throwable => Left("") }
}.collect()
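
With rows tagged as Left/Right, both counts fall out of the same RDD. A minimal sketch, with the parse body still elided as in the snippet above and caching added so the file is only parsed once:

val tagged = spark.read.option("header", "true").csv("/Users/yyuan/jyuan/1.csv").rdd
  .map { row =>
    try Right(row)                   // placeholder: real code would build DomainData
    catch { case e: Throwable => Left(row) }
  }
tagged.cache()                       // one parse serves both counting jobs
val invalidCount = tagged.filter(_.isLeft).count()
val validCount   = tagged.filter(_.isRight).count()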

Recommended Answer

Have you considered using Either?

val counts = csvInput
  .map { row =>
    try {
      val data = new DomainData()
      data.setScore(row.score.trim.toDouble)
      data.setId(UUID.randomUUID().toString)
      data.setDate(Util.parseDate(row.eventTime.trim))
      data.setUpdateDate(new Date())
      Right(data)                      // row parsed successfully
    } catch {
      case e: Throwable => Left(row)   // keep the raw row for inspection
    }
  }

val failedCount  = counts.filter(_.isLeft).count()
val successCount = counts.filter(_.isRight).count()
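
To finish the original goal of saving only the valid rows, the Right values can be pulled back out afterwards. A small follow-on sketch, assuming counts was built on an RDD (per the ds.rdd workaround above) so no Either encoder is needed:

// Keep only the successfully parsed DomainData values for writing out.
val validData = counts.collect { case Right(data) => data }
// validData (an RDD[DomainData]) can then be written to the data store.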
