How to Create Dataframe from AWS Athena using Boto3 get_query_results method


Problem Description

I'm using AWS Athena to query raw data from S3. Since Athena writes the query output into an S3 output bucket, I used to do:

df = pd.read_csv(OutputLocation)
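
(Here OutputLocation is the full S3 URI of the CSV file Athena writes. A minimal sketch of how it can be looked up, assuming res holds the response of an earlier start_query_execution call:)

import boto3

client = boto3.client('athena')
# res is assumed to be the response from a previous start_query_execution call
execution = client.get_query_execution(QueryExecutionId=res['QueryExecutionId'])
OutputLocation = execution['QueryExecution']['ResultConfiguration']['OutputLocation']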

But this seems like an expensive way. Recently I noticed the get_query_results method of boto3, which returns a complex dictionary of the results.

client = boto3.client('athena')
response = client.get_query_results(
        QueryExecutionId=res['QueryExecutionId']
        )

I'm facing two main issues:

  1. How can I format the results of get_query_results into a pandas data frame?
  2. get_query_results only returns 1000 rows. How can I use it to get two million rows?


Solution

get_query_results only returns 1000 rows. How can I use it to get two million rows into a Pandas dataframe?

If you try adding:

client.get_query_results(QueryExecutionId=res['QueryExecutionId'], MaxResults=2000)

you will get the following error:

An error occurred (InvalidRequestException) when calling the GetQueryResults operation: MaxResults is more than maximum allowed length 1000.
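
One way to stay on the Athena API despite the 1000-row cap is to page through the results with the NextToken parameter and assemble the data frame yourself (which also addresses the first question). A minimal sketch, assuming the query has already reached the SUCCEEDED state and is a plain SELECT so that the first result row carries the column headers; the helper name athena_results_to_df is my own:

import boto3
import pandas as pd

def athena_results_to_df(query_execution_id, region_name='us-east-1'):
    # Page through get_query_results (at most 1000 rows per call) and
    # collect every row into a pandas DataFrame.
    client = boto3.client('athena', region_name=region_name)
    params = {'QueryExecutionId': query_execution_id, 'MaxResults': 1000}
    columns, rows = None, []
    while True:
        page = client.get_query_results(**params)
        data = page['ResultSet']['Rows']
        if columns is None:
            # The first row of the first page holds the column names.
            columns = [c.get('VarCharValue') for c in data[0]['Data']]
            data = data[1:]
        rows.extend([c.get('VarCharValue') for c in r['Data']] for r in data)
        if 'NextToken' not in page:
            break
        params['NextToken'] = page['NextToken']
    return pd.DataFrame(rows, columns=columns)

With a 1000-row page size this gets slow for millions of rows, which is why reading the CSV output straight from S3, as shown next, is usually the better option.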

You can obtain millions of rows if you fetch the output file directly from your S3 bucket (in the next example, into a Pandas DataFrame):

def obtain_data_from_s3(self):
    self.resource = boto3.resource('s3', 
                          region_name = self.region_name, 
                          aws_access_key_id = self.aws_access_key_id,
                          aws_secret_access_key= self.aws_secret_access_key)

    response = self.resource \
    .Bucket(self.bucket) \
    .Object(key= self.folder + self.filename + '.csv') \
    .get()

    return pd.read_csv(io.BytesIO(response['Body'].read()), encoding='utf8')   

The self.filename can be:

self.filename = response['QueryExecutionId'] + ".csv"

because Athena names the output file after the QueryExecutionId. Below is my full code, which takes a query and returns a data frame with all the rows and columns.

import time
import boto3
import pandas as pd
import io

class QueryAthena:

    def __init__(self, query, database):
        self.database = database
        self.folder = 'my_folder/'
        self.bucket = 'my_bucket'
        self.s3_input = 's3://' + self.bucket + '/my_folder_input'
        self.s3_output =  's3://' + self.bucket + '/' + self.folder
        self.region_name = 'us-east-1'
        self.aws_access_key_id = "my_aws_access_key_id"
        self.aws_secret_access_key = "my_aws_secret_access_key"
        self.query = query

    def load_conf(self, q):
        try:
            self.client = boto3.client('athena', 
                              region_name = self.region_name, 
                              aws_access_key_id = self.aws_access_key_id,
                              aws_secret_access_key= self.aws_secret_access_key)
            response = self.client.start_query_execution(
                QueryString = q,
                    QueryExecutionContext={
                    'Database': self.database
                    },
                    ResultConfiguration={
                    'OutputLocation': self.s3_output,
                    }
            )
            self.filename = response['QueryExecutionId']
            print('Execution ID: ' + response['QueryExecutionId'])

        except Exception as e:
            print(e)
        return response                

    def run_query(self):
        queries = [self.query]
        for q in queries:
            res = self.load_conf(q)
        try:
            # Poll Athena every 10 seconds until the query leaves the
            # QUEUED/RUNNING states, failing fast on FAILED or CANCELLED.
            query_status = None
            while query_status == 'QUEUED' or query_status == 'RUNNING' or query_status is None:
                query_status = self.client.get_query_execution(QueryExecutionId=res["QueryExecutionId"])['QueryExecution']['Status']['State']
                print(query_status)
                if query_status == 'FAILED' or query_status == 'CANCELLED':
                    raise Exception('Athena query with the string "{}" failed or was cancelled'.format(self.query))
                time.sleep(10)
            print('Query "{}" finished.'.format(self.query))

            # Read the CSV that Athena wrote to S3 into a DataFrame.
            df = self.obtain_data()
            return df

        except Exception as e:
            print(e)      

    def obtain_data(self):
        try:
            self.resource = boto3.resource('s3', 
                                  region_name = self.region_name, 
                                  aws_access_key_id = self.aws_access_key_id,
                                  aws_secret_access_key= self.aws_secret_access_key)

            response = self.resource \
            .Bucket(self.bucket) \
            .Object(key= self.folder + self.filename + '.csv') \
            .get()

            return pd.read_csv(io.BytesIO(response['Body'].read()), encoding='utf8')   
        except Exception as e:
            print(e)  


if __name__ == "__main__":       
    query = "SELECT * FROM bucket.folder"
    qa = QueryAthena(query=query, database='myAthenaDb')
    dataframe = qa.run_query()
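
As a side note, if the s3fs package is installed, pandas can read the Athena output straight from its S3 URL, which avoids the explicit boto3 download in obtain_data. A minimal sketch, using the same bucket and folder names as above and a query_execution_id returned by start_query_execution:

import pandas as pd

# Assumes s3fs is installed and AWS credentials are available in the environment;
# query_execution_id is the id returned by start_query_execution, as in load_conf above.
df = pd.read_csv('s3://my_bucket/my_folder/' + query_execution_id + '.csv')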
