Python - Using StreamHandler in a multiprocessing environment

Problem Description

I have a CLI script which logs all its processes into a log file. One of the functions of the CLI is to upload a large file by splitting it into pieces and uploading them in parallel. On Linux the whole thing works like a charm, but on Windows I can't seem to stream the log entries of the child process (_upload_for_multipart) using a StreamHandler from the logging module. The logger.info statements in _upload_for_multipart are correctly written to the log file (my_logfile.txt), but they are not streamed to the terminal when the verbose argument of the CLI is chosen. All other statements (in other functions) are both streamed and logged. Any help? Below is a complete working example of the problem I face. Apart from filechunkio, you will not need any extra libraries to run it.

import argparse, glob, logging, math, os
from timeit import default_timer as timer
from filechunkio.filechunkio import FileChunkIO
from multiprocessing import cpu_count, Pool, freeze_support, current_process
from sys import exit, exc_info, argv, stdout, version_info, platform
from configparser import NoOptionError, NoSectionError  # ConfigParser on Python 2
from time import mktime, strptime

logger = None

def _upload_for_multipart(keyname, offset, multipart, part_num, bytes, parts):
    try:
        with FileChunkIO(keyname, 'r', offset=offset, bytes=bytes) as fp:
                try:
                    start = timer()
                    logger.info( 'Uploading part {0}/{1}'.format ( part_num, parts ) )
                    logger.info('Uploading in MP')
                    end = timer()
                except Exception as e:
                    logger.error('Some error occurred')
                    exit()
        logger.info( 'UPLOADED part {0}/{1} time = {2:0.1f}s Size: {3}'.format (part_num, parts, (end - start), bytes ) )
    except Exception as e:
        logger.error( 'FAILED uploading {0}: {1}'.format( keyname, e ) )
        exit(1)

def _upload_part(argFile, argBucket, **core_chunk):
    file_path = argFile
    bucket_name = argBucket
    file_name = os.path.basename( file_path )
    source_size = os.stat( file_path ).st_size

    chunk_size = max(int(math.sqrt(5242880) * math.sqrt(source_size)),
                         5242880)

    chunk_amount = int(math.ceil(source_size / float(chunk_size)))
    #mp = s3_bucket.initiate_multipart_upload( file_name )
    mp = ''
    logger.info('Initiate multipart upload')
    logger.info( 'File size of {0} is {1}. Parallel uploads will be used to speed up the process'\
            .format( file_name, source_size  ) )

    start_time = timer()
    pool = Pool(processes=1, initializer = init_log, initargs = ( logFile, ) )
    for i in range( chunk_amount ):
        offset = i * chunk_size
        remaining_bytes = source_size - offset
        bytes = min( [chunk_size, remaining_bytes] )
        part_num = i + 1
        start = timer()
        pool.apply_async( _upload_for_multipart, [file_name, offset, mp, part_num, bytes, chunk_amount] )
    pool.close()
    pool.join()
    end = timer()
    logger.info('Process complete')

def _get_logger( pdir, ldir, lname, level, fmt ):
    try:
        logs_dir = os.path.join( pdir, ldir )
        if not os.path.exists( logs_dir ):
            os.makedirs( logs_dir )
    except Exception as e:
        print ('{}'.format(e))
        exit(1)

    logging.basicConfig(
            filename=os.path.join(logs_dir, lname),
            level=level,
            format=fmt
    )
    return logging.getLogger( lname )

def init_log(logFile):
    global logger
    exec_file = os.path.abspath( argv[0] )
    exec_dir = os.path.dirname( exec_file )
    default_logger = dict( pdir=exec_dir, ldir='logs', lname='error.log', level='ERROR',
                           fmt='%(asctime)s %(levelname)s: %(message)s' )

    log_filename = logFile
    level = 'INFO'
    format = '%(asctime)s %(levelname)s: %(message)s'

    default_logger.update( fmt=format, level=level, lname = log_filename )
    if os.path.isabs( log_filename ):
        bdir, log_filename = os.path.split( log_filename )
        default_logger.update(pdir='', ldir = bdir, lname = log_filename )
    logger = _get_logger( **default_logger )

if __name__ == "__main__":

    freeze_support()

    parser = argparse.ArgumentParser( description="CLI." )
    group = parser.add_mutually_exclusive_group()
    group.add_argument( "-v", "--verbose", action='store_const', const=True, help="Output process messages to stdout \
                        channel")
    args = parser.parse_args()

    logFile = 'mylogfile.txt'
    init_log(logFile)

    bucket_name = 'some-bucket'

    if args.verbose:
        try:
            print_handler = logging.StreamHandler( stdout )
            print_handler.setLevel( logging.DEBUG )
            formatter = logging.Formatter( '%(asctime)s %(levelname)s: %(message)s' )
            print_handler.setFormatter( formatter )
            logger.addHandler( print_handler )
        except (NoOptionError, NoSectionError) as e:
            logger.exception( e )

    logger.info('Establishing Connection')
    _upload_part('large_testfile.log', bucket_name)

Answer

The StreamHandler isn't working in the children because you're only setting it up in the parent process. On Windows, multiprocessing starts each worker as a fresh (spawned) interpreter process rather than forking, so the workers don't inherit the handlers you add in the parent; on Linux, fork copies them, which is why it works there. You need to do all your logging setup inside init_log for it to take effect in the children:

# ... This stuff is the same...    

def _upload_part(argFile, argBucket, verbose, **core_chunk):  # Add verbose argument
    #... Same until you declare the Pool
    pool = Pool(processes=1, initializer=init_log, initargs=(logFile, verbose))  # Add verbose to initargs
   # All the same ...

def init_log(logFile, verbose):
    global logger
    exec_file = os.path.abspath( argv[0] )
    exec_dir = os.path.dirname( exec_file )
    default_logger = dict( pdir=exec_dir, ldir='logs', lname='error.log', level='ERROR',
                           fmt='%(asctime)s %(levelname)s: %(message)s' )

    log_filename = logFile
    level = 'INFO'
    format = '%(asctime)s %(levelname)s: %(message)s'

    default_logger.update( fmt=format, level=level, lname = log_filename )
    if os.path.isabs( log_filename ):
        bdir, log_filename = os.path.split( log_filename )
        default_logger.update(pdir='', ldir = bdir, lname = log_filename )
    logger = _get_logger( **default_logger )

    if verbose:  # Set up StreamHandler here
        try:
            print_handler = logging.StreamHandler(stdout)
            print_handler.setLevel(logging.DEBUG)
            formatter = logging.Formatter('%(asctime)s %(levelname)s: %(message)s')
            print_handler.setFormatter(formatter)
            logger.addHandler(print_handler)
        except (NoOptionError, NoSectionError) as e:
            logger.exception(e)

if __name__ == "__main__":

    freeze_support()

    parser = argparse.ArgumentParser( description="CLI." )
    group = parser.add_mutually_exclusive_group()
    group.add_argument( "-v", "--verbose", action='store_const', const=True, help="Output process messages to stdout \
                        channel")
    args = parser.parse_args()

    logFile = 'mylogfile.txt'
    init_log(logFile, args.verbose)  # pass verbose here too, so the parent process also gets the StreamHandler

    bucket_name = 'some-bucket'

    logger.info('Establishing Connection')
    _upload_part('large_testfile.log', bucket_name, args.verbose)  # Pass args.verbose
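
To see the mechanism in isolation, here is a minimal, self-contained sketch (not part of the original post; init_worker, work and worker.log are made-up names for illustration). It forces the spawn start method, which is the default on Windows, so it behaves the same on every platform: handlers added only in the parent never show up in the workers, while handlers created in the Pool initializer do.

import logging
import sys
from multiprocessing import freeze_support, get_context

def init_worker(logfile):
    # Runs once in every worker process. Under spawn nothing is inherited,
    # so both the FileHandler and the StreamHandler must be created here.
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    root.addHandler(logging.FileHandler(logfile))
    root.addHandler(logging.StreamHandler(sys.stdout))

def work(i):
    logging.getLogger().info('message from task %d', i)
    return i

if __name__ == '__main__':
    freeze_support()
    ctx = get_context('spawn')  # use the Windows start method everywhere
    with ctx.Pool(2, initializer=init_worker, initargs=('worker.log',)) as pool:
        pool.map(work, range(4))  # every message reaches stdout and worker.log

On Python 3 an alternative is to configure handlers only in the parent and pass records back from the workers through logging.handlers.QueueHandler and QueueListener, which also avoids several processes writing to the same log file at once; configuring handlers in the Pool initializer, as the answer above does, is the smaller change to the existing code.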
