How to write a JSON file to S3 as Parquet


Question

    import json
    import requests
    import datetime
    import boto3
    import parquet
    import pyarrow
    import pandas as pd
    from pandas import DataFrame
         
    noaa_codes = [
        'KAST',
        'KBDN',
        'KCVO',
        'KEUG',
        'KHIO',
        'KHRI',
        'KMMV',
        'KONP',
        'KPDX',
        'KRDM',
        'KSLE',
        'KSPB',
        'KTMK',
        'KTTD',
        'KUAO'
        ]
     
    urls = [f"https://api.weather.gov/stations/{x}/observations/latest" for x in noaa_codes]
    
    
    s3_bucket="XXXXXX"
    s3_prefix = "XXXXX/parquetfiles"
    s3 = boto3.resource("s3")
    
    def get_datetime():
        dt = datetime.datetime.now()
        return dt.strftime("%Y%m%d"), dt.strftime("%H:%M:%S")
              
    def reshape(r):
        props = r["properties"]
        res = {    
            "stn": props["station"].split("/")[-1],
            "temp": props["temperature"]["value"],
            "dewp": props["dewpoint"]["value"],
            "slp": props["seaLevelPressure"]["value"],
            "stp": props["barometricPressure"]["value"],
            "visib": props["visibility"]["value"],
            "wdsp": props["windSpeed"]["value"],
            "gust": props["windGust"]["value"],
            "max": props["maxTemperatureLast24Hours"]["value"],
            "min": props["minTemperatureLast24Hours"]["value"],
            "prcp": props["precipitationLast6Hours"]["value"]
        }
        return res
               
    def lambda_handler(event, context):
                           
        responses = []
        for url in urls:
            r = requests.get(url)
            responses.append(reshape(r.json()))
        
        datestr, timestr = get_datetime()
        fname = f"noaa_hourly_measurements_{timestr}"    
        file_prefix = "/".join([s3_prefix, datestr, fname])
        s3_obj = s3.Object(s3_bucket, file_prefix)
        serialized = []
        for r in responses:
            serialized.append(json.dumps(r))
        jsonlines_doc = "\n".join(serialized)
        df= pd.read_json(jsonlines_doc,lines=True)
        df.to_parquet(s3_obj, engine='auto', compression='snappy', index=None)
        print("created")

I am unable to create a Parquet file in AWS S3, although I can create one locally. Please suggest a better way to do this. When I run the code I can create a JSON file in S3, but when I try to create a Parquet file I get the following error:

    "errorMessage": "Invalid file path or buffer object type: <class 'dict'>", "errorType": "ValueError", "stackTrace": [["/var/task/lambda_function.py", 80, "lambda_handler", "df.to_parquet(location, engine='auto', compression='snappy', index=None)"

Recommended answer

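Make sure your s3_obj is an S3 URL string, not a boto3 Object. to_parquet expects a file path or a buffer, which is why it rejects anything else with "Invalid file path or buffer object type". The path has to look something like this: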

    "s3://my_bucket/path/to/data_folder/my-file.parquet"
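As a minimal sketch of that fix, the bucket and key segments from the question can be joined into a URL string and handed to to_parquet. The helper names s3_url and write_parquet are hypothetical (they are not part of the original code), and the sketch assumes the s3fs package is available so that pandas can resolve s3:// paths.

```python
def s3_url(bucket: str, *key_parts: str) -> str:
    """Join a bucket and key segments into an s3:// URL string."""
    # Strip stray slashes and skip empty segments so the key has no "//".
    key = "/".join(part.strip("/") for part in key_parts if part)
    return f"s3://{bucket}/{key}"


def write_parquet(df, bucket: str, *key_parts: str) -> str:
    """Write a pandas DataFrame to S3 by passing to_parquet a URL string.

    Assumption: s3fs is installed, so pandas can open s3:// locations.
    """
    url = s3_url(bucket, *key_parts)
    # Same keyword arguments as the question, but with a str path
    # instead of the boto3 s3.Object resource.
    df.to_parquet(url, engine="auto", compression="snappy", index=None)
    return url
```

Inside lambda_handler this would replace the s3.Object line, for example write_parquet(df, s3_bucket, s3_prefix, datestr, fname + ".parquet"), where the ".parquet" suffix is an addition for illustration.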

Besides this, writing a DataFrame to S3 as Parquet directly with pandas is not recommended. For Python 3.6+, AWS has a library called aws-data-wrangler that helps with the integration between pandas, S3, and Parquet.

To install it:

    pip install awswrangler

To write your df to S3:

    import awswrangler as wr
    wr.s3.to_parquet(df=df, path="s3://my_bucket/path/to/data_folder/my-file.parquet")
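To show how this could fit the Lambda from the question, here is a hedged sketch that builds the dated path the same way the question does and then calls wr.s3.to_parquet. The function names parquet_path and write_observations and the ".parquet" suffix are assumptions made for illustration. awswrangler is imported inside the function so the path helper can be used without it.

```python
import datetime


def parquet_path(bucket: str, prefix: str, now: datetime.datetime) -> str:
    """Build a dated s3:// path, mirroring the key layout in the question."""
    datestr = now.strftime("%Y%m%d")
    timestr = now.strftime("%H:%M:%S")
    # The ".parquet" suffix is an assumption; the question's key has none.
    fname = f"noaa_hourly_measurements_{timestr}.parquet"
    return f"s3://{bucket}/{prefix}/{datestr}/{fname}"


def write_observations(df, bucket: str, prefix: str, now=None) -> str:
    """Write df to S3 with awswrangler and return the path that was used."""
    import awswrangler as wr  # lazy import: only needed when writing

    now = now or datetime.datetime.now()
    path = parquet_path(bucket, prefix, now)
    wr.s3.to_parquet(df=df, path=path)
    return path
```

In the handler, the list of reshaped responses could be turned into a DataFrame and passed in as write_observations(df, s3_bucket, s3_prefix).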
