Airflow - AWSAthenaOperator for dynamic list of queries

Problem Description

I have a DAG with a step, read_date_information_file, which reads a file and returns a list of queries (which I can access from the task's output). I then want to loop over this list and execute each query on Athena using AWSAthenaOperator.

import boto3
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.operators.athena import AWSAthenaOperator  # Airflow 2, Amazon provider package

def get_date_information(ti):
    # output_bucket and key are assumed to be defined elsewhere in the module
    s3 = boto3.client('s3')
    data = s3.get_object(Bucket=output_bucket, Key=key)
    contents = data['Body'].read().decode("utf-8")
    print('Date information is: ', contents)
    events_list = contents.split(',')
    return events_list

with DAG(
    dag_id='adserver_split_job_emr_job_dag',
    default_args={
        'owner': 'adserver_airflow',
        'depends_on_past': False,
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
    },
    dagrun_timeout=timedelta(hours=2),
    start_date=datetime(2021, 9, 22, 9),
    schedule_interval='20 * * * *',
) as dag:

    read_date_information_file = PythonOperator(
        task_id="read_date_information_file",
        python_callable=get_date_information
    )

    query_list = read_date_information_file.output

    for i, event in enumerate(query_list):  # query_list is an XComArg, not a plain list, at parse time
        run_query = AWSAthenaOperator(
            task_id=f'run_query_{i}',
            query=event,
            output_location=config.ATHENA_OUTPUT_LOCATION,
            database=config.ATHENA_DATABASE_NAME,
            aws_conn_id='aws_default'
        )

    read_date_information_file >> run_query

I am getting an error like:

Broken DAG: [/opt/airflow/dags/test.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 593, in __setattr__
    super().__setattr__(key, value)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/utils/timeout.py", line 37, in handle_timeout
    raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: DagBag import timeout for /opt/airflow/dags/test.py after 30.0s, PID: 10056

But if I set query_list to a hard-coded list then it works fine, like:

query_list = ["SELECT 1;", "SELECT 2;", "SELECT 3;"]

Any help in this regard would be appreciated. I am following the looping approach from the solution mentioned here. The difference is that I am looping over a dynamic list, which is the output of the previous step.

Recommended Answer

The issue you are facing is not directly related to Athena. It's more a case of incorrect Airflow usage.

You are experiencing this issue because Airflow was unable to import your DAG into the DagBag before the timeout (source code).

This happens because you are making an expensive call to the metadata database by trying to create tasks from XComs. Airflow parses your DAG every 30 seconds (the default value of min_file_process_interval), which means that every 30 seconds you are opening a connection to the database. This is bad practice and you should not do it! It can easily overwhelm your database.

If you still wish to continue down this dangerous path, you need to increase the default value of DAGBAG_IMPORT_TIMEOUT in airflow.cfg (see source code).
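
For reference, a minimal airflow.cfg sketch showing both settings mentioned above (the values shown are the defaults; raise dagbag_import_timeout only if you accept the trade-off):

[core]
# Maximum number of seconds allowed for importing a single DAG file
dagbag_import_timeout = 30.0

[scheduler]
# How often, in seconds, each DAG file is re-parsed
min_file_process_interval = 30

Both can also be overridden via environment variables (AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT and AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL).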

My recommendation: do not try to dynamically create tasks based on XComs.

As for your specific use case - you did not mention exactly what you are trying to solve. Based on the function name (read_date_information_file), I would assume that you are trying to run an Athena query for a specific date, and that this date changes whenever the file is updated. Perhaps you should simply integrate the date directly into your query using Jinja. That means the event in query=event would contain the reference to the XCom directly, something like:

SELECT ...
FROM ...
WHERE something={{ ti.xcom_pull(task_ids='read_date_information_file') }}

That way all you need is a single, static AWSAthenaOperator, but the query it runs changes dynamically based on the XCom value pushed by the read_date_information_file task.
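
A minimal sketch of that approach, placed inside the same with DAG block as in the question and reusing its names (the SELECT ... / FROM ... parts are placeholders carried over from the snippet above); query is a templated field of AWSAthenaOperator, so the Jinja expression is rendered when the task runs rather than when the file is parsed:

    run_query = AWSAthenaOperator(
        task_id='run_query',
        # Rendered at run time from the XCom pushed by the previous task,
        # so parsing the DAG file stays cheap
        query="SELECT ... FROM ... WHERE something={{ ti.xcom_pull(task_ids='read_date_information_file') }}",
        output_location=config.ATHENA_OUTPUT_LOCATION,
        database=config.ATHENA_DATABASE_NAME,
        aws_conn_id='aws_default'
    )

    read_date_information_file >> run_query

You can preview the rendered query with the airflow tasks render CLI command (passing the dag_id, run_query, and an execution date) before running the DAG for real.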
