How to get a count of invalid data during parsing


Problem description

We are using Spark to parse a big CSV file that may contain invalid data. We want to save the valid data into a data store, and also return how many rows were valid and how many were invalid.

I am wondering how we can do this in Spark; what is the standard approach when reading data?

My current approach uses an Accumulator, but the count isn't accurate because of how accumulators work in Spark: updates made inside a transformation can be applied more than once when a task is retried or a stage is recomputed.

import java.util.{Date, UUID}
import scala.util.{Success, Try}

// CSVInputData is a case class in which every field is a String
val csvInput = spark.read.option("header", "true").csv(csvFile).as[CSVInputData]

// accumulator counting rows that fail to parse (defined here for completeness)
val errorAcc = spark.sparkContext.longAccumulator("parse errors")

val newDS = csvInput.flatMap { row =>
  Try {
    val data = new DomainData()
    data.setScore(row.score.trim.toDouble)
    data.setId(UUID.randomUUID().toString)
    data.setDate(Util.parseDate(row.eventTime.trim))
    data.setUpdateDate(new Date())
    data
  } match {
    case Success(data) => Seq(data) // valid row: keep it
    case _ =>
      errorAcc.add(1)               // invalid row: count and drop it
      Seq()
  }
}
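
An exact alternative (my sketch, not from the original post) is to derive the invalid count from two actions rather than from accumulator side effects: action results are exact even when tasks are retried or stages are recomputed. Reusing csvInput and newDS from above:

val totalCount = csvInput.count()  // every input row
newDS.cache()                      // parse once; reuse for the count and the save
val validCount = newDS.count()     // rows that parsed successfully
val invalidCount = totalCount - validCount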

I tried to use Either, but it failed with the following exception:

java.lang.NoClassDefFoundError: no Java class corresponding to Product with Serializable with scala.util.Either[xx.CSVInputData,xx.DomainData] found

Update

I think Either doesn't work with the Spark 2.0 Dataset API:

spark.read.option("header", "true").csv("any.csv").map { row =>
  try {
    Right("")
  } catch { case e: Throwable => Left("") }
}

If we switch to sc (the RDD API), it works:

sc.parallelize('a' to 'z').map { row =>
  try {
    Right("")
  } catch { case e: Throwable => Left("") }
}.collect()

In the current latest Scala (http://www.scala-lang.org/api/2.11.x/index.html#scala.util.Either), Either does not extend the Serializable trait:

sealed abstract class Either[+A, +B] extends AnyRef

In the upcoming 2.12 (http://www.scala-lang.org/api/2.12.x/scala/util/Either.html), it does:

sealed abstract class Either[+A, +B] extends Product with Serializable
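
For illustration (my sketch, not from the original post), this is also where the Product with Serializable with scala.util.Either[...] in the exception above comes from: Right and Left are case classes, so they extend Product and Serializable even though Either itself does not in 2.11, and the compiler infers the least upper bound of the two branches:

val e = if (scala.util.Random.nextBoolean()) Right("ok") else Left("err")
// inferred type in Scala 2.11: Product with Serializable with Either[String, String]
// inferred type in Scala 2.12: Either[String, String]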

Update 2: workaround

For more information, see Spark ETL: dealing with invalid data using two approaches.

Since the Spark Dataset API doesn't work with Either, the workaround is to call ds.rdd and then use the try/Right/Left pattern to capture both valid and invalid data.

spark.read.option("header", "true").csv("/Users/yyuan/jyuan/1.csv").rdd.map { row =>
  try {
    Right("")
  } catch { case e: Throwable => Left("") }
}.collect()
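
A fuller sketch of the workaround (my own; toDomainData is a hypothetical helper wrapping the parsing logic from the first snippet, not a name from the original post), caching the parsed RDD so the file is parsed only once for the counts and the save:

val parsed = csvInput.rdd.map { row =>      // RDD[Either[CSVInputData, DomainData]]
  try Right(toDomainData(row))              // valid row
  catch { case e: Throwable => Left(row) }  // invalid row, kept for inspection
}
parsed.cache()

val invalidCount = parsed.filter(_.isLeft).count()
val validData = parsed.collect { case Right(d) => d } // RDD[DomainData]
val validCount = validData.count()
// save validData to the data store; report validCount and invalidCount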

Recommended answer

Have you considered using …
