Read fixed width file using schema from json file in pyspark


Problem Description

I have a fixed width file as below:

00120181120xyz12341
00220180203abc56792
00320181203pqr25483 

And a corresponding JSON file that specifies the schema:

{"Column":"id","From":"1","To":"3"}
{"Column":"date","From":"4","To":"8"}
{"Column":"name","From":"12","To":"3"}
{"Column":"salary","From":"15","To":"5"}

I read the schema file into a DataFrame using:

SchemaFile = spark.read\
    .format("json")\
    .option("header","true")\
    .json('C:\Temp\schemaFile\schema.json')

SchemaFile.show()
#+------+----+---+
#|Column|From| To|
#+------+----+---+
#|    id|   1|  3|
#|  date|   4|  8|
#|  name|  12|  3|
#|salary|  15|  5|
#+------+----+---+

Likewise, I am parsing the fixed width file into a pyspark DataFrame as below:

File = spark.read\
    .format("csv")\
    .option("header","false")\
    .load("C:\Temp\samplefile.txt")

File.show()
#+-------------------+
#|                _c0|
#+-------------------+
#|00120181120xyz12341|
#|00220180203abc56792|
#|00320181203pqr25483|
#+-------------------+

I can obviously hard code the values for the positions and lengths of each column to get the desired output:

from pyspark.sql.functions import substring
data = File.select(
    substring(File._c0,1,3).alias('id'),
    substring(File._c0,4,8).alias('date'),
    substring(File._c0,12,3).alias('name'),
    substring(File._c0,15,5).alias('salary')
)

data.show()
#+---+--------+----+------+
#| id|    date|name|salary|
#+---+--------+----+------+
#|001|20181120| xyz| 12341|
#|002|20180203| abc| 56792|
#|003|20181203| pqr| 25483|
#+---+--------+----+------+

But how can I use the SchemaFile DataFrame to specify the widths and column names for the lines so that the schema can be applied dynamically (without hard coding) at run time?

Answer

The easiest thing to do here is to collect the contents of SchemaFile and loop over its rows to extract the desired data.

First read the schema file as JSON into a DataFrame. Then call collect and map each row to a dictionary:

sfDict = [row.asDict() for row in SchemaFile.collect()]
print(sfDict)
#[{'Column': 'id', 'From': '1', 'To': '3'},
# {'Column': 'date', 'From': '4', 'To': '8'},
# {'Column': 'name', 'From': '12', 'To': '3'},
# {'Column': 'salary', 'From': '15', 'To': '5'}]

Now you can loop over the rows in sfDict and use the values to substring your column:

from pyspark.sql.functions import substring
File.select(
    *[
        substring(
            str='_c0',
            pos=int(row['From']),
            len=int(row['To'])
        ).alias(row['Column']) 
        for row in sfDict
    ]
).show()
#+---+--------+----+------+
#| id|    date|name|salary|
#+---+--------+----+------+
#|001|20181120| xyz| 12341|
#|002|20180203| abc| 56792|
#|003|20181203| pqr| 25483|
#+---+--------+----+------+
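
Because the select builds one substring expression per row of sfDict, the same code keeps working if columns are added to or removed from the schema file, with no hard coding required.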

Note that we have to cast From and To to integers, since they are specified as strings in the json file.
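
If you need to apply the same pattern to several fixed width files, the steps above can be wrapped in a small helper. This is only a sketch: the name parse_fixed_width and the value_col parameter are illustrative, not part of the Spark API or of the original answer.

from pyspark.sql.functions import substring

def parse_fixed_width(df, schema_df, value_col='_c0'):
    # Collect the Column/From/To rows, then build one substring per schema entry,
    # converting From and To to integers as above.
    spec = [row.asDict() for row in schema_df.collect()]
    return df.select(*[
        substring(value_col, int(r['From']), int(r['To'])).alias(r['Column'])
        for r in spec
    ])

parse_fixed_width(File, SchemaFile).show()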
