airflow相关内容

Apache Airflow 调度程序不会在预定时间触发 DAG

当我安排 DAG 在每天的特定时间运行时,DAG 根本不会执行.但是,当我重新启动 Airflow 网络服务器和调度程序时,DAG 会在该特定日期的预定时间执行一次,并且不会从第二天开始执行.我正在使用带有 python 2.7.6 的 Airflow 版本 v1.7.1.3.这是 DAG 代码: from 气流导入 DAG从airflow.operators.bash_operator 导入B ..
发布时间:2022-01-03 23:19:46 Python

气流 TriggerDagRunOperator 如何更改执行日期

我注意到对于计划任务,执行日期根据 设置在过去 Airflow 是为了满足 ETL 需求而开发的.在 ETL 世界中,您通常会汇总数据.所以,如果我想总结数据2016-02-19,我会在格林威治标准时间 2016-02-20 午夜做,这将是在 2016-02-19 的所有数据都可用之后. 然而,当一个 dag 触发另一个 dag 时,执行时间设置为 now(). 有没有办法让触发 ..
发布时间:2022-01-01 21:14:58 其他开发

每天在特定时间运行 google colab

我最近构建了一个运行在 Google Colaboratory 上的 Python 程序,我需要每天在特定时间运行该程序,那么有没有办法安排它在 Google Colab 上运行? 解决方案 您需要创建一个 notebooks.csv 列出所有 Colaboratory URL.然后使用colabctl来运行每个notebook(按顺序,在CSV中同步提到),然后暂停n秒,然后再次运行. ..
发布时间:2021-12-31 23:52:58 Python

如何在 Apache Airflow 中查询 Google Big Query 并将结果作为 Pandas Dataframe 返回?

我正在尝试将 bigquery 查询保存到自定义 Airflow 运算符中的数据帧. 我尝试过使用airflow.contrib.hooks.bigquery_hook 和get_pandas_df 方法.任务卡在身份验证上,因为它希望我手动访问 url 进行身份验证. 因此,我对身份验证进行了硬编码.这有效,但绝对不理想. 工作但不理想(凭证是硬编码的): def execu ..
发布时间:2021-12-30 23:12:30 其他开发

Google Cloud Composer BigQuery Operator - Get Jobs API HTTPError 404

我正在尝试在 GCC 上运行 BigQueryOperator.我已经成功运行了 BigQueryCreateEmptyTableOperator 和 BigQueryTableDeleteOperator. 这是我的 dag 代码: 导入日期时间导入操作系统导入日志从气流导入配置从气流进口模型从气流导入 DAG从airflow.operators 导入email_operator从air ..

从气流中的 BigQueryOperator 获取结果

我正在尝试使用气流从 BigQueryOperator 获取结果,但我找不到办法做到这一点.我尝试在 bq_cursor 成员(在 1.10 中可用)中调用 next() 方法,但它返回 None.这就是我尝试这样做的方式 导入日期时间导入日志从气流进口模型从airflow.contrib.operators 导入bigquery_operator从airflow.operators导入pyth ..
发布时间:2021-12-30 22:52:47 Python

如何将 Airflow 连接到 oracle 数据库

我正在尝试使用 Airflow 创建与 oracle 数据库实例 (oracle:thin) 的连接. 根据他们的文档 我输入了我的主机名,然后是端口号和 SID: 主机:example.com:1524/sid 填写其他字段为: Conn 类型:Oracle 架构:用户名(文档 说:将您的用户名用于架构 ) 登录:用户名 密码:* * * 建立连接 ..
发布时间:2021-12-30 13:51:48 数据库

Airflow HiveCliHook 连接到远程配置单元集群?

我正在尝试从 Airflow 的本地副本连接到我的 Hive 服务器,但 HiveCliHook 似乎正在尝试连接到我的 Hive 本地副本. 我正在运行以进行测试: 导入气流从airflow.models 导入连接从airflow.hooks.hive_hooks 导入HiveCliHookusr = '我的用户名'pss = '我的通行证'会话 = 气流.settings.Sessio ..
发布时间:2021-12-28 23:28:49 其他开发

有没有办法在运行 master 的不同服务器上提交 spark 作业

我们需要安排 spark 作业,因为我们熟悉 apache-airflow,我们希望继续使用它来创建不同的工作流.我在网上搜索,但没有找到在气流上安排火花作业的分步指南,也没有找到在不同服务器上运行主机的选项. 对此的回答将不胜感激.提前致谢. 解决方案 您可以通过 3 种方式使用 Apache Airflow 远程提交 Spark 作业: (1) 使用 SparkSubmit ..
发布时间:2021-12-22 21:37:30 其他开发

如何从谷歌云作曲家调用云功能?

对于我想从 Cloud Composer 管道内部调用/调用云函数的要求,但我找不到关于它的太多信息,我尝试使用 SimpleHTTP 气流运算符,但出现此错误: [2021-09-10 10:35:46,649] {taskinstance.py:1503} 错误 - 任务因异常而失败回溯(最近一次调用最后一次):文件“/opt/python3.8/lib/python3.8/site-pac ..

Airflow xcom pull 只返回字符串

我有一个气流管道,我需要从 pubsub 订阅中获取文件名,然后将该文件导入到云 sql 实例中.我使用 CloudSqlInstanceImportOperator 导入 CSV 文件.该运算符需要一个主体,其中包含文件名和其他参数.由于我在运行时读取了该文件名,因此我还必须在运行时定义主体.这一切都有效.但是当我从 xcom 拉出正文时,它返回一个字符串而不是 python 字典.所以 Clo ..
发布时间:2021-12-20 19:22:05 Python

如何在不使用 cron 作业的情况下在谷歌云中安排 python 脚本?

我有两个 python 脚本在本地环境中每天运行一次.一种是获取数据,另一种是对其进行格式化. 现在我想将这些脚本部署到 Google 的云环境并每天运行一次/两次. 我可以使用 Google Cloud Function 做到这一点,还是需要 App Engine? 为什么没有 cron 作业:因为我不希望我的系统/VM 运行一整天(不使用时). 我可以使用 Cloud ..

Cloud Composer (Airflow) 作业卡住

我的 Cloud Composer 管理的 Airflow 卡住了几个小时,因为我取消了一个耗时太长的任务实例(我们称之为任务 A) 我已经清除了所有的 DAG 运行和任务实例,但是有几个作业正在运行,一个作业处于关闭状态(我想是任务 A 的作业)(我的工作快照). 此外,调度程序似乎没有运行,因为最近删除的 DAG 不断出现在仪表板中 有没有办法终止作业或重置调度程序?欢迎任何 ..

您可以获得 Google Cloud Composer/Airflow 的静态外部 IP 地址吗?

我知道如何为 Compute Engine 分配静态外部 IP 地址,但这可以通过 Google Cloud Composer (Airflow) 完成吗?我想大多数公司都需要该功能,因为他们通常会写回可能位于防火墙后面的仓库,但我找不到任何有关如何执行此操作的文档. 解决方案 现在是可能的.您需要在私有 VPC 中配置 Cloud Composer 并使用 Cloud NAT 公开集群. ..

如何控制 Airflow 安装的并行性或并发性?

在我的一些 Apache Airflow 安装中,即使调度程序似乎没有完全加载,计划运行的 DAG 或任务也不会运行.如何增加可并发运行的 DAG 或任务的数量? 同样,如果我的安装在高负载下并且我想限制我的 Airflow 工作人员拉入队列任务的速度(例如减少资源消耗),我可以调整什么来降低平均负载? 解决方案 这是自 Airflow v1.10.2 以来可用的配置选项的扩展列表. ..
发布时间:2021-12-20 18:35:33 Python

如何在 Windows 上运行 Airflow

运行 Airflow 的通常说明不适用于 Windows 环境: #airflow 需要一个家,~/airflow 是默认的,# 但如果你愿意,你可以在其他地方打基础# (选修的)出口AIRFLOW_HOME=~/airflow# 使用 pip 从 pypi 安装pip 安装气流# 初始化数据库气流初始化数据库# 启动web服务器,默认端口为8080气流网络服务器 -p 8080 Airflo ..
发布时间:2021-12-14 10:50:47 Python