airflow相关内容
我使用的是 Airflow 2.0.0,我的任务偶尔会被“外部"终止;运行几秒钟或几分钟后.这些任务通常会成功运行(对于通过 airflow tasks test ... 启动的手动任务和计划的 DAG 运行),所以我相信这与我的 DAG 代码无关. 当任务失败时,这似乎是任务日志中的关键错误: {local_task_job.py:170} 警告 - 此实例的状态已从外部设置为失败.终止
..
我在我的计算机(Mac AirBook、1.6 GHz Intel Core i5 和 8 GB 2133 MHz LPDDR3)上运行气流.包含多个任务的 DAG,失败并出现以下错误.在网上查了几篇文章,但几乎没有帮助.任务本身没有问题(仔细检查). 非常感谢任何帮助. [2019-08-27 13:01:55,372] {sequential_executor.py:45} INFO
..
我开始使用 Apache Airflow,我想知道如何有效地使用存储在 Vault 中的机密和密码.不幸的是,除了 Airflow 中尚未实现的钩子之外,搜索不会返回有意义的答案 项目本身. 我总是可以使用 Python 的 hvac 模块从 PythonOperator 访问 Vault,但我想知道是否有任何更好的 方法或 良好实践(例如,我错过了一个 Airflow 插件). 解决
..
我在 Airflow(在 GCP 上)部署了一个 dag,但收到错误“没有名为‘scipy’的模块".如何在 Airflow 中安装软件包? 我尝试添加一个单独的 DAG 来运行 def pip_install(package):subprocess.call([sys.executable, "-m", "pip", "install", package])def update_pack
..
我今天尝试创建我的第一个气流 DAG: from datetime import timedelta从气流导入 DAG从airflow.operators.bash_operator 导入BashOperator从airflow.operators.dummy_operator 导入DummyOperator从airflow.operators.python_operator 导入PythonO
..
很难给出正确的标题.好的,就到这里了.我正在按照本教程在我的 Mac(Mojave 版本)上安装 Apache Airflow - https://towardsdatascience.com/getting-started-with-apache-气流-df1aa77d7b1b 就在执行 pip 安装气流任务后的第一步,当我运行气流版本命令时,我收到以下错误,然后出现气流版本 -
..
有谁知道我将如何从气流中获取当前用户?我们将后端启用到 airflow/contrib/auth/backends/ldap_auth.py,因此用户通过该身份验证登录,我想知道如何让当前用户点击某些内容(自定义将我们作为插件查看). 解决方案 您可以通过调用{{ current_user.user.username }} 或 {{ current_user.user }} 来获取 在您的
..
我正在尝试运行一个非常简单的测试 DAG 来掌握 GCP Cloud Composer 的基本功能,但是每次我触发 DAG 时,都会弹出一个令人讨厌的错误,我似乎找不到任何有关怎么解决. 错误是: 2020-03-18 22:20:56,627] {taskinstance.py:1059} 错误 - __init__() 得到一个意外的关键字参数 'min'@-@{"workflow":
..
我正在尝试安排一个 dag 每 x 秒运行一次.我把开始时间作为过去的日期,catchup = False,结束时间为未来几秒钟. 尽管 dag 按预期开始,但它并没有结束并且永远持续下去. 如果我使用像 datetime(2019,9,26) 这样的绝对结束时间,但不使用 datetime.now()+timedelta(seconds=100),则 dag 结束> start_d
..
Airflow 调度程序在过去的几天里让我摸不着头脑,因为它甚至在 catchup=False 之后回填了 dag 运行.我的时区感知 dag 的开始日期为 13-04-2021 19:30 PST 或 14-04-2021 2:30 UTC 并具有以下配置: # 定义 DAG 及其参数DAG = DAG('backup_dag',default_args=default_args,start_
..
我正在尝试设置一个 DAG,其中每分钟运行一个任务,然后在第 5 分钟(就在 1 分钟任务之前)运行另一个任务.这真的只是测试,我不打算在这么短的时间间隔内运行作业. 从视觉上看,我的 DAG 如下所示: 代码本身是这样的: from 气流导入 DAG从airflow.operators.bash_operator 导入BashOperator从airflow.operators.p
..
我是 Airflow 的 xcom 功能的新手.我用 PythonOperator 进行了尝试,它工作正常(即,我可以将值从上下文中推入和拉出),但是当我在 BashOperator 上尝试时,它不起作用.但是,我只能通过在任务创建期间添加 xcom_push=True 属性来提取最终的 stdout 语句.这是一回事.2)但我也希望像我们在 PythonOp 中那样根据它们的键(到 BashOp
..
我正在测试通过 Apache Airflow 的托管工作流 (MWAA).AWS 为我部署和管理的 Airflow 版本是 1.10.12. 当我尝试在 /api/experimental/test 我得到状态代码 403 Forbidden. 是否可以在 MWAA 中启用实验性 API?怎么样? 解决方案 默认情况下,api.auth_backend 配置选项设置为 airf
..
我正在使用 puckel/docker-airflow 来部署气流.目前,网络服务器不要求任何登录凭据.如何向其中添加用户?也许我必须在 docker-compose.yml 中添加一些环境变量,但我找不到它.docker-compose 文件位于这里 提前致谢. 解决方案 创建自己的airflow.cfg(假设它存储在./config/airflow.cfg)并按照Airflow
..
我有 3 个任务要在相同的 dag 中运行.而 Task1 返回字典 task2 和 task3 的列表尝试使用结果返回的一个字典元素任务1. def get_list():....返回 listOfDictdef parse_1(example_dict):...def parse_2(example_dict):...dag = DAG('dagexample', default_args=
..
我正在使用气流(谷歌作曲家),但在下面遇到了一些例外 类型错误:无法pickle _thread.RLock 对象 糟糕.____/( ( ) ) \___/( ( ( ) _ )) ) )\(( ( )( ) ) ( ) )((/( _( ) ( _) ) ( () ) )( ( ( (_) (( ( ) .((_ ) . )_( ( ) ( ( ) ) ).) ( )( ( ( ( )
..
Apache Airflow 版本:v2.1.1 Kubernetes 版本(如果您使用的是 kubernetes)(使用 kubectl 版本):-客户端版本:version.Info{Major:“1",Minor:“21",GitVersion:“v1.21.2",GitCommit:“092fbfbf53427de67cac1e9fa54aaa09a28371d7",Build2-0
..
我们使用 Amazon MWAA Airflow,很少有标记为“FAILED"的任务;但根本没有日志.就好像容器在没有注意到我们的情况下关闭了一样. 我找到了这个链接:https://cloud.google.com/composer/docs/how-to/using/troubleshooting-dags#task_fails_without_Emitting_logs这由机器上的OO
..
我已经设置了气流并且正在使用以下 vscode 调试配置运行 DAG: {“版本":“0.2.0",“配置":[{“名称":“Python:当前文件",“类型":“蟒蛇",“请求":“启动",“程序":“${file}",“控制台":“集成终端",“justMyCode":假,“环境":{“AIRFLOW__CORE__EXECUTOR":“DebugExecutor",“AIRFLOW__DE
..
想要尝试新的任务流 API,我到了需要 2 个并行任务的地步. 使用 Airflow v1,我曾经做过类似的事情 task_1 >>[任务_2,任务_3][任务_2,任务_3] >>任务_4 对于PythonOperator ,我们现在调用任务的方式不同了 我如何使用 TaskFlow 做列表? 谢谢 解决方案 如果每个任务都依赖于上一个任务的值,您可以通过以下方式实现
..