Building a StructType from a dataframe in pyspark


Question


I am new to Spark and Python and am facing difficulty building a schema from a metadata file that can be applied to my data file. Scenario: the metadata file for the data file (csv format) contains the columns and their types, for example:

id,int,10,"","",id,"","",TRUE,"",0
created_at,timestamp,"","","",created_at,"","",FALSE,"",0

I have successfully converted this to a dataframe that looks like:

+--------------------+---------------+
|                name|           type|
+--------------------+---------------+
|                  id|  IntegerType()|
|          created_at|TimestampType()|
|          updated_at|   StringType()|

But when I try to convert this to a StructField format using this

fields = schemaLoansNew.map(lambda l:([StructField(l.name, l.type, 'true')]))

OR

schemaList = schemaLoansNew.map(lambda l: ("StructField(" + l.name + "," + l.type + ",true)")).collect()

And then later convert it to StructType, using

schemaFinal = StructType(schemaList)

I get the following error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/mapr/spark/spark-1.4.1/python/pyspark/sql/types.py", line 372, in __init__
assert all(isinstance(f, DataType) for f in fields), "fields should be a list of DataType"
AssertionError: fields should be a list of DataType

I am stuck on this due to my lack of knowledge of DataFrames. Can you please advise how to proceed? Once I have the schema ready I want to use createDataFrame to apply it to my data file. This process has to be done for many tables, so I do not want to hardcode the types but rather use the metadata file to build the schema and then apply it to the RDD.

Thanks in advance.

Solution

The fields argument has to be a list of DataType objects. This:

.map(lambda l:([StructField(l.name, l.type, 'true')]))

generates, after collect, a list of lists of tuples (Rows) of DataType (list[list[tuple[DataType]]]), not to mention that the nullable argument should be a boolean, not a string.

Your second attempt:

.map(lambda l: ("StructField(" + l.name + "," + l.type + ",true)")).

generates, after collect, a list of str objects.

The correct schema for the records you've shown should look more or less like this:

from pyspark.sql.types import *

StructType([
    StructField("id", IntegerType(), True),
    StructField("created_at", TimestampType(), True),
    StructField("updated_at", StringType(), True)
])
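
Once you have such a schema, it can be passed straight to createDataFrame, as you mention. A minimal sketch (Spark 1.x style to match your traceback; sc, sqlContext and the sample rows are hypothetical placeholders for your own context and parsed data file):

from datetime import datetime
from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, StringType

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("created_at", TimestampType(), True),
    StructField("updated_at", StringType(), True)
])

# Stand-in for the RDD parsed from your data file.
rows = sc.parallelize([(1, datetime(2015, 1, 1, 12, 0, 0), "pending")])
df = sqlContext.createDataFrame(rows, schema)
df.printSchema()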

Although using distributed data structures for a task like this is serious overkill, not to mention inefficient, you can try to adjust your first solution as follows:

StructType([
    StructField(name, eval(type), True) for (name, type) in  df.rdd.collect()
])
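
For this to work, the strings stored in the type column ("IntegerType()", "TimestampType()", ...) must name constructors that are actually in scope when eval runs, e.g. via from pyspark.sql.types import *. A small illustration of the same idea on plain tuples (the pairs list is hypothetical):

from pyspark.sql.types import *

pairs = [("id", "IntegerType()"), ("created_at", "TimestampType()")]
# eval turns each stored type string back into a DataType instance.
schema = StructType([StructField(name, eval(t), True) for name, t in pairs])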

but it is not particularly safe (eval). It could be easier to build a schema from a JSON / dictionary representation. Assuming you have a function which maps from a type description to a canonical type name:

def get_type_name(s: str) -> str:
    """
    >>> get_type_name("int")
    'integer'
    """
    _map = {
        'int': IntegerType().typeName(),
        'timestamp': TimestampType().typeName(),
        # ...
    } 
    return _map.get(s, StringType().typeName())

you can build a dictionary of the following shape:

schema_dict = {'fields': [
    {'metadata': {}, 'name': 'id', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'created_at', 'nullable': True, 'type': 'timestamp'}
], 'type': 'struct'}

and feed it to StructType.fromJson:

StructType.fromJson(schema_dict)
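
Putting the pieces together, a sketch that builds that dictionary directly from the metadata (metadata_pairs is a hypothetical stand-in for the (name, type) values parsed out of your metadata file):

from pyspark.sql.types import *

# (column name, raw type string from the metadata file)
metadata_pairs = [("id", "int"), ("created_at", "timestamp"), ("updated_at", "string")]

schema_dict = {
    'type': 'struct',
    'fields': [
        {'metadata': {}, 'name': name, 'nullable': True, 'type': get_type_name(raw_type)}
        for name, raw_type in metadata_pairs
    ]
}

schema = StructType.fromJson(schema_dict)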
