Pyspark: Parse a column of json strings
Question
I have a pyspark dataframe consisting of one column, called json, where each row is a unicode string of json. I'd like to parse each row and return a new dataframe where each row is the parsed json.
# Sample Data Frame
jstr1 = u'{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'
jstr2 = u'{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'
jstr3 = u'{"header":{"id":43256,"foo":"foobaz"},"body":{"id":20192,"name":"bazbar","sub_json":{"id":39283,"sub_sub_json":{"col1":50,"col2":"another thing"}}}}'
df = sql_context.createDataFrame([Row(json=jstr1),Row(json=jstr2),Row(json=jstr3)])
I've tried mapping over each row with json.loads:
(df
.select('json')
.rdd
.map(lambda x: json.loads(x))
.toDF()
).show()
But this returns a TypeError: expected string or buffer.
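A likely cause, sketched below with the standard library only (no Spark needed): selecting a column and dropping to the RDD yields Row objects, not strings, and json.loads refuses anything that isn't a string. Here a namedtuple stands in for pyspark.sql.Row, which behaves the same way for this purpose:

```python
import json
from collections import namedtuple

# Stand-in for pyspark.sql.Row: each element of df.select('json').rdd
# is a Row wrapping the string, not the string itself.
Row = namedtuple('Row', ['json'])
row = Row(json='{"header": {"id": 12345, "foo": "bar"}}')

try:
    json.loads(row)  # what .map(lambda x: json.loads(x)) effectively calls
except TypeError as err:
    print('TypeError:', err)

# Unwrapping the Row first parses cleanly:
parsed = json.loads(row.json)
print(parsed['header']['id'])  # 12345
```

In the pipeline itself that would mean `.map(lambda x: json.loads(x.json))`, or `x[0]` for a one-column Row.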
I suspect that part of the problem is that when converting from a dataframe to an rdd, the schema information is lost, so I've also tried manually entering in the schema info:
schema = StructType([StructField('json', StringType(), True)])
rdd = (df
.select('json')
.rdd
.map(lambda x: json.loads(x))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()
But I get the same TypeError.
Looking at this answer, it looks like flattening out the rows with flatMap might be useful here, but I'm not having success with that either:
schema = StructType([StructField('json', StringType(), True)])
rdd = (df
.select('json')
.rdd
.flatMap(lambda x: x)
.flatMap(lambda x: json.loads(x))
.map(lambda x: x.get('body'))
)
new_df = sql_context.createDataFrame(rdd, schema)
new_df.show()
I get this error: AttributeError: 'unicode' object has no attribute 'get'.
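The AttributeError can be reproduced with just the standard library (a sketch, no Spark required): json.loads does return a dict here, but flatMap then flattens that dict, and iterating a dict yields its keys. The downstream map therefore receives plain strings, which have no .get:

```python
import json

jstr = '{"header": {"id": 12345}, "body": {"id": 111000}}'

# flatMap(lambda x: json.loads(x)) flattens the parsed dict,
# and iterating a dict yields its keys as strings:
flattened = list(json.loads(jstr))
print(flattened)  # ['header', 'body']

# So the later .map(lambda x: x.get('body')) calls .get on a string:
try:
    flattened[0].get('body')
except AttributeError as err:
    print('AttributeError:', err)
```

Replacing the second flatMap with a plain map would keep the parsed dicts intact.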
Answer

Converting a dataframe with json strings to a structured dataframe is actually quite simple in Spark if you convert the dataframe to an RDD of strings first (see: http://spark.apache.org/docs/latest/sql-programming-guide.html#json-datasets)
For example:
>>> new_df = sql_context.read.json(df.rdd.map(lambda r: r.json))
>>> new_df.printSchema()
root
|-- body: struct (nullable = true)
| |-- id: long (nullable = true)
| |-- name: string (nullable = true)
| |-- sub_json: struct (nullable = true)
| | |-- id: long (nullable = true)
| | |-- sub_sub_json: struct (nullable = true)
| | | |-- col1: long (nullable = true)
| | | |-- col2: string (nullable = true)
|-- header: struct (nullable = true)
| |-- foo: string (nullable = true)
| |-- id: long (nullable = true)
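What read.json does, roughly, is parse each string in the RDD as one JSON document and infer a struct schema from the nested fields it finds across rows. Parsing the sample strings locally with the standard library (a sketch of the inference input, not of Spark's actual implementation) shows the same nesting the printed schema reflects:

```python
import json

jstr1 = '{"header":{"id":12345,"foo":"bar"},"body":{"id":111000,"name":"foobar","sub_json":{"id":54321,"sub_sub_json":{"col1":20,"col2":"somethong"}}}}'
jstr2 = '{"header":{"id":12346,"foo":"baz"},"body":{"id":111002,"name":"barfoo","sub_json":{"id":23456,"sub_sub_json":{"col1":30,"col2":"something else"}}}}'

# Each document parses to a nested dict; Spark merges the field sets
# it sees across rows into the struct schema shown above.
docs = [json.loads(s) for s in (jstr1, jstr2)]
print(sorted(docs[0]))                                      # ['body', 'header']
print(sorted(docs[0]['body']['sub_json']['sub_sub_json']))  # ['col1', 'col2']
```

On Spark 2.1+, pyspark.sql.functions.from_json with an explicit schema achieves a similar result without leaving the DataFrame API, at the cost of writing the schema out by hand.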