Convert lines of JSON in RDD to dataframe in Apache Spark


Problem description

I have some 17,000 files in S3 that look like this:

{"hour": "00", "month": "07", "second": "00", "year": "1970", "timezone": "-00:00", "day": "12", "minute": "00"}
{"hour": "00", "month": "07", "second": "01", "year": "1970", "timezone": "-00:00", "day": "12", "minute": "00"}
{"hour": "00", "month": "07", "second": "02", "year": "1970", "timezone": "-00:00", "day": "12", "minute": "00"}
{"hour": "00", "month": "07", "second": "03", "year": "1970", "timezone": "-00:00", "day": "12", "minute": "00"}
{"hour": "00", "month": "07", "second": "04", "year": "1970", "timezone": "-00:00", "day": "12", "minute": "00"}

I have one file per day. Each file contains a record for each second, so 86,400 records in a file. Each file has a file name like "YYYY-MM-DD".

Using boto3 I generate a list of the files in the bucket. Here I am selecting only 10 files using the prefix.

import boto3
s3_list = []
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('time-waits-for-no-man')
for object in my_bucket.objects.filter(Prefix='1972-05-1'):
    s3_list.append(object.key)
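As an aside, S3's Prefix filter is plain string-prefix matching on object keys, which is why '1972-05-1' picks up exactly the ten zero-padded days 10 through 19. A quick local sketch (the keys here are hypothetical) shows the same matching with str.startswith:

```python
# Hypothetical daily keys for May 1972, named "YYYY-MM-DD" as in the question
keys = ['1972-05-%02d' % day for day in range(1, 32)]

# S3 Prefix filtering behaves like str.startswith on the key
matched = [k for k in keys if k.startswith('1972-05-1')]
print(matched)  # ['1972-05-10', ..., '1972-05-19']
```

Note '1972-05-01' does not match, because its zero-padded key starts with '1972-05-0'.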

This function returns a list of files(S3 keys). I then define a function to fetch a file and return the rows:

def FileRead(s3Key):
    s3obj = boto3.resource('s3').Object(bucket_name='bucket', key=s3Key)
    contents = s3obj.get()['Body'].read().decode('utf-8')
    yield Row(**contents)

I then distribute this function using flatMap:

job = sc.parallelize(s3_list)
foo = job.flatMap(FileRead)

Problem

However, I'm not able to work out how to properly get these rows into a DataFrame.

>>> foo.toDF().show()
+--------------------+                                                          
|                  _1|
+--------------------+
|{"hour": "00", "m...|
|{"hour": "00", "m...|
|{"hour": "00", "m...|
|{"hour": "00", "m...|
|{"hour": "00", "m...|
|{"hour": "00", "m...|
|{"hour": "00", "m...|
|{"hour": "00", "m...|
|{"hour": "00", "m...|
|{"hour": "00", "m...|
+--------------------+

>>> foo.toDF().count()
10  

Please could someone show me how to do this?

Answer

You should probably use the JSON reader directly (spark.read.json / sqlContext.read.json), but if you know the schema you can try parsing the JSON strings manually:

from pyspark.sql.types import StructField, StructType, StringType
import json

fields = ['day', 'hour', 'minute', 'month', 'second', 'timezone', 'year']
schema = StructType([
    StructField(field, StringType(), True) for field in fields
])

def parse(s, fields):
    try:
        # Each element of foo wraps the raw JSON string, so unwrap it with s[0]
        d = json.loads(s[0])
        return [tuple(d.get(field) for field in fields)]
    except Exception:
        # Malformed lines are dropped by returning nothing to flatMap
        return []

spark.createDataFrame(foo.flatMap(lambda s: parse(s, fields)), schema)
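Spark isn't needed to sanity-check the parsing itself: the same parse logic can be exercised locally on one tuple-wrapped line. Note the s[0] indexing assumes each RDD element is a one-field Row or tuple holding the raw JSON string:

```python
import json

fields = ['day', 'hour', 'minute', 'month', 'second', 'timezone', 'year']

def parse(s, fields):
    try:
        # s is a one-element tuple/Row; s[0] is the raw JSON string
        d = json.loads(s[0])
        return [tuple(d.get(field) for field in fields)]
    except Exception:
        return []

line = ('{"hour": "00", "month": "07", "second": "00", "year": "1970", '
        '"timezone": "-00:00", "day": "12", "minute": "00"}',)
print(parse(line, fields))
# [('12', '00', '00', '07', '00', '-00:00', '1970')]

print(parse(('not json',), fields))
# [] -- a malformed line is silently dropped
```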

You can also use get_json_object:

from pyspark.sql.functions import get_json_object

# assumes df has a single string column named "value" holding the raw JSON lines
df.select([
    get_json_object("value", "$.{0}".format(field)).alias(field)
    for field in fields
])
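For the flat "$.field" paths used here, get_json_object amounts to a per-row dictionary lookup. A local stand-in (a hypothetical helper, not part of Spark) makes that concrete; the real function also supports richer JSONPath expressions:

```python
import json

def get_json_object_local(s, path):
    # Mimics Spark's get_json_object for simple "$.field" paths only
    assert path.startswith('$.')
    return json.loads(s).get(path[2:])  # None for a missing field, like SQL null

row = '{"hour": "00", "month": "07", "day": "12"}'
print(get_json_object_local(row, '$.month'))  # 07
print(get_json_object_local(row, '$.year'))   # None
```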
