Is there a way to skip/throw-out/ignore records in Spark during a map?
Question
We have a very standard Spark job which reads log files from s3 and then does some processing over them. Very basic Spark stuff...
val logs = sc.textFile(somePathTos3)
val mappedRows = logs.map(log => OurRowObject.parseLog(log.split("\t")))
val validRows = mappedRows.filter(log => log._1._1 != "ERROR")
...and continue processing
Where OurRowObject.parseLog takes the raw log line and maps it to some (key, value) pair (e.g. ( (1,2,3,4), (5,6,7) )) that we can then do processing on. Now, if parseLog encounters a "problem" log (malformed, empty, etc...) it returns some sentinel value (e.g. ( ("ERROR", ...), (...) )), which the filter step then filters out.
Now, what I have been trying to find is a way to simply not include the problem row(s) during the map... some way to tell Spark "Hey, this is an empty/malformed row, skip it and don't emit a pair for it", instead of needing that additional filter step.
I have not yet been able to find a way to do this, and find it very interesting that this functionality does not (AFAICanFind) exist.
Thanks
Answer
You could make the parser return an Option[Value] instead of a Value. That way you could use flatMap to map the lines to rows and remove those that were invalid.
Roughly like this:
def parseLog(line: String): Option[Array[String]] = {
  val splitted = line.split("\t")
  if (validate(splitted)) Some(splitted) else None
}
val validRows = logs.flatMap(OurRowObject.parseLog(_))
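To see why this works without a separate filter step: flatMap treats an Option as a collection of zero or one elements, so Some(row) contributes the row and None contributes nothing. Here is a minimal, self-contained sketch of that pattern on a plain Scala collection (no Spark needed to demonstrate it); the validate rule ("at least two fields") and the sample lines are hypothetical stand-ins:

```scala
object FlatMapOptionDemo {
  // Hypothetical validation rule: a usable row has at least two fields.
  def validate(fields: Array[String]): Boolean = fields.length >= 2

  // Parser returning Option: None for malformed lines, Some(fields) otherwise.
  def parseLog(line: String): Option[Array[String]] =
    Option(line).map(_.split("\t")).filter(validate)

  def main(args: Array[String]): Unit = {
    val lines = Seq("a\tb\tc", "", "x\ty")
    // flatMap unwraps each Some(...) and silently drops every None,
    // so the malformed empty line never appears in the result --
    // exactly what logs.flatMap(OurRowObject.parseLog(_)) does on an RDD.
    val validRows = lines.flatMap(parseLog)
    println(validRows.map(_.mkString(",")).mkString(";"))
  }
}
```

RDD.flatMap accepts any function producing a TraversableOnce, and Option converts implicitly, so the exact same parseLog works unchanged on the RDD.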