Building a StructType from a dataframe in pyspark

This article explains how to build a StructType from a dataframe in pyspark; the question and answer below may be a useful reference if you are facing the same problem.

Problem Description

I am new to Spark and Python and am having difficulty building a schema from a metadata file that can be applied to my data file. Scenario: the metadata file for the data file (csv format) contains the columns and their types, for example:

id,int,10,"","",id,"","",TRUE,"",0
created_at,timestamp,"","","",created_at,"","",FALSE,"",0

I have successfully converted this to a dataframe that looks like:

+--------------------+---------------+
|                name|           type|
+--------------------+---------------+
|                  id|  IntegerType()|
|          created_at|TimestampType()|
|          updated_at|   StringType()|

But when I try to convert this to a StructField format using this

fields = schemaLoansNew.map(lambda l:([StructField(l.name, l.type, 'true')]))

OR

schemaList = schemaLoansNew.map(lambda l: ("StructField(" + l.name + "," + l.type + ",true)")).collect()

And then later convert it to StructType, using

schemaFinal = StructType(schemaList)

I get the following error:

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/mapr/spark/spark-1.4.1/python/pyspark/sql/types.py", line 372, in __init__
assert all(isinstance(f, DataType) for f in fields), "fields should be a list of DataType"
AssertionError: fields should be a list of DataType

I am stuck on this due to my lack of knowledge of DataFrames. Can you please advise how to proceed? Once I have the schema ready I want to use createDataFrame to apply it to my data file. This process has to be done for many tables, so I do not want to hardcode the types but rather use the metadata file to build the schema and then apply it to the RDD.

Thanks in advance.

Solution

The fields argument has to be a list of DataType objects. This:

.map(lambda l:([StructField(l.name, l.type, 'true')]))

generates, after collect, a list of lists of tuples (Rows) of DataType (list[list[tuple[DataType]]]), not to mention that the nullable argument should be a boolean, not a string.

Your second attempt:

.map(lambda l: ("StructField(" + l.name + "," + l.type + ",true)")).

generates, after collect, a list of str objects.

The correct schema for the records you've shown should look more or less like this:

from pyspark.sql.types import *

StructType([
    StructField("id", IntegerType(), True),
    StructField("created_at", TimestampType(), True),
    StructField("updated_at", StringType(), True)
])
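
Once the schema exists, applying it to the data file is just a matter of passing it to createDataFrame (or to a CSV reader that accepts a schema). Below is a minimal sketch, assuming the usual pyspark shell entry points sc and sqlContext; the two rows are made-up placeholders standing in for the parsed data file:

from datetime import datetime
from pyspark.sql.types import *

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("created_at", TimestampType(), True),
    StructField("updated_at", StringType(), True)
])

# Placeholder rows; in practice these would come from parsing the csv data file.
rows = sc.parallelize([
    (1, datetime(2015, 1, 1, 12, 0, 0), "2015-01-02"),
    (2, datetime(2015, 2, 1, 12, 0, 0), None)
])

df = sqlContext.createDataFrame(rows, schema)
df.printSchema()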

Although using distributed data structures for a task like this is serious overkill, not to mention inefficient, you can try to adjust your first solution as follows:

StructType([
    StructField(name, eval(type), True) for (name, type) in df.rdd.collect()
])
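
(For eval(type) to work here, the names stored in the type column, such as IntegerType(), have to be resolvable in the current namespace, for example via from pyspark.sql.types import * as above.)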

but it is not particularly safe (eval). It could be easier to build the schema from JSON / a dictionary. Assuming you have a function which maps a type description to a canonical type name:

def get_type_name(s: str) -> str:
    """
    >>> get_type_name("int")
    'integer'
    """
    _map = {
        'int': IntegerType().typeName(),
        'timestamp': TimestampType().typeName(),
        # ...
    } 
    return _map.get(s, StringType().typeName())

You can build a dictionary of the following shape:

schema_dict = {'fields': [
    {'metadata': {}, 'name': 'id', 'nullable': True, 'type': 'integer'},
    {'metadata': {}, 'name': 'created_at', 'nullable': True, 'type': 'timestamp'}
], 'type': 'struct'}

and feed it to StructType.fromJson:

StructType.fromJson(schema_dict)
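
Tying it together, here is a sketch of how that dictionary could be assembled programmatically rather than by hand (metadata_pairs is a placeholder for the (name, type description) pairs parsed from the metadata file, and get_type_name is the helper above):

from pyspark.sql.types import StructType

# Placeholder for the (column name, type description) pairs from the metadata file.
metadata_pairs = [("id", "int"), ("created_at", "timestamp"), ("updated_at", "string")]

schema_dict = {
    'type': 'struct',
    'fields': [
        {'metadata': {}, 'name': name, 'nullable': True, 'type': get_type_name(type_desc)}
        for (name, type_desc) in metadata_pairs
    ]
}

schema = StructType.fromJson(schema_dict)

The resulting schema can then be passed to createDataFrame exactly as in the earlier sketch.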

That concludes this article on building a StructType from a dataframe in pyspark. We hope the answer above is helpful, and thank you for supporting IT屋!
