Spark read parquet with custom schema

Question

I'm trying to import data in parquet format with a custom schema, but it returns: TypeError: option() missing 1 required positional argument: 'value'

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

ProductCustomSchema = StructType([
    StructField("id_sku", IntegerType(), True),
    StructField("flag_piece", StringType(), True),
    StructField("flag_weight", StringType(), True),
    StructField("ds_sku", StringType(), True),
    StructField("qty_pack", FloatType(), True)])

def read_parquet_(path, schema):
    return spark.read.format("parquet")\
                     .option(schema)\
                     .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")\
                     .load(path)

product_nomenclature = 'C:/Users/alexa/Downloads/product_nomenc'
product_nom = read_parquet_(product_nomenclature, ProductCustomSchema)

Answer

As mentioned in the comments, you should change .option(schema) to .schema(schema). option() requires you to specify a key (the name of the option you're setting) and a value (the value you want to assign to that option). You are getting the TypeError because you were passing a variable called schema to option() without specifying which option you were actually trying to set with it.
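A minimal sketch of the corrected reader, assuming the same read_parquet_ helper and an existing SparkSession named spark as in the question:

    def read_parquet_(path, schema):
        # Pass the StructType through .schema(); .option() is only for key/value settings
        return spark.read.format("parquet")\
                         .schema(schema)\
                         .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")\
                         .load(path)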

The QueryExecutionException you posted in the comments is raised because the schema you've defined in your schema variable does not match the data in your DataFrame. If you are going to specify a custom schema, you must make sure it matches the data you are reading. In your example the column id_sku is stored as a BinaryType, but in your schema you define the column as an IntegerType. pyspark will not try to reconcile differences between the schema you provide and the actual types in the data, and an exception will be thrown.
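If you are unsure what types the file actually contains, one quick check (not part of the original answer, just a common approach) is to let Spark infer the schema from the parquet footer and print it:

    # Inspect the schema Spark infers for the parquet file
    spark.read.parquet(product_nomenclature).printSchema()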

To fix your error, make sure the schema you define correctly represents the data as it is stored in the parquet file (i.e. change the datatype of id_sku in your schema to BinaryType). The benefit of doing this is a slight performance gain from not having to infer the file schema each time the parquet file is read.
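A sketch of the adjusted schema, assuming id_sku is the only column whose stored type differs from the original definition:

    from pyspark.sql.types import StructType, StructField, BinaryType, StringType, FloatType

    ProductCustomSchema = StructType([
        StructField("id_sku", BinaryType(), True),   # matches the BinaryType stored in the file
        StructField("flag_piece", StringType(), True),
        StructField("flag_weight", StringType(), True),
        StructField("ds_sku", StringType(), True),
        StructField("qty_pack", FloatType(), True)])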
