Multiprocessing Logging - How to use loguru with joblib Parallel


Problem description

I have a bunch of Python scripts that run some data science models. They take quite a while, and the only way to speed things up is to use multiprocessing. To achieve this, I used the joblib library, and it works really well. Unfortunately, however, this messes up logging, and the console output is garbled too (expectedly so), since all processes dump their respective outputs simultaneously.

I am new to using the logging library and followed some other SO answers to try to get it to work. I am using 8 cores for processing. Using the answers on SO, I wrote out to log files and expected 8 new files on every iteration. However, it created 8 files on the first iteration and then only wrote/appended to those same 8 files on every subsequent loop. This was a little inconvenient, so I explored a little more and found loguru and logzero. While they both cover examples using multiprocessing, neither of them shows how to use it with joblib. Here is what I have so far:

run_models.py

import math
import multiprocessing
import time
from datetime import datetime
from loguru import logger

import pandas as pd
import psutil
from joblib import Parallel, delayed

import helper
import log
import prep_data
import stock_subscriber_data
import train_model


def get_pred(cust_df, stock_id, logger):

    logger.info('--------------------------------Stock loop {}-------------------------------'.format(stock_id))

    cust_stockid_df = stock_subscriber_data.get_stockid_data(cust_df, stock_id)
    weekly_timeseries, last_date, abn_df = prep_data.prep(cust_stockid_df, logger)  
    single_row_df = stock_subscriber_data.get_single_row(cust_df, stock_id)

    stock_subscriber_data.write_data(abn_df, 't1')
    test_y, prd = train_model.read_train_write(cust_df, stock_id, weekly_timeseries, last_date, logger)

    return True


def main():

    cust_df = stock_subscriber_data.get_data()
    cust_df = helper.clean_data(cust_df)
    stock_list = cust_df['intStockID'].unique()

    max_proc = max(math.ceil(((psutil.virtual_memory().total >> 30) - 100) / 50), 1)
    num_cores = min(multiprocessing.cpu_count(), max_proc)

    logger.add("test_loguru.log", format="{time} {level}: ({file}:{module} - {line}) >> {message}", level="INFO", enqueue=True)

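    # Note: the loguru logger object itself is passed as a task argument here,
    # so joblib has to pickle it along with every task it sends to a worker.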
    Parallel(n_jobs=num_cores)(delayed(get_pred)(cust_df, s, logger) for s in stock_list)


if __name__ == "__main__":
    main()

train_model.py

import math
from datetime import datetime
from itertools import product
from math import sqrt

import pandas as pd
from keras import backend
from keras.layers import Dense
from keras.layers import LSTM
from keras.models import Sequential
from numpy import array
from numpy import mean
from pandas import DataFrame
from pandas import concat
from sklearn.metrics import mean_squared_error

import helper
import stock_subscriber_data

# bunch of functions here that don't need logging...

# walk-forward validation for univariate data
def walk_forward_validation(logger, data, n_test, cfg):
    #... do stuff here ...
    #... and here ...
    logger.info('{0:.3f}'.format(error))
    return error, model


# score a model, return None on failure
def repeat_evaluate(logger, data, config, n_test, n_repeats=10):
    #... do stuff here ...
    #... and here ...
    logger.info('> Model{0} {1:.3f}'.format(key, result))
    return key, result, best_model



def read_train_write(data_df, stock_id, series, last_date, logger):
    #... do stuff here ...
    #... and here ...
    logger.info('done')

    #... do stuff here ...
    #... and here ...

    # bunch of logger.info() statements here... 
    #
    #
    #
    #

    #... do stuff here ...
    #... and here ...

    return test_y, prd

This works well when there is only one process at a time. However, when running in multiprocess mode I get the error _pickle.PicklingError: Could not pickle the task to send it to the workers. What am I doing wrong? How can I remedy this? I don't mind switching to something other than loguru or logzero, as long as I can create one file with coherent logs, or even n files, each containing the logs of one iteration of joblib.
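As a side note, the pickling failure does not seem to need joblib at all to show up. Here is a minimal sketch (my own illustration, not from the original scripts) that just tries to pickle the logger after attaching an enqueue=True sink:

import pickle

from loguru import logger

# Assumption: the enqueue=True sink is what makes the logger object unpicklable,
# since it carries an internal queue and a background writer thread.
logger.add("test_loguru.log", level="INFO", enqueue=True)

try:
    pickle.dumps(logger)
    print("logger pickled fine")
except Exception as exc:  # the exact exception type can vary between loguru versions
    print("logger could not be pickled:", exc)

Pickling every argument of a delayed() call is exactly what joblib does when it dispatches tasks, which is why passing the logger into get_pred() fails.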

Recommended answer

I got it to work by modifying my run_models.py. Now, I have one log file per loop. This creates a LOT of log files, but they're all relevant to each loop and not jumbled or anything. One step at a time, I guess. Here's what I did:

run_models.py

import math
import multiprocessing
import time
from datetime import datetime
from loguru import logger

import pandas as pd
import psutil
from joblib import Parallel, delayed

import helper
import log
import prep_data
import stock_subscriber_data
import train_model


def get_pred(cust_df, stock_id):

    log_file_name = "log_file_{}".format(stock_id)

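    # Each call attaches a per-stock file sink inside the worker; enqueue=True routes
    # records through loguru's internal queue so the sink writes them one at a time.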
    logger.add(log_file_name, format="{time} {level}: ({file}:{module} - {line}) >> {message}", level="INFO", enqueue=True)

    logger.info('--------------------------------Stock loop {}-------------------------------'.format(stock_id))

    cust_stockid_df = stock_subscriber_data.get_stockid_data(cust_df, stock_id)
    weekly_timeseries, last_date, abn_df = prep_data.prep(cust_stockid_df, logger)  
    single_row_df = stock_subscriber_data.get_single_row(cust_df, stock_id)

    stock_subscriber_data.write_data(abn_df, 't1')
    test_y, prd = train_model.read_train_write(cust_df, stock_id, weekly_timeseries, last_date, logger)

    return True


def main():

    cust_df = stock_subscriber_data.get_data()
    cust_df = helper.clean_data(cust_df)
    stock_list = cust_df['intStockID'].unique()

    max_proc = max(math.ceil(((psutil.virtual_memory().total >> 30) - 100) / 50), 1)
    num_cores = min(multiprocessing.cpu_count(), max_proc)

    Parallel(n_jobs=num_cores)(delayed(get_pred)(cust_df, s) for s in stock_list)


if __name__ == "__main__":
    main()
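
One caveat with this approach (my own note, not part of the original answer): joblib reuses its worker processes, so each call to logger.add() inside get_pred() leaves another sink attached in that worker, and log lines from later stocks handled by the same process can also end up in the files of earlier stocks. Keeping the handler id that logger.add() returns and removing it before returning keeps every file scoped to its own loop. A sketch of that refinement:

def get_pred(cust_df, stock_id):

    log_file_name = "log_file_{}".format(stock_id)

    # logger.add() returns a handler id, which can be used to detach this sink again
    handler_id = logger.add(log_file_name, format="{time} {level}: ({file}:{module} - {line}) >> {message}", level="INFO", enqueue=True)

    try:
        logger.info('--------------------------------Stock loop {}-------------------------------'.format(stock_id))
        # ... same body as above ...
        return True
    finally:
        # remove the per-stock sink so this reused worker process does not keep
        # writing later stocks' messages into this stock's file
        logger.remove(handler_id)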
