apache-airflow相关内容
与该问题相关的原始代码可以找到。 我对位移位运算符和 set_upstream / set_downstream 方法在我在DAG中定义的任务循环内工作。当DAG的主执行循环配置如下: dash_workers.get_id_creds()中的uid: clear_tables.set_downstream(id_worker(uid)) 或 用于dash_
..
我是Airflow的新手,并创建了我的第一个DAG。这是我的DAG代码。我希望DAG从现在开始,然后每天运行一次。 从airflow导入DAG 从airflow.operators.bash_operator导入BashOperator 从datetime导入datetime,timedelta default_args = { '所有者':'气流', 'depends_on_p
..
我从 https://airflow.apache.org/tutorial.html照原样,唯一的变化是我将dag设置为以5分钟的间隔运行,开始日期为2017-12-17 T13:40:00 UTC。我在13:40之前启用了dag,因此没有回填,并且我的机器在UTC上运行。 dag的运行符合预期(即从UTC的13:45开始间隔5分钟) 现在,当我进入树状视图时,我无法理解图。总共有3个任务
..
我们是否可以通过将任务/延迟超时设置为“无”并手动触发其运行,来使用气流dag来定义永无止境的工作(即,具有无条件循环以消耗流数据的任务)?配备气流监测器永无止境的任务会引起问题吗? 谢谢 解决方案 通过Airflow运行它有点奇怪,但是是的,我认为这不是问题。请注意,如果重新启动运行该作业的工作程序(假设使用CeleryExecutor),则会中断任务,并且如果未设置重试,则需要再次手
..
我有一个DAG,该DAG是通过查询DynamoDB的列表而创建的,对于列表中的每个项目,都使用PythonOperator创建一个任务并将其添加到DAG中。在下面的示例中未显示,但是需要注意的是,列表中的某些项依赖于其他任务,因此我使用 set_upstream 来强制执行依赖性。 -airflow_home \-dags \-工作流程.py workflow.
..
尝试将Airflow流程拆分为2台服务器。服务器A已经在独立模式下运行,其中包含所有功能,并且具有DAG,我想将其设置为新设置中的工作服务器,并带有附加服务器。 服务器B是新服务器,它将在MySQL上托管元数据数据库。 我可以让Server A运行LocalExecutor,还是必须使用CeleryExecutor? 气流调度程序是否必须在具有DAG的服务器上运行?还是必须在集群中的每
..
我正在运行气流管道,但代码看起来不错,但实际上我正在获得airflow.exceptions.AirflowException:在DAG中检测到周期。错误的任务: 您能帮忙解决此问题吗? 解决方案 由于重复的task_id'a在多个任务。
..
问题:我想在Github上使用最新版本的Apache-Airflow安装 apache-airflow 并具有所有依赖项吗? 如何使用 pip 做到这一点? 在生产环境中可以安全使用它? 解决方案 我发现这更有用: pip安装git + git://github.com/apache/incubator-airflow.git 它将安装开发中的最新版本
..
Airflow在尝试运行DAG时返回错误,提示它找不到环境变量,这很奇怪,因为它能够找到我存储为Python变量的其他3个环境变量。那些变量根本没有问题。 我在〜/ .profile 中拥有所有4个变量,并且也完成了 export var1 =“ varirable1” export var2 =“ varirable2” export var3 =“ varirable3”
..
我有一个非常简单的DAG,其中有两个任务,如下所示: default_args = { 'owner' :'me', 'start_date':dt.datetime.today(), 'retries':0, 'retry_delay':dt.timedelta(minutes = 1) } dag = DAG( 'test DAG', default_args = defau
..
我想构建一些我需要捕获所有叶子任务并向其添加下游依赖关系的东西,以使我们的数据库中的工作完成。有没有简单的方法可以找到Airflow中DAG的所有叶节点? 解决方案 使用 upstream_task_ids 和 downstream_task_ids @property 来自 BaseOperator def get_start_tasks(dag:DAG)-> List [Bas
..
是否有一种优雅的方法来为DAG成功事件定义回调? 我真的不想设置一个任务,它将通过on_sucess_callback置于所有其他任务的上游。 谢谢! 解决方案 因此,如果我理解正确,则DAG的最后一步是在成功的情况下回叫其他系统。因此,我鼓励您完全按照这种方式对DAG进行建模。 为什么要尝试从DAG逻辑中隐藏该部分?这正是上游/下游建模的目的。为了图形的美观,将DAG逻辑的
..
我对气流几乎是全新的。 我有两个步骤: 获取所有符合条件的文件 解压缩文件 文件压缩后为半个演出,未压缩时为2-3个演出。我可以轻松地一次处理20多个文件,这意味着解压缩所有文件的运行时间可能比任何合理的超时时间都要长 我可以使用XCom来获取步骤1的结果,但是我想做的是这样的: def processFiles(reqDir,gvcfDir,matchSuffi
..
问题:我想将文件从Google Cloud Storage Bucket中的一个文件夹(例如Bucket1中的Folder1)复制到另一个Bucket(例如Bucket2)。我找不到用于Google Cloud Storage的任何Airflow操作员来复制文件。 解决方案 我刚刚在contrib中找到了一个新操作员2小时前: https://github.com名为 GoogleClou
..
是否有可能获得Airflow的实际开始时间?开始时间是指dag的第一个任务开始运行的确切时间。 我知道我可以使用宏来获取执行日期。如果作业是使用trigger_dag运行的,这就是我所说的开始时间,但是如果该作业是按日程运行的,则 {{execution_date}} 返回昨天的日期。 / p> 我也试图将 datetime.now()。isoformat()放在dag代码的主体中,然
..
我正在使用 LocalScheduler 选项在EC2实例上使用气流。我调用了 airflow scheduler 和 airflow webserver ,一切似乎都运行良好。也就是说,在将 cron 字符串提供给 schedule_interval 以“每10分钟执行一次”之后,'* / 10 * * * *',默认情况下,作业每24小时继续执行一次。这是代码的标题: from dat
..
我在XCOM中有一个巨大的json文件,稍后dag执行完成后就不需要了,但是我仍然在UI中看到包含所有数据的Xcom对象,有没有办法以编程方式删除XCOM DAG运行完成。 谢谢 解决方案 您必须添加任务取决于您的元数据数据库(sqllite,PostgreSql,MySql ..)可在DAG运行完成后删除XCOM。 delete_xcom_task = PostgresOp
..
气流新手。尝试运行sql并将结果存储在BigQuery表中。 出现以下错误。不确定在哪里设置default_rpoject_id。 请帮助我。 错误: 回溯(最近一次通话): 中的文件“ / usr / local / bin / airflow”,第28行。 args.func(args) 文件“ /usr/local/lib/python2.7/d
..
我想在不与Airflow GUI交互的情况下创建S3连接。是否可以通过airflow.cfg或命令行使用? 我们正在使用AWS角色,并且以下连接参数对我们有用: {“ aws_account_id”:“ xxxx “,” role_arn“:” yyyyy“} 因此,在GUI上为S3手动创建连接是可行的,现在我们要自动化该过程并将其添加为气流部署过程的一部分。可以解决吗? 解决
..
当我们进行一次dagrun时,在Airflow UI上的“图形视图”中,我们将获得每个作业运行的详细信息。 JobID类似于“ scheduled__2017-04-11T10:47:00” 。 我需要此JobID来跟踪和创建日志,在其中我维护每个任务/调试运行所花费的时间。 所以我的问题是我如何在正在运行的同一dag中获取JobID 。 谢谢谢坦 解决方案 此值实
..