Convert lines of JSON in RDD to dataframe in Apache Spark
Problem description
I have some 17,000 files in S3 that look like this:
{"hour": "00", "month": "07", "second": "00", "year": "1970", "timezone": "-00:00", "day": "12", "minute": "00"}
{"hour": "00", "month": "07", "second": "01", "year": "1970", "timezone": "-00:00", "day": "12", "minute": "00"}
{"hour": "00", "month": "07", "second": "02", "year": "1970", "timezone": "-00:00", "day": "12", "minute": "00"}
{"hour": "00", "month": "07", "second": "03", "year": "1970", "timezone": "-00:00", "day": "12", "minute": "00"}
{"hour": "00", "month": "07", "second": "04", "year": "1970", "timezone": "-00:00", "day": "12", "minute": "00"}
I have one file per day. Each file contains a record for each second, so there are 86,400 records in a file. Each file has a file name like "YYYY-MM-DD".
Using boto3 I generate a list of the files in the bucket. Here I am selecting only 10 files using the prefix.
import boto3
s3_list = []
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('time-waits-for-no-man')
for object in my_bucket.objects.filter(Prefix='1972-05-1'):
s3_list.append(object.key)
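(Why the prefix selects exactly 10 files: '1972-05-1' matches the keys 1972-05-10 through 1972-05-19 but not, say, 1972-05-01. A quick Spark-free check of that matching logic, using hypothetical key names that follow the "YYYY-MM-DD" scheme:)

```python
# Hypothetical key names, one per day of May 1972, mirroring the "YYYY-MM-DD" scheme.
keys = ["1972-05-{:02d}".format(d) for d in range(1, 32)]

# S3 prefix filtering is plain string-prefix matching on the key.
selected = [k for k in keys if k.startswith("1972-05-1")]
print(len(selected))              # 10
print(selected[0], selected[-1])  # 1972-05-10 1972-05-19
```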
This gives me a list of files (S3 keys). I then define a function to fetch a file and return the rows:
from pyspark.sql import Row

def FileRead(s3Key):
    s3obj = boto3.resource('s3').Object(bucket_name='bucket', key=s3Key)
    contents = s3obj.get()['Body'].read().decode('utf-8')
    yield Row(contents)  # wraps the whole file body in a single-field Row
I then distribute this function using flatMap:
job = sc.parallelize(s3_list)
foo = job.flatMap(FileRead)
Problem
However, I can't work out how to properly pump these rows into a DataFrame.
>>> foo.toDF().show()
+--------------------+
| _1|
+--------------------+
|{"hour": "00", "m...|
|{"hour": "00", "m...|
|{"hour": "00", "m...|
|{"hour": "00", "m...|
|{"hour": "00", "m...|
|{"hour": "00", "m...|
|{"hour": "00", "m...|
|{"hour": "00", "m...|
|{"hour": "00", "m...|
|{"hour": "00", "m...|
+--------------------+
>>> foo.toDF().count()
10
Please could someone show me how to do this?
You should probably use the JSON reader directly (spark.read.json / sqlContext.read.json), but if you know the schema you can try parsing the JSON strings manually:
from pyspark.sql.types import StructField, StructType, StringType
from pyspark.sql import Row
import json

fields = ['day', 'hour', 'minute', 'month', 'second', 'timezone', 'year']
schema = StructType([
    StructField(field, StringType(), True) for field in fields
])

def parse(s, fields):
    try:
        # s is a one-field Row; s[0] holds the raw JSON string
        d = json.loads(s[0])
        return [tuple(d.get(field) for field in fields)]
    except Exception:
        return []

spark.createDataFrame(foo.flatMap(lambda s: parse(s, fields)), schema)
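(As a quick sanity check of the manual-parsing approach, the same json.loads plus tuple extraction can be exercised in plain Python, no Spark needed, on one of the sample lines from the question; the one-element tuple stands in for a one-field Row:)

```python
import json

fields = ['day', 'hour', 'minute', 'month', 'second', 'timezone', 'year']

def parse(s, fields):
    # s mimics a one-field Row: s[0] holds the raw JSON string
    try:
        d = json.loads(s[0])
        return [tuple(d.get(field) for field in fields)]
    except Exception:
        return []

line = ('{"hour": "00", "month": "07", "second": "00", "year": "1970", '
        '"timezone": "-00:00", "day": "12", "minute": "00"}')
print(parse((line,), fields))
# [('12', '00', '00', '07', '00', '-00:00', '1970')]
print(parse(("not json",), fields))
# []
```

Malformed input falls through to the except branch and is dropped, which is why the flatMap above silently skips bad records.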
You can also use get_json_object:
from pyspark.sql.functions import get_json_object

# assumes a DataFrame with a string column named "value",
# e.g. df = foo.toDF(["value"])
df.select([
    get_json_object("value", "$.{0}".format(field)).alias(field)
    for field in fields
])