airflow相关内容
好吧,我可能很愚蠢,但无论如何;如何通过 气流? 我假设它们应该是获取 requirements.txt 或其他内容的标准功能.在检查他们的 repo 时,我确实看到了一些像 ADDITIONAL_PYTHON_DEPS 这样的 ENV 变量暗示我这应该是可能的,但是在 docker-compose 文件中设置这些变量实际上并没有安装库的. 版本:'3'x-气流-共同:&airflow-c
..
我阅读了之前相关或问过的所有问题和答案,但仍然没有找到适合我的问题的答案. 我使用的是 python 3.6.5 并且 pip(和 setuptools)是最新的.我从这里安装了 Microsoft Visual C++ Redistributable for Visual Studio 2017:https://www.visualstudio.com/downloads/#build-t
..
每当我的 DAG 中的任务无法运行或重试运行时,我都会尝试让 Airflow 使用 AWS SES 向我发送电子邮件.我也在使用我的 AWS SES 凭证,而不是我的通用 AWS 凭证. 我目前的airflow.cfg [email]email_backend =airflow.utils.email.send_email_smtp[smtp]# 如果您希望气流在重试、失败时发送电子邮件,
..
我正在使用 docker-compose 来设置可扩展的气流集群.我的方法基于这个 Dockerfile https://hub.docker.com/r/puckel/docker-airflow/ 我的问题是将日志设置为从 s3 写入/读取.当 dag 完成时,我收到这样的错误 *** 日志文件不是本地的.*** 在这里获取:http://ea43d4d49f35:8793/log/x
..
我有 Airflow 作业,它们在 EMR 集群上运行良好.我需要的是,假设我有 4 个需要 EMR 集群的气流作业,假设需要 20 分钟才能完成任务.为什么我们不能在 DAG 运行时创建 EMR 集群,一旦作业完成,它将终止创建的 EMR 集群. 解决方案 当然,这将是最有效地利用资源.让我警告你:这里面有很多细节;我会尽量列出尽可能多的内容.我鼓励您添加自己的综合答案,列出您遇到的任何
..
如何在 EMR 主集群(由 Terraform 创建)和 Airflow 之间建立连接.我在具有相同 SG、VPC 和子网的 AWS EC2 服务器下设置了气流. 我需要解决方案,以便 Airflow 可以与 EMR 对话并执行 Spark 提交. https://aws.amazon.com/blogs/big-data/build-a-concurrent-data-orchest
..
我在 EC2 上配置了一个气流网络服务器,它侦听端口 8080. 我在 EC2 前面有一个 AWS ALB(应用程序负载均衡器),监听 https 80(面向互联网),实例目标端口面向 http 8080. 我无法浏览 https:// 来自浏览器,因为气流网络服务器将我重定向到 http :///admin,ALB 不监听. 如果我上网 https:
..
我正在尝试实施基于令牌的身份验证,作为触发气流 dag 的一部分.有没有办法添加 JWT 身份验证来生成 access_token 来触发 API?非常感谢任何帮助! 解决方案 我们的身份验证服务返回一个 JSON 响应,如下所示: {“clientToken":“322e8df6-0597-479e-984d-db6d8705ee66"} 这是我在气流 2.1 中使用 SimpleH
..
我正在尝试实施基于令牌的身份验证,作为触发气流 dag 的一部分.有没有办法添加 JWT 身份验证来生成 access_token 来触发 API?非常感谢任何帮助! 解决方案 我们的身份验证服务返回一个 JSON 响应,如下所示: {“clientToken":“322e8df6-0597-479e-984d-db6d8705ee66"} 这是我在气流 2.1 中使用 SimpleH
..
我正在尝试实施基于令牌的身份验证,作为触发气流 dag 的一部分.有没有办法添加 JWT 身份验证来生成 access_token 来触发 API?非常感谢任何帮助! 解决方案 我们的身份验证服务返回一个 JSON 响应,如下所示: {“clientToken":“322e8df6-0597-479e-984d-db6d8705ee66"} 这是我在气流 2.1 中使用 SimpleH
..
我想澄清一下 Cloud Dataflow 或 Cloud Composer 是否适合这项工作,但我从 Google 文档中不清楚. 目前,我正在使用 Cloud Dataflow 读取非标准 csv 文件——进行一些基本处理——并将其加载到 BigQuery 中. 让我举一个非常基本的例子: # file.csv输入\x01日期房子\x0112/27/1982汽车\x0111/9/
..
我正在努力提供有用的信息,但我远不是一名数据工程师. 我目前正在使用 python 库 pandas 对我的数据执行一系列转换,这些数据有很多输入(目前是 CSV 和 excel 文件).输出是几个excel文件.我希望能够使用并行计算执行计划的受监控批处理作业(我的意思是不像我对 Pandas 所做的那样顺序),每月一次. 我不太了解 Beam 或 Airflow,我很快通读了文档,
..
是否有任何直接的方法可以将 shell 脚本运行到 dataproc 集群中.目前我可以通过 pysparkoperator 运行 shell(它调用另一个 python 文件,然后这个 python 文件调用 shell 脚本).我搜索了很多链接,但到目前为止还没有找到任何直接的方法. 如果有人能告诉我最简单的方法,那对我真的很有帮助. 解决方案 PIG job with sh o
..
我收到以下错误: 文件“",第 1 行,在模板中TemplateSyntaxError:205 处的意外字符 u'\\' 当我将其包含在我的代码中时: '{{ macros.ds_format(macros.ds_add(ds, -13), "%Y-%m-%d", "%Y%m%d") }}' 我在vim中重新输入,以防编码错误,但仍然没有运气! 解决方案 改为: "{{ macr
..
我正在使用 GoogleCloudStorageToBigQueryOperator 将数据从 Google Storage 加载到 bigQuery Json 文件的列数可能比我定义的要多.在这种情况下,我希望加载作业继续 - 只需忽略这个无法识别的列.我尝试使用 ignore_unknown_values 参数,但没有任何区别. 我的接线员: def dc():返回 [{"name
..
我已经看过下面的帖子,但我正在尝试为气流 1.7.1.3 设置这个 Airflow 远程日志记录不起作用 有没有人有连接中指定密钥文件以访问项目相关存储桶所需的格式的具体示例?{"项目":"","key_path":""}这是我试过的. 解决方案 您似乎在使用 Google 凭据时遇到了问题.有很多方法可以解决它,我将解释我的方法.您必须首先在 Airflow 中创建连接 ID
..
我有循环创建任务的列表.就大小而言,该列表是静态的. for counter, account_id in enumerate(ACCOUNT_LIST):task_id = f“bash_task_{counter}";如果 account_id:trigger_task = BashOperator(task_id=task_id,bash_command="echo hello ther
..
我在从已弃用的 BigQueryOperator 转换为 BigQueryInsertJobOperator 时遇到了一些问题.我有以下任务: bq_extract = BigQueryInsertJobOperator(dag="big_query_task,task_id='bq_query',gcp_conn_id='google_cloud_default',params={'data'
..
我尝试在终止 postgres 挂起查询的气流中记录触发我的 DAG 的用户,但它不起作用.你能帮忙看看有什么问题吗?我错过了什么?当我检查气流中的日志而不是用户名时,到处都是“无". utils.py(描述会话逻辑的地方) 导入日志从airflow.models.log 导入日志从airflow.utils.db 导入create_session从airflow.operators.py
..
我的 DAG 中有一个 python 运算符和 BigQueryInsertJobOperator.python 运算符返回的结果应传递给 params 字段中的 BigQueryInsertJobOperator. 下面是我正在运行的脚本. def get_columns():字段=“名称"返回字段与模型.DAG(“xcom_test",default_args=default_args
..