Format Airflow Logs in JSON
Question
I have a requirement to log the Apache Airflow logs to stdout in JSON format. Airflow does not seem to provide this capability out of the box. I have found a couple python modules that are capable of this task, but I cannot get the implementation to work.
Currently, I am applying a class in airflow/utils/logging.py to modify the logger, shown below:
import datetime  # needed for the nanotime field below

from pythonjsonlogger import jsonlogger

class StackdriverJsonFormatter(jsonlogger.JsonFormatter, object):

    def __init__(self, fmt="%(levelname) %(asctime) %(nanotime) %(severity) %(message)", style='%', *args, **kwargs):
        jsonlogger.JsonFormatter.__init__(self, fmt=fmt, *args, **kwargs)

    def process_log_record(self, log_record):
        if log_record.get('level'):
            log_record['severity'] = log_record['level']
            del log_record['level']
        else:
            log_record['severity'] = log_record['levelname']
            del log_record['levelname']
        if log_record.get('asctime'):
            log_record['timestamp'] = log_record['asctime']
            del log_record['asctime']
        now = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
        log_record['nanotime'] = now
        return super(StackdriverJsonFormatter, self).process_log_record(log_record)
I am implementing this code in /airflow/settings.py as shown below:
from airflow.utils import logging as logconf

def configure_logging(log_format=LOG_FORMAT):
    handler = logconf.logging.StreamHandler(sys.stdout)
    formatter = logconf.StackdriverJsonFormatter()
    handler.setFormatter(formatter)
    logging = logconf.logging.getLogger()
    logging.addHandler(handler)
    ''' code below was original airflow source code
    logging.root.handlers = []
    logging.basicConfig(
        format=log_format, stream=sys.stdout, level=LOGGING_LEVEL)
    '''
I have tried a couple different variations of this and can't get the python-json-logger to transform the logs to JSON. Perhaps I'm not getting to the root logger? Another option I have considered is manually formatting the logs to a JSON string. No luck with that yet either. Any alternative ideas, tips, or support are appreciated.
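The root-logger question can be checked in isolation. The sketch below uses only the standard library (no pythonjsonlogger; the formatter and names here are illustrative) to confirm that a JSON formatter set on a handler attached to the root logger does receive records from child loggers, as long as no earlier handler shadows it:

```python
import io
import json
import logging

# Minimal stdlib-only JSON formatter, used here just to probe whether
# records actually reach the root logger's handler.
class ProbeJsonFormatter(logging.Formatter):
    def format(self, record):
        return json.dumps({
            "severity": record.levelname,
            "message": record.getMessage(),
        })

buf = io.StringIO()
handler = logging.StreamHandler(buf)
handler.setFormatter(ProbeJsonFormatter())

root = logging.getLogger()   # the root logger
root.handlers = []           # drop any handlers installed earlier
root.addHandler(handler)
root.setLevel(logging.INFO)

# Child loggers propagate to the root by default.
logging.getLogger("airflow.task").info("probe")
print(buf.getvalue())
```

If the printed line is not JSON, something else has replaced the root logger's handlers after this code ran, which is essentially what airflow's own logging setup does.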
Cheers!
Answer
I don't know if you ever solved this problem, but after some frustrating tinkering, I ended up getting this to play nice with airflow. For reference, I followed a lot of this article to get it working: https://www.astronomer.io/guides/logging/. The main issue was that the airflow logging only accepts a string template for the logging format, which json-logging can't plug into. So you have to create your own logging classes and connect them to a custom logging config class.
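The overall wiring amounts to deep-copying airflow's default logging config dict and swapping in custom handler classes. A rough sketch, where the dict is a cut-down stand-in and not the real DEFAULT_LOGGING_CONFIG (the real template lives in airflow.config_templates.airflow_local_settings):

```python
import copy

# Hypothetical stand-in for airflow's default logging config template.
DEFAULT_LOGGING_CONFIG = {
    "version": 1,
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "airflow",
            "stream": "ext://sys.stdout",
        },
    },
}

# The custom config starts as a deep copy of the default, then swaps the
# handler classes for JSON-emitting ones.
LOGGING_CONFIG = copy.deepcopy(DEFAULT_LOGGING_CONFIG)
LOGGING_CONFIG["handlers"]["console"]["class"] = "logging_handler.JsonStreamHandler"
```

The deep copy matters: a shallow copy would mutate the nested handler dicts of the default config as well.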
- Copy the log template here into your $AIRFLOW_HOME/config folder, and change DEFAULT_CONFIG_LOGGING to CONFIG_LOGGING. When you're successful, bring up airflow and you'll get a log message on airflow startup that says Successfully imported user-defined logging config from logging_config.LOGGING_CONFIG. If this is the first .py file in the config folder, don't forget to add a blank __init__.py file to get python to pick it up
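The folder setup for this step can be sketched as follows; paths assume a default AIRFLOW_HOME, and the template copy itself is left commented out because its source path depends on where airflow is installed:

```shell
# Create the config package and the blank __init__.py so python picks it up
export AIRFLOW_HOME="${AIRFLOW_HOME:-$HOME/airflow}"
mkdir -p "$AIRFLOW_HOME/config"
touch "$AIRFLOW_HOME/config/__init__.py"
# The copied log template would live alongside it, e.g.:
# cp /path/to/airflow_local_settings.py "$AIRFLOW_HOME/config/logging_config.py"
```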
- Write your custom JsonFormatter to inject into your handler. I did mine off of this one.
- Write the custom log handler classes. Since I was looking for JSON logging, mine look like this:
from logging.handlers import RotatingFileHandler  # stdlib base for the rotating handler below

from airflow.utils.log.file_processor_handler import FileProcessorHandler
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import RedirectStdHandler
from pythonjsonlogger import jsonlogger

# CustomJsonFormatter is the formatter written in the previous step.

class JsonStreamHandler(RedirectStdHandler):
    def __init__(self, stream):
        super(JsonStreamHandler, self).__init__(stream)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)

class JsonFileTaskHandler(FileTaskHandler):
    def __init__(self, base_log_folder, filename_template):
        super(JsonFileTaskHandler, self).__init__(base_log_folder, filename_template)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)

class JsonFileProcessorHandler(FileProcessorHandler):
    def __init__(self, base_log_folder, filename_template):
        super(JsonFileProcessorHandler, self).__init__(base_log_folder, filename_template)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)

class JsonRotatingFileHandler(RotatingFileHandler):
    def __init__(self, filename, mode, maxBytes, backupCount):
        super(JsonRotatingFileHandler, self).__init__(filename, mode, maxBytes, backupCount)
        json_formatter = CustomJsonFormatter('(timestamp) (level) (name) (message)')
        self.setFormatter(json_formatter)
- Connect them to the logging configs in your custom logging_config.py file:
'handlers': {
    'console': {
        'class': 'logging_handler.JsonStreamHandler',
        'stream': 'sys.stdout'
    },
    'task': {
        'class': 'logging_handler.JsonFileTaskHandler',
        'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
        'filename_template': FILENAME_TEMPLATE,
    },
    'processor': {
        'class': 'logging_handler.JsonFileProcessorHandler',
        'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
        'filename_template': PROCESSOR_FILENAME_TEMPLATE,
    }
}
...
and
DEFAULT_DAG_PARSING_LOGGING_CONFIG = {
    'handlers': {
        'processor_manager': {
            'class': 'logging_handler.JsonRotatingFileHandler',
            'formatter': 'airflow',
            'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION,
            'mode': 'a',
            'maxBytes': 104857600,  # 100MB
            'backupCount': 5
        }
    }
...
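To check that a config shaped like the above loads cleanly, a minimal dictConfig sketch can be exercised with stdlib classes standing in for the logging_handler.* classes, so it runs without airflow installed; the formatter here fakes JSON with a format string and is only good enough for this smoke test:

```python
import io
import json
import logging
import logging.config

# Cut-down config mirroring the structure above, with stdlib stand-ins
# for the custom handler classes.
LOGGING_CONFIG = {
    "version": 1,
    "disable_existing_loggers": False,
    "formatters": {
        "json": {
            "format": '{"level": "%(levelname)s", "name": "%(name)s", "message": "%(message)s"}',
        },
    },
    "handlers": {
        "console": {
            "class": "logging.StreamHandler",
            "formatter": "json",
            "stream": "ext://sys.stdout",
        },
    },
    "loggers": {
        "airflow": {"handlers": ["console"], "level": "INFO", "propagate": False},
    },
}

# Raises ValueError if the schema or any class path is wrong.
logging.config.dictConfig(LOGGING_CONFIG)
logging.getLogger("airflow").info("dictConfig loaded")  # emits a JSON line on stdout
```

dictConfig validates the whole dict up front, which makes it a cheap way to catch typos in the 'class' paths before restarting airflow.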
And JSON logs should be output, both in the DAG logs and in the console output.
Hope this helps!