Spark - How to handle error case in RDD.map() method correctly?


Question

I am trying to do some text processing using Spark RDD.

The format of the input file is:

2015-05-20T18:30 <some_url>/?<key1>=<value1>&<key2>=<value2>&...&<keyn>=<valuen>

I want to extract some fields from the text and convert them into CSV format like:

<value1>,<value5>,<valuek>,<valuen>
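
For instance (the key names and values here are hypothetical, just to illustrate the intended transformation), an input line such as

2015-05-20T18:30 http://example.com/?key1=a&key2=b&key5=c&keyk=d&keyn=e

should produce the CSV line

a,c,d,e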

The following code is how I do this:

val lines = sc.textFile(s"s3n://${MY_BUCKET}/${MY_FOLDER}/test/*.gz")
val records = lines.map { line =>
    val mp = line.split("&")
                 .map(_.split("="))
                 .filter(_.length >= 2)
                 .map(t => (t(0), t(1))).toMap

    (mp.get("key1"), mp.get("key5"), mp.get("keyk"), mp.get("keyn"))
}

I would like to know: if some line of the input text has the wrong format or is invalid, the map() function cannot return a valid value. This should be very common in text processing; what is the best practice to deal with this problem?

Answer

In order to manage these errors you can use Scala's Try class within a flatMap operation, in code:

    import scala.util.{Try, Success, Failure}

    val lines = sc.textFile(s"s3n://${MY_BUCKET}/${MY_FOLDER}/test/*.gz")
    val records = lines.flatMap { line =>
      Try {
        val mp = line.split("&")
                     .map(_.split("="))
                     .filter(_.length >= 2)
                     .map(t => (t(0), t(1))).toMap

        (mp.get("key1"), mp.get("key5"), mp.get("keyk"), mp.get("keyn"))
      } match {
        case Success(fields) => Seq(fields)
        case _               => Seq()
      }
    }
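
As a follow-up (this is not part of the original answer, and the output path below is just a placeholder), the surviving tuples of Options can then be flattened into the CSV lines the question asks for:

    // Sketch: join the four optional values into a CSV line, using an
    // empty string where a key was missing, then write the result out.
    val csvLines = records.map { case (v1, v5, vk, vn) =>
      Seq(v1, v5, vk, vn).map(_.getOrElse("")).mkString(",")
    }
    csvLines.saveAsTextFile(s"s3n://${MY_BUCKET}/${MY_FOLDER}/output")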

With this you have only the "good ones", but if you want both (the errors and the good ones) I would recommend using a map function that returns a Scala Either and then a Spark filter, in code:

    import scala.util.{Try, Success, Failure}

    val lines = sc.textFile(s"s3n://${MY_BUCKET}/${MY_FOLDER}/test/*.gz")
    val goodBadRecords = lines.map { line =>
      Try {
        val mp = line.split("&")
                     .map(_.split("="))
                     .filter(_.length >= 2)
                     .map(t => (t(0), t(1))).toMap

        (mp.get("key1"), mp.get("key5"), mp.get("keyk"), mp.get("keyn"))
      } match {
        case Success(fields) => Right(fields)
        case Failure(e)      => Left(e)
      }
    }
    val records = goodBadRecords.filter(_.isRight)
    val errors = goodBadRecords.filter(_.isLeft)
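
To actually unwrap the values after (or instead of) those two filter calls, RDD.collect with a partial function can be used. This is a small sketch under the same setup, not part of the original answer:

    // Sketch: pattern-match on the Either to get the parsed tuples and the
    // error messages as separate RDDs, without the intermediate filters.
    val goodValues = goodBadRecords.collect { case Right(fields) => fields }
    val errorMessages = goodBadRecords.collect { case Left(e) => e.getMessage }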

I hope this will be useful.
