Airflow 1.10.2未将日志写入S3 [英] Airflow 1.10.2 not writing logs to S3
问题描述
我正在尝试在docker容器中运行气流并将日志发送到s3。我具有以下环境
Airflow版本: 1.10.2
还更新了airflow.cfg
logging_config_class = log_config.LOGGING_CONFIG
其中 LOGGING_CONFIG
在类 config / log_config.py
中定义。
我创建了以下文件:
-
config / __ init __。py
-
config / log_config.py
我已经通过以下方式设置了 log_config.py
:
#-*-编码:utf-8-*-
#
#根据一项
#或更多贡献者许可协议授权给Apache软件基金会(ASF)。有关版权拥有权的其他信息,请参见随该工作一起分发的NOTICE文件
#。 ASF根据Apache许可证版本2.0(
#许可证)将此文件
#授予您许可;您不得使用该文件,除非遵守许可证中的
#。您可以通过
#
#http://www.apache.org/licenses/LICENSE-2.0
#
#获得许可的副本,除非适用法律要求书面同意,根据本许可分发的
#软件将按
#按现状的基础分发,不附带任何明示或暗示的
#保证或条件。有关许可下
#特定语言的权限和限制
#的信息,请参见许可。
import os
from typing import Dict, Any
from airflow import configuration as conf
from airflow.utils.file import mkdirs
#待办事项:应在此文件中配置日志记录格式和级别
#,而不是从airflow.cfg中配置。当前
#
#settings.py和cli.py中还有其他日志格式和级别配置。请参阅AIRFLOW-1455。
LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
#Flask appbuilder的信息级别日志非常冗长,
#因此默认情况下设置为 WARN。
FAB_LOG_LEVEL = conf.get('core', 'FAB_LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core','LOG_FORMAT')
BASE_LOG_FOLDER = conf.get('core','BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler','CHILD_PROCESS_LOG_DIRECTORY')
DAG_PROCESSOR_MANAGER_LOG_LOCATION = \
    conf.get('core', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION')
FILENAME_TEMPLATE = conf.get('core','LOG_FILENAME_TEMPLATE')
PROCESSOR_FILENAME_TEMPLATE = conf.get('core', 'LOG_PROCESSOR_FILENAME_TEMPLATE')
#远程日志记录的存储桶url
#s3存储桶应以 s3://开头
#gcs存储桶应开始使用 gs://
#wasb桶应该以 wasb
#开头,只是为了帮助Airflow选择正确的处理程序
REMOTE_BASE_LOG_FOLDER = os.environ ['AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER']
ELASTICSEARCH_HOST = conf.get('elasticsearch','ELASTICSEARCH_HOST')
LOG_ID_TEMPLATE = conf.get('elasticsearch', 'ELASTICSEARCH_LOG_ID_TEMPLATE')
END_OF_LOG_MARK = conf.get('elasticsearch','ELASTICSEARCH_END_OF_LOG_MARK')
LOGGING_CONFIG = {
'version':1,
'disable_existing_loggers':False,
'formatters':{
'airflow':{
'format':LOG_FORMAT,
},
},
'handlers':{
'console':{
'class':'airflow.utils.log.logging_mixin.RedirectStdHandler',
'formatter':'airflow',
'stream':'sys.stdout'
},
'task':{
'class':'airflow.utils.log.file_task_handler.FileTaskHandler',
'formatter':'airflow',
'base_log_folder':os.path.expanduser(BASE_LOG_FOLDER),
'filename_template':FILENAME_TEMPLATE,
},
'processor': {
'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
'formatter': 'airflow',
'base_log_folder':os.path.expanduser(PROCESSOR_LOG_FOLDER),
'filename_template':PROCESSOR_FILENAME_TEMPLATE,
}
},
'loggers': {
'airflow.processor':{
'handlers':['processor'],
'level':LOG_LEVEL,
'propagate':False,
},
'airflow.task':{
'handlers':['task'],
'level':LOG_LEVEL,
'propagate':False,
},
'flask_appbuilder':{
'handler':['console'],
'level':FAB_LOG_LEVEL,
'propagate': True,
}
},
'root':{
'handlers':['console'],
'level':LOG_LEVEL,
}
} # type: Dict[str, Any]
DEFAULT_DAG_PARSING_LOGGING_CONFIG = {
'handlers':{
'processor_manager':{
'class': 'logging.handlers.RotatingFileHandler',
'formatter':'airflow',
'filename':DAG_PROCESSOR_MANAGER_LOG_LOCATION,
'mode':'a',
'maxBytes':104857600, #100MB
'backupCount':5
}
},
'loggers':{
'airflow.processor_manager':{
'handlers': ['processor_manager'],
'level':LOG_LEVEL,
'propagate':False,
}
}
}
REMOTE_HANDLERS = {
's3':{
'task':{
'class':'airflow.utils.log.s3_task_handler.S3TaskHandler',
'formatter':'airflow ',
'base_log_folder':os.path.expanduser(BASE_LOG_FOLDER),
's3_log_folder':REMOTE_BASE_LOG_FOLDER,
'filename_template':FILENAME_TEMPLATE,
},
'processor': {
'class':'airflow.utils.log.s3_task_handler.S3TaskHandler',
'formatter':'airflow',
'base_log_folder':os.path.expanduser(PROCESSOR_LOG_FOLDER),
's3_log_folder':REMOTE_BASE_LOG_FOLDER,
'filename_template': PROCESSOR_FILENAME_TEMPLATE,
},
},
'gcs':{
'task':{
'class':'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
'formatter':'airflow',
'base_log_folder':os.path.expanduser(BASE_LOG_FOLDER),
'gcs_log_folder':REMOTE_BASE_LOG_FOLDER,
'filename_template':FILENAME_TEMPLATE,
},
'processor': {
'class':'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
'formatter':'airflow',
' base_log_folder':os.path.expanduser(PROCESSOR_LOG_FOLDER),
'gcs_log_folder':REMOTE_BASE_LOG_FOLDER,
'filename_template':PROCESSOR_FILENAME_TEMPLATE,
},
},
'wasb':{
'task':{
'class':'airflow.utils.log.wasb_task_handler.WasbTaskHandler',
'formatter':'airflow',
'base_log_folder':os.path.expanduser(BASE_LOG_FOLDER),
'wasb_log_folder':REMOTE_BASE_LOG_FOLDER,
'wasb_container': 'airflow-logs',
'filename_template':FILENAME_TEMPLATE,
'delete_local_copy':False,
},
'processor':{
'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler',
'formatter':'airflow',
'base_log_folder':os.path.expanduser(PROCESSOR_LOG_FOLDER),
'wasb_log_folder':REMOTE_BASE_LOG_FOLDER,
'wasb_container' :'airflow-logs',
'filename_template':PROCESSOR_FILENAME_TEMPLATE,
'delete_local_copy':False,
},
},
'elasticsearch':{
'task': {
'class':'airflow.utils.log.es_task_handler.ElasticsearchTaskHandler',
'formatter':'airflow',
'base_log_folder':os.path.expanduser (BASE_LOG_FOLDER),
'log_id_template':LOG_ID_TEMPLATE,
'filename_template':FILENAME_TEMPLATE,
'end_of_log_mark':END_OF_LOG_MARK,
'host':ELASTICSEARCH_HOST,
},
},
}
REMOTE_LOGGING = os.environ['AIRFLOW__CORE__REMOTE_LOGGING']
#仅在设置CONFIG_PROCESSOR_MANAGER_LOGGER时更新处理程序和记录器。
#这是为了避免在多个进程中多次初始化RotatingFileHandler
#时出现异常。
if os.environ.get('CONFIG_PROCESSOR_MANAGER_LOGGER') == 'True':
    LOGGING_CONFIG['handlers'] \
        .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers'])
    LOGGING_CONFIG['loggers'] \
        .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['loggers'])
    # 手动为processor_manager处理程序创建日志目录,因为RotatingFileHandler
    # 仅创建文件而不创建目录。
    processor_manager_handler_config = DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers'][
        'processor_manager']
    directory = os.path.dirname(processor_manager_handler_config['filename'])
    mkdirs(directory, 0o755)
if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb'])
elif REMOTE_LOGGING and ELASTICSEARCH_HOST:
    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch'])
我已经设置了必需的环境变量,并在 log_config.py中读取了它们
1. AIRFLOW__CORE__REMOTE_LOGGING=True
2. AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3_bucket_name
3. AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3://AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY@AIRFLOW_LOG_BUCKET_NAME
当我启动容器时,可以看到以下日志,该日志显示Airflow正在读取我的自定义日志配置
但是,气流未将任何日志写入s3存储桶。我查看了其他类似的问题,然后按照他们的意见进行了操作(如上述步骤所述)。关于我所缺少的任何指示。
以下是气流Web ui DAG运行详细信息的快照:
日志是在容器上创建的
解决方案我认为问题出在AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER的格式上。应该是s3:// your-bucket-name-here,因为我只是在使用存储桶名称,而不是在s3之前添加
I'm trying to run airflow in a docker container and send the logs to s3. I've the following environment
Airflow Version: 1.10.2
Also updated the following in the airflow.cfg
logging_config_class = log_config.LOGGING_CONFIG
where LOGGING_CONFIG
is defined in the class config/log_config.py
.
I've created following files:
config/__init__.py
config/log_config.py
I've set up the log_config.py
in the following way:
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
from typing import Dict, Any
from airflow import configuration as conf
from airflow.utils.file import mkdirs
# TODO: Logging format and level should be configured
# in this file instead of from airflow.cfg. Currently
# there are other log format and level configurations in
# settings.py and cli.py. Please see AIRFLOW-1455.
LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
# Flask appbuilder's info level log is very verbose,
# so it's set to 'WARN' by default.
FAB_LOG_LEVEL = conf.get('core', 'FAB_LOGGING_LEVEL').upper()
LOG_FORMAT = conf.get('core', 'LOG_FORMAT')
BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY')
DAG_PROCESSOR_MANAGER_LOG_LOCATION = \
    conf.get('core', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION')
FILENAME_TEMPLATE = conf.get('core', 'LOG_FILENAME_TEMPLATE')
PROCESSOR_FILENAME_TEMPLATE = conf.get('core', 'LOG_PROCESSOR_FILENAME_TEMPLATE')
# Storage bucket url for remote logging
# s3 buckets should start with "s3://"
# gcs buckets should start with "gs://"
# wasb buckets should start with "wasb"
# just to help Airflow select correct handler
# FIX: read through `conf` instead of os.environ directly. Airflow already
# maps AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER onto this config option, and
# conf.get() does not raise KeyError at import time when the environment
# variable is absent (it falls back to the value in airflow.cfg).
REMOTE_BASE_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
ELASTICSEARCH_HOST = conf.get('elasticsearch', 'ELASTICSEARCH_HOST')
LOG_ID_TEMPLATE = conf.get('elasticsearch', 'ELASTICSEARCH_LOG_ID_TEMPLATE')
END_OF_LOG_MARK = conf.get('elasticsearch', 'ELASTICSEARCH_END_OF_LOG_MARK')
# Default Airflow logging configuration in logging.config.dictConfig
# schema. Console output is routed through RedirectStdHandler; task and
# DAG-processor logs go to per-file local handlers, which are swapped for
# remote handlers further down when remote logging is enabled.
LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        'console': {
            'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler',
            'formatter': 'airflow',
            'stream': 'sys.stdout'
        },
        'task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        }
    },
    'loggers': {
        'airflow.processor': {
            'handlers': ['processor'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task': {
            'handlers': ['task'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'flask_appbuilder': {
            # BUG FIX: was 'handler' (singular). dictConfig silently ignores
            # unknown keys, so this logger was left without its console
            # handler. The schema key is 'handlers'.
            'handlers': ['console'],
            'level': FAB_LOG_LEVEL,
            'propagate': True,
        }
    },
    'root': {
        'handlers': ['console'],
        'level': LOG_LEVEL,
    }
}  # type: Dict[str, Any]
# Extra handler/logger for the DAG file processor manager. Kept out of
# LOGGING_CONFIG and merged in below only when the
# CONFIG_PROCESSOR_MANAGER_LOGGER environment variable is set (see guard
# further down), so the RotatingFileHandler is not opened by every process.
DEFAULT_DAG_PARSING_LOGGING_CONFIG = {
'handlers': {
'processor_manager': {
'class': 'logging.handlers.RotatingFileHandler',
'formatter': 'airflow',
'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION,
'mode': 'a',
# Rotate at 100 MB, keeping at most 5 rotated files.
'maxBytes': 104857600, # 100MB
'backupCount': 5
}
},
'loggers': {
'airflow.processor_manager': {
'handlers': ['processor_manager'],
'level': LOG_LEVEL,
'propagate': False,
}
}
}
# Per-backend handler overrides used when remote logging is enabled. Each
# entry reuses the handler names 'task'/'processor' from LOGGING_CONFIG so
# that a plain dict .update() swaps the local file handlers for remote ones.
REMOTE_HANDLERS = {
# Amazon S3: ships local task/processor logs to the s3_log_folder bucket.
's3': {
'task': {
'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
'formatter': 'airflow',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
's3_log_folder': REMOTE_BASE_LOG_FOLDER,
'filename_template': FILENAME_TEMPLATE,
},
'processor': {
'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
'formatter': 'airflow',
'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
's3_log_folder': REMOTE_BASE_LOG_FOLDER,
'filename_template': PROCESSOR_FILENAME_TEMPLATE,
},
},
# Google Cloud Storage.
'gcs': {
'task': {
'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
'formatter': 'airflow',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
'gcs_log_folder': REMOTE_BASE_LOG_FOLDER,
'filename_template': FILENAME_TEMPLATE,
},
'processor': {
'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
'formatter': 'airflow',
'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
'gcs_log_folder': REMOTE_BASE_LOG_FOLDER,
'filename_template': PROCESSOR_FILENAME_TEMPLATE,
},
},
# Azure Blob Storage (wasb). Container name is hard-coded here.
'wasb': {
'task': {
'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler',
'formatter': 'airflow',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
'wasb_log_folder': REMOTE_BASE_LOG_FOLDER,
'wasb_container': 'airflow-logs',
'filename_template': FILENAME_TEMPLATE,
'delete_local_copy': False,
},
'processor': {
'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler',
'formatter': 'airflow',
'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
'wasb_log_folder': REMOTE_BASE_LOG_FOLDER,
'wasb_container': 'airflow-logs',
'filename_template': PROCESSOR_FILENAME_TEMPLATE,
'delete_local_copy': False,
},
},
# Elasticsearch: task logs only; no 'processor' override is defined.
'elasticsearch': {
'task': {
'class': 'airflow.utils.log.es_task_handler.ElasticsearchTaskHandler',
'formatter': 'airflow',
'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
'log_id_template': LOG_ID_TEMPLATE,
'filename_template': FILENAME_TEMPLATE,
'end_of_log_mark': END_OF_LOG_MARK,
'host': ELASTICSEARCH_HOST,
},
},
}
REMOTE_LOGGING = os.environ['AIRFLOW__CORE__REMOTE_LOGGING']
# Attach the DAG-parsing (processor_manager) handler and logger only when
# CONFIG_PROCESSOR_MANAGER_LOGGER is set. This avoids exceptions from the
# RotatingFileHandler being initialised repeatedly in multiple processes.
if os.environ.get('CONFIG_PROCESSOR_MANAGER_LOGGER') == 'True':
    for _section in ('handlers', 'loggers'):
        LOGGING_CONFIG[_section].update(
            DEFAULT_DAG_PARSING_LOGGING_CONFIG[_section])
    # RotatingFileHandler creates only the log file, not its parent
    # directory, so build the directory for the processor_manager log here.
    _manager_cfg = DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers']['processor_manager']
    _log_dir = os.path.dirname(_manager_cfg['filename'])
    mkdirs(_log_dir, 0o755)
# When remote logging is enabled, pick the remote backend from the scheme
# of the remote base log folder and overwrite the local file handlers.
if REMOTE_LOGGING:
    if REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
        LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
    elif REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
        LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])
    elif REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
        LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb'])
    elif ELASTICSEARCH_HOST:
        # No recognised bucket scheme: fall back to Elasticsearch if a
        # host is configured.
        LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch'])
I've set the required environment variables and read them in the log_config.py
1. AIRFLOW__CORE__REMOTE_LOGGING=True
2. AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3_bucket_name
3. AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3://AWS_ACCESS_KEY_ID:AWS_SECRET_ACCESS_KEY@AIRFLOW_LOG_BUCKET_NAME
When I start the container, I can see the following log which shows that airflow is reading my custom log config
But, airflow is not writing any logs to the s3 bucket. I've looked at other similar questions, followed what they have said (as stated in the steps above). Any pointers on what I'm missing.
Following is a snapshot of the airflow web ui DAG run details:
The logs are created on the container
解决方案 I think the issue was with the format of AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER. It should be s3://your-bucket-name-here, whereas I was just using the bucket name and not prepending the s3:// scheme.
这篇关于Airflow 1.10.2未将日志写入S3的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!