Airflow 1.10.2 not writing logs to S3


Problem description

I'm trying to run Airflow in a Docker container and send the logs to S3. I have the following environment: Airflow version 1.10.2.

I've also updated the following in airflow.cfg:

logging_config_class = log_config.LOGGING_CONFIG

where LOGGING_CONFIG is defined in the module config/log_config.py.
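
Airflow loads this setting by importing the dotted path at startup, so log_config must be importable inside the container; in 1.10 a config/ folder under $AIRFLOW_HOME is the usual place for it. Below is a minimal sketch for confirming that the path resolves and which handler class ends up backing task logs; it assumes that layout and that the AIRFLOW__CORE__* environment variables the module reads are already set:

# Minimal import check (run inside the container): verifies that the dotted
# path configured as logging_config_class resolves, and shows which handler
# class will back task logs. Assumes config/ lives under $AIRFLOW_HOME and
# that the AIRFLOW__CORE__* env vars read by log_config.py are set.
import importlib
import os
import sys

airflow_home = os.environ.get('AIRFLOW_HOME', os.path.expanduser('~/airflow'))
sys.path.insert(0, os.path.join(airflow_home, 'config'))

module_path, attr_name = 'log_config.LOGGING_CONFIG'.rsplit('.', 1)
logging_config = getattr(importlib.import_module(module_path), attr_name)

print(logging_config['handlers']['task']['class'])
# -> ...S3TaskHandler if the s3:// branch was taken, ...FileTaskHandler otherwise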

I've created the following files:

  1. config/__init__.py
  2. config/log_config.py

I've set up log_config.py in the following way:

# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import os
from typing import Dict, Any

from airflow import configuration as conf
from airflow.utils.file import mkdirs

# TODO: Logging format and level should be configured
# in this file instead of from airflow.cfg. Currently
# there are other log format and level configurations in
# settings.py and cli.py. Please see AIRFLOW-1455.
LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()


# Flask appbuilder's info level log is very verbose,
# so it's set to 'WARN' by default.
FAB_LOG_LEVEL = conf.get('core', 'FAB_LOGGING_LEVEL').upper()

LOG_FORMAT = conf.get('core', 'LOG_FORMAT')

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')

PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY')

DAG_PROCESSOR_MANAGER_LOG_LOCATION = \
    conf.get('core', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION')

FILENAME_TEMPLATE = conf.get('core', 'LOG_FILENAME_TEMPLATE')

PROCESSOR_FILENAME_TEMPLATE = conf.get('core', 'LOG_PROCESSOR_FILENAME_TEMPLATE')

# Storage bucket url for remote logging
# s3 buckets should start with "s3://"
# gcs buckets should start with "gs://"
# wasb buckets should start with "wasb"
# just to help Airflow select correct handler
REMOTE_BASE_LOG_FOLDER = os.environ['AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER']

ELASTICSEARCH_HOST = conf.get('elasticsearch', 'ELASTICSEARCH_HOST')

LOG_ID_TEMPLATE = conf.get('elasticsearch', 'ELASTICSEARCH_LOG_ID_TEMPLATE')

END_OF_LOG_MARK = conf.get('elasticsearch', 'ELASTICSEARCH_END_OF_LOG_MARK')

LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        'console': {
            'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler',
            'formatter': 'airflow',
            'stream': 'sys.stdout'
        },
        'task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        }
    },
    'loggers': {
        'airflow.processor': {
            'handlers': ['processor'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task': {
            'handlers': ['task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'flask_appbuilder': {
            'handler': ['console'],
            'level': FAB_LOG_LEVEL,
            'propagate': True,
        }
    },
    'root': {
        'handlers': ['console'],
        'level': LOG_LEVEL,
    }
}  # type: Dict[str, Any]

DEFAULT_DAG_PARSING_LOGGING_CONFIG = {
    'handlers': {
        'processor_manager': {
            'class': 'logging.handlers.RotatingFileHandler',
            'formatter': 'airflow',
            'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION,
            'mode': 'a',
            'maxBytes': 104857600,  # 100MB
            'backupCount': 5
        }
    },
    'loggers': {
        'airflow.processor_manager': {
            'handlers': ['processor_manager'],
            'level': LOG_LEVEL,
            'propagate': False,
        }
    }
}

REMOTE_HANDLERS = {
    's3': {
        'task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': REMOTE_BASE_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
        'processor': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            's3_log_folder': REMOTE_BASE_LOG_FOLDER,
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
    },
    'gcs': {
        'task': {
            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'gcs_log_folder': REMOTE_BASE_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
        'processor': {
            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'gcs_log_folder': REMOTE_BASE_LOG_FOLDER,
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
    },
    'wasb': {
        'task': {
            'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'wasb_log_folder': REMOTE_BASE_LOG_FOLDER,
            'wasb_container': 'airflow-logs',
            'filename_template': FILENAME_TEMPLATE,
            'delete_local_copy': False,
        },
        'processor': {
            'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'wasb_log_folder': REMOTE_BASE_LOG_FOLDER,
            'wasb_container': 'airflow-logs',
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
            'delete_local_copy': False,
        },
    },
    'elasticsearch': {
        'task': {
            'class': 'airflow.utils.log.es_task_handler.ElasticsearchTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'log_id_template': LOG_ID_TEMPLATE,
            'filename_template': FILENAME_TEMPLATE,
            'end_of_log_mark': END_OF_LOG_MARK,
            'host': ELASTICSEARCH_HOST,
        },
    },
}

REMOTE_LOGGING = os.environ['AIRFLOW__CORE__REMOTE_LOGGING']

# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set.
# This is to avoid exceptions when initializing RotatingFileHandler multiple times
# in multiple processes.
if os.environ.get('CONFIG_PROCESSOR_MANAGER_LOGGER') == 'True':
    LOGGING_CONFIG['handlers'] \
        .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers'])
    LOGGING_CONFIG['loggers'] \
        .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['loggers'])

    # Manually create log directory for processor_manager handler as RotatingFileHandler
    # will only create file but not the directory.
    processor_manager_handler_config = DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers'][
        'processor_manager']
    directory = os.path.dirname(processor_manager_handler_config['filename'])
    mkdirs(directory, 0o755)

if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb'])
elif REMOTE_LOGGING and ELASTICSEARCH_HOST:
    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch'])

I've set the required environment variables and read them in log_config.py:


 1. AIRFLOW__CORE__REMOTE_LOGGING=True
 2. AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3_bucket_name
 3. AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3://AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY@AIRFLOW_LOG_BUCKET_NAME

When I start the container, I can see the following log, which shows that Airflow is reading my custom log config:

However, Airflow is not writing any logs to the S3 bucket. I've looked at other similar questions and followed their suggestions (as described in the steps above). Any pointers on what I'm missing?

The following is a snapshot of the Airflow web UI DAG run details:

The logs are created locally on the container.
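
One way to separate a credentials or connection problem from a handler-selection problem is to bypass the logging layer and write a test object with the same hook class the S3TaskHandler is built on. A rough sketch, run inside the container; it assumes AIRFLOW__CORE__REMOTE_LOG_CONN_ID resolves to a valid Airflow connection with S3 access, and the bucket name is a placeholder:

# Rough connectivity check (run inside the Airflow container). Assumes the
# remote log connection ID resolves to an Airflow connection with S3 access;
# 'my-airflow-logs' is a placeholder bucket name.
import os

from airflow.hooks.S3_hook import S3Hook  # the hook S3TaskHandler uses

hook = S3Hook(os.environ['AIRFLOW__CORE__REMOTE_LOG_CONN_ID'])
print(hook.check_for_bucket('my-airflow-logs'))  # True if the bucket is reachable
hook.load_string('remote logging smoke test',
                 key='smoke-test/test.log',
                 bucket_name='my-airflow-logs',
                 replace=True)

If this fails, the problem is on the connection/credentials side rather than in the logging config.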

Solution

I think the issue was with the format of AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER. It should be s3://your-bucket-name-here, whereas I was just using the bucket name without the s3:// prefix.
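
For reference, this matches the branch at the bottom of log_config.py: the S3 handlers are only swapped in when REMOTE_BASE_LOG_FOLDER starts with s3://, so a bare bucket name silently leaves the local FileTaskHandler in place. A small illustration (the bucket name is a placeholder):

# Illustration of the handler-selection check in log_config.py above;
# 'my-airflow-logs' is a placeholder bucket name.
for folder in ('my-airflow-logs', 's3://my-airflow-logs'):
    uses_s3 = folder.startswith('s3://')
    print(folder, '->', 'S3TaskHandler' if uses_s3 else 'FileTaskHandler (local only)')

So, with a placeholder bucket name, the working setting would look like AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://my-airflow-logs.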
