pySpark: java.lang.UnsupportedOperationException: Unimplemented type: StringType

Problem Description

While reading a group of parquet files written with inconsistent schemas, we ran into problems with schema merging. After switching to manually specifying the schema, I get the following error. Any pointers would be helpful.

java.lang.UnsupportedOperationException: Unimplemented type: StringType
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readDoubleBatch(VectorizedColumnReader.java:389)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:195)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:230)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137)
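
The readDoubleBatch frame in the trace is the clue: at least some of the files store this column physically as DOUBLE, while the manually supplied schema declares it as StringType, and the vectorized parquet reader cannot decode a double batch into a string vector. As an aside (an assumption on my part, not something the original question tried), one commonly suggested workaround is to disable the vectorized reader so Spark falls back to the row-based parquet-mr path, which can surface a clearer error or behave differently on the mismatch:

# Assumption: this only swaps the reader implementation; it does not fix
# the underlying type mismatch between the files and the supplied schema.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")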

from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType
from pyspark.sql import functions as fun

source_location = "{}/{}/{}/dt={}/{}/*_{}_{}.parquet".format(source_initial,
                                                             bucket,
                                                             source_prefix,
                                                             date,
                                                             source_file_pattern,
                                                             date,
                                                             source_file_pattern)
schema = StructType([
    StructField("Unnamed", StringType(), True),
    StructField("nanos", LongType(), True),
    StructField("book", LongType(), True),
    StructField("X_o", LongType(), True),
    StructField("Y_o", LongType(), True),
    StructField("Z_o", LongType(), True),
    StructField("Total", DoubleType(), True),
    StructField("P_v", DoubleType(), True),
    StructField("R_v", DoubleType(), True),
    StructField("S_v", DoubleType(), True),
    StructField("message_type", StringType(), True),
    StructField("symbol", StringType(), True),
    StructField("date", StringType(), True),
    StructField("__index_level_0__", StringType(), True)])

print("Querying data from source location {}".format(source_location))
# Note: inferSchema and mergeSchema only matter when Spark infers the schema;
# they have no effect once an explicit schema is supplied.
df_raw = spark.read.format('parquet').load(source_location, schema=schema, inferSchema=False, mergeSchema="true")
df_raw = df_raw.filter(df_raw.nanos.between(open_nano, close_nano))
df_raw = df_raw.withColumn("timeInWindow_nano", (fun.ceil(df_raw.nanos / window_nano)).cast("int"))
df_core = df_raw.groupBy("date", "symbol", "timeInWindow_nano").agg(fun.sum("Total").alias("Total"),
                                                                    fun.sum("P_v").alias("P_v"),
                                                                    fun.sum("R_v").alias("R_v"),
                                                                    fun.sum("S_v").alias("S_v"))

# Zero out the component values when Total is negative, then derive the
# *_pct columns as each component times Total.
df_core = df_core.withColumn("P_v", fun.when(df_core.Total < 0, 0).otherwise(df_core.P_v))
df_core = df_core.withColumn("R_v", fun.when(df_core.Total < 0, 0).otherwise(df_core.R_v))
df_core = df_core.withColumn("S_v", fun.when(df_core.Total < 0, 0).otherwise(df_core.S_v))
df_core = df_core.withColumn("P_pct", df_core.P_v * df_core.Total)
df_core = df_core.withColumn("R_pct", df_core.R_v * df_core.Total)
df_core = df_core.withColumn("S_pct", df_core.S_v * df_core.Total)

Answer

You cannot read parquet files in a single load if their schemas are not compatible. My advice would be to split this into two loads and then union the DataFrames once their schemas match. See the example code:

schema1_df = spark.read.parquet('path/to/files/with/string/field.parquet')
schema2_df = spark.read.parquet('path/to/files/with/double/field.parquet')
df = schema2_df.union(schema1_df.withColumn('invalid_col', schema1_df.invalid_col.cast('double')))
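
For completeness, here is a slightly fuller sketch of the same approach (the paths and the column name mismatched_col are hypothetical, not from the original question). unionByName matches columns by name, so it avoids the positional alignment that plain union requires:

from pyspark.sql import SparkSession
from pyspark.sql import functions as fun

spark = SparkSession.builder.getOrCreate()

# Hypothetical locations: one group of files stores the column as string,
# the other as double.
string_df = spark.read.parquet("s3://bucket/data/string_files/")
double_df = spark.read.parquet("s3://bucket/data/double_files/")

# Cast the string-typed column to double so both frames agree, then
# union by column name instead of by position.
string_df = string_df.withColumn("mismatched_col", fun.col("mismatched_col").cast("double"))
df = double_df.unionByName(string_df)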
