Spark SQL - loading csv/psv files with some malformed records

Question

We are loading hierarchies of directories of files with Spark and converting them to Parquet. There are tens of gigabytes in hundreds of pipe-separated files. Some are pretty big themselves.

Every, say, 100th file has a row or two with an extra delimiter that makes the whole process (or that file) abort.

We are loading with:

sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", format("header"))
        .option("delimiter", format("delimeter"))
        .option("quote", format("quote"))
        .option("escape", format("escape"))
        .option("charset", "UTF-8")
        // Column types are unnecessary for our current use cases.
        //.option("inferschema", "true")
        .load(glob)

Is there any extension or event-handling mechanism in Spark that we could attach to the logic that reads rows, so that if a malformed row is encountered it just skips the row instead of failing the process?

(We are planning to do more pre-processing, but this would be the most immediate and critical fix.)

Answer

In your case it may not be the Spark parsing that fails, but rather the fact that the default mode is actually PERMISSIVE, so it parses best-effort into a malformed record that then causes problems further downstream in your processing logic.
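
To make that concrete, here is a minimal sketch against a throwaway file; the path, column names, and sample rows are invented, and it assumes the same sqlContext and Spark 1.x spark-csv package used in the question. With no "mode" option the reader stays on its PERMISSIVE default, so the line with the extra pipe still parses into a best-effort row instead of stopping the load:

import java.nio.file.{Files, Paths}
import java.nio.charset.StandardCharsets

// Hypothetical toy input: three columns, one line with an extra pipe.
val sample = Seq(
  "id|name|comment",
  "1|alice|ok",
  "2|bob|oops|extra-field",   // malformed: four fields instead of three
  "3|carol|fine"
).mkString("\n")
Files.write(Paths.get("/tmp/sample.psv"), sample.getBytes(StandardCharsets.UTF_8))

// No "mode" option given, so the default (PERMISSIVE) applies.
val permissive = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("delimiter", "|")
  .load("/tmp/sample.psv")

// All three data lines come back as rows, the malformed one included;
// that is the kind of silent damage that surfaces later downstream.
permissive.show()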

You should be able to simply add the option:

.option("mode", "DROPMALFORMED")

like this:

sqlContext.read
        .format("com.databricks.spark.csv")
        .option("header", format("header"))
        .option("delimiter", format("delimeter"))
        .option("quote", format("quote"))
        .option("escape", format("escape"))
        .option("charset", "UTF-8")
        // Column types are unnecessary for our current use cases.
        //.option("inferschema", "true")
        .option("mode", "DROPMALFORMED")
        .load(glob)

and it'll skip the lines with an incorrect number of delimiters, or which don't match the schema, rather than letting them cause errors later on in the code.
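
For the original goal of converting the whole hierarchy to Parquet, an end-to-end sketch might look like the following. The glob, output path, header/delimiter settings, and the row-count logging are illustrative assumptions rather than the asker's actual configuration, and writing with df.write.parquet assumes Spark 1.4 or later:

// Hypothetical paths; the glob and output location are placeholders.
val glob = "/data/incoming/*/*.psv"
val out  = "/data/parquet/incoming"

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("delimiter", "|")
  .option("charset", "UTF-8")
  // DROPMALFORMED skips bad rows; spark-csv also documents FAILFAST,
  // which aborts on the first malformed record instead.
  .option("mode", "DROPMALFORMED")
  .load(glob)

// Rough visibility into how much was dropped: compare parsed rows against raw
// lines. This is approximate (headers and quoted multi-line fields skew the
// count) but cheap to log while the conversion runs.
val rawLines = sqlContext.sparkContext.textFile(glob).count()
println(s"raw lines (incl. headers): $rawLines, rows kept: ${df.count()}")

df.write.parquet(out)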
