从 pyspark 中的数据帧构建 StructType [英] Building a StructType from a dataframe in pyspark
问题描述
我是 spark 和 python 新手,面临着从元数据文件构建架构的困难,该架构可以应用于我的数据文件.场景:数据文件的元数据文件(csv格式),包含列及其类型:例如:
id,int,10,"","",id,"","",TRUE,"",0created_at,timestamp,"","","",created_at,"","",FALSE,"",0
我已成功将其转换为如下所示的数据框:
+--------------------+--------------+|姓名|类型|+--------------------+--------------+|身份证|整数类型()||created_at|时间戳类型()||更新时间|字符串类型()|
但是当我尝试使用此将其转换为 StructField 格式时
fields = schemaLoansNew.map(lambda l:([StructField(l.name, l.type, 'true')]))
或
schemaList = schemaLoansNew.map(lambda l: ("StructField(" + l.name + "," + l.type + ",true)")).collect()
然后将其转换为 StructType,使用
schemaFinal = StructType(schemaList)
我收到以下错误:
回溯(最近一次调用最后一次):文件<stdin>",第 1 行,在 <module> 中文件/opt/mapr/spark/spark-1.4.1/python/pyspark/sql/types.py",第 372 行,在 __init__ 中assert all(isinstance(f, DataType) for f in fields),字段应该是数据类型列表"断言错误:字段应该是数据类型列表
由于我对数据帧缺乏了解,我被困在这个问题上,请您提供建议,如何继续进行.一旦我准备好架构,我想使用 createDataFrame 应用于我的数据文件.必须对许多表执行此过程,因此我不想对类型进行硬编码,而是使用元数据文件来构建架构,然后应用于 RDD.
提前致谢.
字段的参数必须是 DataType
对象的列表.这:
.map(lambda l:([StructField(l.name, l.type, 'true')]))
在collect
之后生成tuples
(Rows
)的lists
的list
DataType
(list[list[tuple[DataType]]]
) 更不用说 nullable
参数应该是布尔值而不是字符串.>
您的第二次尝试:
.map(lambda l: ("StructField(" + l.name + "," + l.type + ",true)")).
在 collect
之后生成 str
对象的 list
.
您显示的记录的正确架构应该或多或少像这样:
from pyspark.sql.types import *结构类型([StructField("id", IntegerType(), True),StructField("created_at", TimestampType(), True),StructField("updated_at", StringType(), True)])
虽然对这样的任务使用分布式数据结构是一种严重的矫枉过正,更不用说效率低下,你可以尝试调整你的第一个解决方案如下:
StructType([StructField(name, eval(type), True) for (name, type) in df.rdd.collect()])
但它不是特别安全(eval
).从 JSON/字典构建模式可能更容易.假设您具有从类型描述映射到规范类型名称的函数:
def get_type_name(s: str) ->字符串:""">>>get_type_name("int")'整数'"""_map = {'int': IntegerType().typeName(),'时间戳':时间戳类型().typeName(),# ...}返回 _map.get(s, StringType().typeName())
您可以构建以下形状的字典:
schema_dict = {'fields': [{'metadata': {}, 'name': 'id', 'nullable': True, 'type': 'integer'},{'metadata': {}, 'name': 'created_at', 'nullable': True, 'type': 'timestamp'}], '类型': '结构'}
并将其提供给 StructType.fromJson
:
StructType.fromJson(schema_dict)
I am new spark and python and facing this difficulty of building a schema from a metadata file that can be applied to my data file. Scenario: Metadata File for the Data file(csv format), contains the columns and their types: for example:
id,int,10,"","",id,"","",TRUE,"",0
created_at,timestamp,"","","",created_at,"","",FALSE,"",0
I have successfully converted this to a dataframe that looks like:
+--------------------+---------------+
| name| type|
+--------------------+---------------+
| id| IntegerType()|
| created_at|TimestampType()|
| updated_at| StringType()|
But when I try to convert this to a StructField format using this
fields = schemaLoansNew.map(lambda l:([StructField(l.name, l.type, 'true')]))
OR
schemaList = schemaLoansNew.map(lambda l: ("StructField(" + l.name + "," + l.type + ",true)")).collect()
And then later convert it to StructType, using
schemaFinal = StructType(schemaList)
I get the following error:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/opt/mapr/spark/spark-1.4.1/python/pyspark/sql/types.py", line 372, in __init__
assert all(isinstance(f, DataType) for f in fields), "fields should be a list of DataType"
AssertionError: fields should be a list of DataType
I am stuck on this due to my lack of knowledge on Data Frames, can you please advise, how to proceed on this. once I have schema ready I want to use createDataFrame to apply to my data File. This process has to be done for many tables so I do not want to hardcode the types rather use the metadata file to build the schema and then apply to the RDD.
Thanks in advance.
Fields have argument have to be a list of DataType
objects. This:
.map(lambda l:([StructField(l.name, l.type, 'true')]))
generates after collect
a list
of lists
of tuples
(Rows
) of DataType
(list[list[tuple[DataType]]]
) not to mention that nullable
argument should be boolean not a string.
Your second attempt:
.map(lambda l: ("StructField(" + l.name + "," + l.type + ",true)")).
generates after collect
a list
of str
objects.
Correct schema for the record you've shown should look more or less like this:
from pyspark.sql.types import *
StructType([
StructField("id", IntegerType(), True),
StructField("created_at", TimestampType(), True),
StructField("updated_at", StringType(), True)
])
Although using distributed data structures for task like this is a serious overkill, not to mention inefficient, you can try to adjust your first solution as follows:
StructType([
StructField(name, eval(type), True) for (name, type) in df.rdd.collect()
])
but it is not particularly safe (eval
). It could be easier to build a schema from JSON / dictionary. Assuming you have function which maps from type description to canonical type name:
def get_type_name(s: str) -> str:
"""
>>> get_type_name("int")
'integer'
"""
_map = {
'int': IntegerType().typeName(),
'timestamp': TimestampType().typeName(),
# ...
}
return _map.get(s, StringType().typeName())
You can build dictionary of following shape:
schema_dict = {'fields': [
{'metadata': {}, 'name': 'id', 'nullable': True, 'type': 'integer'},
{'metadata': {}, 'name': 'created_at', 'nullable': True, 'type': 'timestamp'}
], 'type': 'struct'}
and feed it to StructType.fromJson
:
StructType.fromJson(schema_dict)
这篇关于从 pyspark 中的数据帧构建 StructType的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!