Spark read parquet with custom schema


Question

I'm trying to import data in Parquet format with a custom schema, but the read fails with: TypeError: option() missing 1 required positional argument: 'value'

from pyspark.sql.types import (
    StructType, StructField, IntegerType, StringType, FloatType)

ProductCustomSchema = StructType([
    StructField("id_sku", IntegerType(), True),
    StructField("flag_piece", StringType(), True),
    StructField("flag_weight", StringType(), True),
    StructField("ds_sku", StringType(), True),
    StructField("qty_pack", FloatType(), True)])

def read_parquet_(path, schema):
    # `spark` is an existing SparkSession.
    # The .option(schema) call below is the one that raises the TypeError.
    return spark.read.format("parquet")\
                     .option(schema)\
                     .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")\
                     .load(path)

product_nomenclature = 'C:/Users/alexa/Downloads/product_nomenc'
product_nom = read_parquet_(product_nomenclature, ProductCustomSchema)

Recommended answer

As mentioned in the comments, you should change .option(schema) to .schema(schema). option() takes two arguments: a key (the name of the option you are setting) and a value (what you want to assign to that option). You get the TypeError because you passed only one argument, the schema variable, so option() received a key but no value.
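A minimal sketch of the corrected reader, to make the difference between the two calls concrete. Unlike the question's version, this one takes the SparkSession as an explicit `spark` parameter; that is an assumption made here only so the function does not depend on a global. The `timestampFormat` line is kept from the question as an example of the key/value form that option() expects.

```python
def read_parquet_(spark, path, schema):
    """Read a Parquet dataset using an explicit schema.

    `spark` is assumed to be an existing SparkSession (passed in here
    instead of being a global, unlike the question's code).
    `schema` is a StructType or a DDL-formatted string.
    """
    return (
        spark.read.format("parquet")
        .schema(schema)  # schema() takes the schema itself, no key needed
        .option("timestampFormat", "yyyy/MM/dd HH:mm:ss")  # option(key, value)
        .load(path)
    )
```

With a session available, the call from the question would become `read_parquet_(spark, product_nomenclature, ProductCustomSchema)`.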

The QueryExecutionException you posted in the comments is raised because the schema defined in your schema variable does not match the data in the Parquet file. If you specify a custom schema, you must make sure it matches the data you are reading. In your example the column id_sku is stored as BinaryType, but your schema declares it as IntegerType. PySpark will not try to reconcile differences between the schema you provide and the actual types in the data; it throws an exception instead.
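One way to spot such a mismatch before it surfaces as an exception is to compare the declared schema with the one stored in the file. The helper below is a hypothetical sketch (the name `schema_mismatches` is not part of PySpark); it relies only on the `fields`, `name`, and `dataType` attributes that `StructType` and `StructField` expose.

```python
def schema_mismatches(declared, actual):
    """Return {column: (declared_type, actual_type)} for differing columns.

    `declared` and `actual` are expected to be StructType-like objects
    with a `fields` list whose items have `name` and `dataType`.
    A column missing from `actual` is reported with actual_type None.
    """
    actual_types = {field.name: field.dataType for field in actual.fields}
    mismatches = {}
    for field in declared.fields:
        found = actual_types.get(field.name)  # None if the column is absent
        if found != field.dataType:
            mismatches[field.name] = (field.dataType, found)
    return mismatches
```

The file's own schema can be obtained by reading it once without a custom schema, for example `actual = spark.read.parquet(product_nomenclature).schema`, and then calling `schema_mismatches(ProductCustomSchema, actual)`.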

To fix the error, make sure the schema you define correctly represents your data as it is stored in the Parquet file (i.e. change the data type of id_sku in your schema to BinaryType). The benefit of supplying a matching schema is a slight performance gain, because Spark does not have to infer the file schema each time the Parquet file is read.
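As an illustration, the corrected schema could look like the sketch below. It is written as a DDL string, which `DataFrameReader.schema()` accepts in place of a `StructType`. Only the id_sku type is confirmed by the answer; the other four column types are copied from the question and are assumed to match the file. The names `PRODUCT_SCHEMA_DDL` and `read_products` are hypothetical, as is passing the session in as `spark`.

```python
# Corrected schema as a DDL string: id_sku is BINARY, matching how the
# column is stored. The remaining types are taken from the question and
# are assumed (not verified) to match the file.
PRODUCT_SCHEMA_DDL = (
    "id_sku BINARY, "
    "flag_piece STRING, "
    "flag_weight STRING, "
    "ds_sku STRING, "
    "qty_pack FLOAT"
)


def read_products(spark, path):
    """Read the product Parquet data with the corrected schema.

    `spark` is assumed to be an existing SparkSession.
    """
    return spark.read.schema(PRODUCT_SCHEMA_DDL).parquet(path)
```

The equivalent change in the question's `StructType` is to replace `IntegerType()` with `BinaryType()` for id_sku and import `BinaryType` from `pyspark.sql.types`.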
