Read fixed width file using schema from json file in pyspark

Question

My fixed-width file looks like this:
00120181120xyz12341
00220180203abc56792
00320181203pqr25483
And a corresponding JSON file that specifies the schema (as the code below shows, From is used as the 1-based start position and To as the field length):
{"Column":"id","From":"1","To":"3"}
{"Column":"date","From":"4","To":"8"}
{"Column":"name","From":"12","To":"3"}
{"Column":"salary","From":"15","To":"5"}
I read the schema file into a DataFrame using:
# Use a raw string so the backslashes in the Windows path are not treated as escapes.
SchemaFile = spark.read\
    .json(r'C:\Temp\schemaFile\schema.json')
SchemaFile.show()
#+------+----+---+
#|Column|From| To|
#+------+----+---+
#| id| 1| 3|
#| date| 4| 8|
#| name| 12| 3|
#|salary| 15| 5|
#+------+----+---+
Likewise, I parse the fixed-width file into a PySpark DataFrame as follows:
File = spark.read\
    .format("csv")\
    .option("header", "false")\
    .load(r"C:\Temp\samplefile.txt")  # raw string for the Windows path
File.show()
#+-------------------+
#| _c0|
#+-------------------+
#|00120181120xyz12341|
#|00220180203abc56792|
#|00320181203pqr25483|
#+-------------------+
Obviously, I can hard-code the position and length of each column to get the desired output:
from pyspark.sql.functions import substring
data = File.select(
substring(File._c0,1,3).alias('id'),
substring(File._c0,4,8).alias('date'),
substring(File._c0,12,3).alias('name'),
substring(File._c0,15,5).alias('salary')
)
data.show()
#+---+--------+----+------+
#| id| date|name|salary|
#+---+--------+----+------+
#|001|20181120| xyz| 12341|
#|002|20180203| abc| 56792|
#|003|20181203| pqr| 25483|
#+---+--------+----+------+
But how can I use the SchemaFile DataFrame to specify the widths and column names for the lines, so that the schema can be applied dynamically (without hard-coding) at run time?
Answer
The easiest thing to do here is to collect the contents of SchemaFile and loop over its rows to extract the desired data.
First, read the schema file as JSON into a DataFrame. Then call collect and map each row to a dictionary:
# Wrap in list() so the result can be printed and reused (map is lazy in Python 3).
sfDict = list(map(lambda x: x.asDict(), SchemaFile.collect()))
print(sfDict)
#[{'Column': 'id', 'From': '1', 'To': '3'},
# {'Column': 'date', 'From': '4', 'To': '8'},
# {'Column': 'name', 'From': '12', 'To': '3'},
# {'Column': 'salary', 'From': '15', 'To': '5'}]
Now you can loop over the rows in sfDict and use the values to take substrings of your column:
from pyspark.sql.functions import substring
File.select(
*[
substring(
str='_c0',
pos=int(row['From']),
len=int(row['To'])
).alias(row['Column'])
for row in sfDict
]
).show()
#+---+--------+----+------+
#| id| date|name|salary|
#+---+--------+----+------+
#|001|20181120| xyz| 12341|
#|002|20180203| abc| 56792|
#|003|20181203| pqr| 25483|
#+---+--------+----+------+
Note that we have to cast To and From to integers, since they are specified as strings in your JSON file.