气流中的execution_date:需要作为变量访问 [英] execution_date in airflow: need to access as a variable

查看:541
本文介绍了气流中的execution_date:需要作为变量访问的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我真的是这个论坛的新手。但是一段时间以来,我一直在为公司服务。抱歉,这个问题听起来真是愚蠢。

I am really a newbie in this forum. But I have been playing with airflow, for sometime, for our company. Sorry if this question sounds really dumb.

我正在使用BashOperators一堆编写管道。
基本上,对于每个任务,我只想使用'curl'调用REST api

I am writing a pipeline using bunch of BashOperators. Basically, for each Task, I want to simply call a REST api using 'curl'

这就是我的管道的样子(非常简化的版本):

This is what my pipeline looks like(very simplified version):

from airflow import DAG
from airflow.operators import BashOperator, PythonOperator
from dateutil import tz
import datetime

datetime_obj = datetime.datetime

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.datetime.combine(datetime_obj.today() - datetime.timedelta(1), datetime_obj.min.time()),
    'email': ['xxxx@xxx.xxx'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': datetime.timedelta(minutes=5),
}


current_datetime = datetime_obj.now(tz=tz.tzlocal())

dag = DAG(
    'test_run', default_args=default_args, schedule_interval=datetime.timedelta(minutes=60))

curl_cmd='curl -XPOST "'+hostname+':8000/run?st='+current_datetime +'"'


t1 = BashOperator(
    task_id='rest-api-1',
    bash_command=curl_cmd,
    dag=dag)

如果您注意到我在做 current_datetime = datetime_obj.now(tz = tz.tzlocal())
相反,我在这里想要的是 'execution_date' >

If you notice I am doing current_datetime= datetime_obj.now(tz=tz.tzlocal()) Instead what I want here is 'execution_date'

如何直接使用'execution_date'并将其分配给python文件中的变量?

How do I use 'execution_date' directly and assign it to a variable in my python file?

我遇到了访问args的一般问题。
任何帮助都会得到真正的感谢。

I have having this general issue of accessing args. Any help will be genuinely appreciated.

谢谢

推荐答案

BashOperator bash_command 参数是一个模板 >。您可以使用<$ c $作为 datetime object 在任何模板中访问 execution_date c> execution_date 变量。在模板中,可以使用任何 jinja2 方法对其进行操作。

The BashOperator's bash_command argument is a template. You can access execution_date in any template as a datetime object using the execution_date variable. In the template, you can use any jinja2 methods to manipulate it.

使用以下内容作为 BashOperator bash_command string

Using the following as your BashOperator bash_command string:

# pass in the first of the current month
some_command.sh {{ execution_date.replace(day=1) }}

# last day of previous month
some_command.sh {{ execution_date.replace(day=1) - macros.timedelta(days=1) }}

如果只希望与执行日期等效的字符串, ds 将返回日期戳(YYYY-MM-DD), ds_nodash 返回不带破折号的内容(YYYYMMDD),等等。有关的更多信息,请参见 Api文档

If you just want the string equivalent of the execution date, ds will return a datestamp (YYYY-MM-DD), ds_nodash returns same without dashes (YYYYMMDD), etc. More on macros is available in the Api Docs.

您的最终运算符会看起来像这样:

Your final operator would look like:

command = """curl -XPOST '%(hostname)s:8000/run?st={{ ds }}'""" % locals()
t1 = BashOperator( task_id='rest-api-1', bash_command=command, dag=dag)

这篇关于气流中的execution_date:需要作为变量访问的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆