apache airflow - Cannot load the dag bag to handle failure


Problem description


I have created an on_failure_callback function (referring to Airflow default on_failure_callback) to handle a task's failure.

It works well when there is only one task in a DAG; however, if there are two or more tasks, a task randomly fails since the operator is null, and it can be resumed manually later. The log in airflow-scheduler.out is:

[2018-05-08 14:24:21,237] {models.py:1595} ERROR - Executor reports task instance %s finished (%s) although the task says its %s. Was the task killed externally? NoneType [2018-05-08 14:24:21,238] {jobs.py:1435} ERROR - Cannot load the dag bag to handle failure for . Setting task to FAILED without callbacks or retries. Do you have enough resources?

The DAG code is:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta
import airflow
from devops.util import WechatUtil
from devops.util import JiraUtil

def on_failure_callback(context):
    ti = context['task_instance']
    log_url = ti.log_url
    owner = ti.task.owner
    ti_str = str(context['task_instance'])
    wechat_msg = "%s - Owner:%s" % (ti_str, owner)
    WechatUtil.notify_team(wechat_msg)

    jira_desc = "Please check log from url %s" % log_url
    JiraUtil.create_incident("DW", ti_str, jira_desc, owner)


args = {
    'queue': 'default',
    'start_date': airflow.utils.dates.days_ago(1),
    'retry_delay': timedelta(minutes=1),
    'on_failure_callback': on_failure_callback,
    'owner': 'user1',
    }
dag = DAG(dag_id='test_dependence1', default_args=args, schedule_interval='10 16 * * *')

load_crm_goods = BashOperator(
    task_id='crm_goods_job',
    bash_command='date',
    dag=dag)

load_crm_member = BashOperator(
    task_id='crm_member_job',
    bash_command='date',
    dag=dag)

load_crm_order = BashOperator(
    task_id='crm_order_job',
    bash_command='date',
    dag=dag)

load_crm_eur_invt = BashOperator(
    task_id='crm_eur_invt_job',
    bash_command='date',
    dag=dag)

crm_member_cohort_analysis = BashOperator(
    task_id='crm_member_cohort_analysis_job',
    bash_command='date',
    dag=dag)

crm_member_cohort_analysis.set_upstream(load_crm_goods)
crm_member_cohort_analysis.set_upstream(load_crm_member)
crm_member_cohort_analysis.set_upstream(load_crm_order)
crm_member_cohort_analysis.set_upstream(load_crm_eur_invt)

crm_member_kpi_daily = BashOperator(
    task_id='crm_member_kpi_daily_job',
    bash_command='date',
    dag=dag)

crm_member_kpi_daily.set_upstream(crm_member_cohort_analysis)

I have tried updating airflow.cfg, raising the default memory from 512 all the way to 4096, but no luck. Would anyone have any advice?

I also tried updating my JiraUtil and WechatUtil as follows, encountering the same error.

WechatUtil:

import requests

class WechatUtil:
    @staticmethod
    def notify_trendy_user(user_ldap_id, message):
        return None

    @staticmethod
    def notify_bigdata_team(message):
        return None

JiraUtil:

import json
import requests
class JiraUtil:
    @staticmethod
    def execute_jql(jql):
        return None

    @staticmethod
    def create_incident(projectKey, summary, desc, assignee=None):
        return None

Solution

(I'm shooting tracer bullets a bit here, so bear with me if this answer doesn't get it right on the first try.)

The null operator issue with multiple task instances is weird... it would help in troubleshooting this if you could boil the current code down to an MCVE, e.g. 1–2 operators, excluding the JiraUtil and WechatUtil parts if they're not related to the callback failure.
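For reference, here is a minimal sketch of what such an MCVE could look like. The dag_id, task_ids, and bash commands are placeholders of my own, not taken from your code; the callback is kept trivial so any failure has to come from Airflow itself:

import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

def on_failure_callback(context):
    # Trivial callback: just log which task instance failed.
    print("on_failure_callback fired for %s" % context['task_instance'])

args = {
    'owner': 'user1',
    'start_date': airflow.utils.dates.days_ago(1),
    'on_failure_callback': on_failure_callback,
}

dag = DAG(dag_id='callback_mcve', default_args=args, schedule_interval=None)

# Two tasks are enough to exercise the multi-task case; 'exit 1' forces a failure.
failing_task = BashOperator(task_id='failing_task', bash_command='exit 1', dag=dag)
downstream_task = BashOperator(task_id='downstream_task', bash_command='date', dag=dag)
downstream_task.set_upstream(failing_task)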

Here are 2 ideas:

1. Can you try changing the line that fetches the task instance out of the context to see if this makes a difference?

Before:

def on_failure_callback(context):
    ti = context['task_instance']
    ...

After:

def on_failure_callback(context):
    ti = context['ti']
    ...

I saw this usage in the Airflow repo (https://github.com/apache/incubator-airflow/blob/c1d583f91a0b4185f760a64acbeae86739479cdb/airflow/contrib/hooks/qubole_check_hook.py#L88). It's possible it can be accessed both ways.
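If you'd rather not depend on which key is present, a defensive variant (my own sketch, not something from the Airflow codebase) could fall back from one key to the other:

def on_failure_callback(context):
    # Both keys usually map to the same TaskInstance; fall back just in case.
    ti = context.get('ti') or context.get('task_instance')
    if ti is None:
        return
    print("Failed task instance: %s" % ti)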

2. Can you try adding provide_context=True on the operators either as a kwarg or in default_args?
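As a sketch, the default_args variant would look like this (speculative: provide_context is documented for PythonOperator, so whether it changes anything for BashOperators is exactly the experiment being proposed, not a confirmed fix):

args = {
    'queue': 'default',
    'start_date': airflow.utils.dates.days_ago(1),
    'retry_delay': timedelta(minutes=1),
    'on_failure_callback': on_failure_callback,
    'owner': 'user1',
    'provide_context': True,  # idea 2: speculative addition
}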
