How can I take advantage of multiprocessing and multithreading in deep learning using Keras?


Problem description

I'd assume that most frameworks like keras/tensorflow/... automatically use all CPU cores, but in practice that doesn't seem to be the case. I could find only a few sources on how to use the full capacity of the CPU during the deep learning process. I found an article about the usage of

from multiprocessing import Pool 
import psutil
import ray 
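For context, here is a minimal sketch of how these libraries are typically combined — psutil to size the worker pool, ray to run training tasks in parallel. The train function below is a dummy stand-in for a real training run:

import psutil
import ray

# psutil can report physical cores (logical=False excludes hyperthreads)
num_cores = psutil.cpu_count(logical=False)

# start a local Ray runtime sized to the machine
ray.init(num_cpus=num_cores)

@ray.remote
def train(units):
    # stand-in for a real training run; build and fit a model here
    return {'units': units, 'score': 0.0}

# launch one task per hyperparameter setting; ray.get blocks for the results
futures = [train.remote(u) for u in (800, 960, 1100)]
print(ray.get(futures))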

On the other hand, based on this answer about using a Keras model in multiple processes, there is no trace of the above-mentioned libraries. Is there a more elegant way to take advantage of multiprocessing for Keras, given how popular it is for implementation?

  • For instance, how can I modify the following simple RNN implementation so that the learning process uses at least 50% of CPU capacity?

Should I use a 2nd model for multitasking, such as the LSTM I have commented out below? I mean, can we simultaneously run multiple models by using more of the CPU's capacity?

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from keras.layers.normalization import BatchNormalization
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from keras.layers import Dense
from keras.layers import Dropout
from keras.layers import Activation
from keras.layers import LSTM,SimpleRNN
from keras.models import Sequential
from keras.optimizers import Adam, RMSprop
from keras.callbacks import EarlyStopping, ReduceLROnPlateau

df = pd.read_csv(r"D:\Train.csv", header=None)  # raw string so the backslash isn't treated as an escape

index = [i for i in list(range(1440)) if i%3==2]

Y_train= df[index]
df = df.values

#making history by using look-back to prediction next
def create_dataset(dataset,data_train,look_back=1):
    dataX,dataY = [],[]
    print("Len:",len(dataset)-look_back-1)
    for i in range(len(dataset)-look_back-1):
        a = dataset[i:(i+look_back), :]
        dataX.append(a)
        dataY.append(data_train[i + look_back,  :])
    return np.array(dataX), np.array(dataY)

Y_train=np.array(Y_train)
df=np.array(df)

look_back = 10
trainX,trainY = create_dataset(df,Y_train, look_back=look_back)

#Split data into train & test
trainX, testX, trainY, testY = train_test_split(trainX,trainY, test_size=0.2 , shuffle=False)

#Shape of train and test data
print("train size: {}".format(trainX.shape))
print("train Label size: {}".format(trainY.shape))
print("test size: {}".format(testX.shape))
print("test Label size: {}".format(testY.shape))
#train size: (23, 10, 1440)
#train Label size: (23, 960)
#test size: (6, 10, 1440)
#test Label size: (6, 960)


model_RNN = Sequential()
model_RNN.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
model_RNN.add(Dense(960))
model_RNN.add(BatchNormalization())
model_RNN.add(Activation('tanh'))
# Compile model
model_RNN.compile(loss='mean_squared_error', optimizer='adam')
callbacks = [
    EarlyStopping(patience=10, verbose=1),
    ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1)]
# Fit the model
hist_RNN=model_RNN.fit(trainX, trainY, epochs =50, batch_size =20,validation_data=(testX,testY),verbose=1, callbacks=callbacks)


#predict

Y_train=np.array(trainY)
Y_test=np.array(testY)

Y_RNN_Train_pred=model_RNN.predict(trainX)
Y_RNN_Test_pred=model_RNN.predict(testX)

train_MSE=mean_squared_error(trainY, Y_RNN_Train_pred)
test_MSE=mean_squared_error(testY, Y_RNN_Test_pred)

# create and fit the Simple LSTM model as 2nd model for multi-tasking

#model_LSTM = Sequential()
#model_LSTM.add(LSTM(units = 1440, input_shape=(trainX.shape[1], trainX.shape[2])))
#model_LSTM.add(Dense(units = 960))
#model_LSTM.add(BatchNormalization())
#model_LSTM.add(Activation('tanh'))
#model_LSTM.compile(loss='mean_squared_error', optimizer='adam')
#hist_LSTM=model_LSTM.fit(trainX, trainY, epochs =50, batch_size =20,validation_data=(testX,testY),verbose=1, callbacks=callbacks)

#Y_train=np.array(trainY)
#Y_test=np.array(testX)

#Y_LSTM_Train_pred=model_LSTM.predict(trainX)
#Y_LSTM_Test_pred=model_LSTM.predict(testX)

#train_MSE=mean_squared_error(trainY, Y_LSTM_Train_pred)
#test_MSE=mean_squared_error(testY, Y_LSTM_Test_pred)

#plot losses for RNN + LSTM (the LSTM subplot needs the commented block above to be enabled)
f, ax = plt.subplots(figsize=(20, 15))
plt.subplot(1, 2, 1)
plt.plot(hist_RNN.history['loss'], label='Train loss')
plt.plot(hist_RNN.history['val_loss'], label='Test/Validation/Prediction loss')
plt.xlabel('Training steps (Epochs = 50)')
plt.ylabel('Loss (MSE) for Sx-Sy & Sxy')
plt.title('RNN Loss on Train and Test data')
plt.legend()
plt.subplot(1, 2, 2)
plt.plot(hist_LSTM.history['loss'], label='Train loss')
plt.plot(hist_LSTM.history['val_loss'], label='Test/Validation/Prediction loss')
plt.xlabel('Training steps (Epochs = 50)')
plt.ylabel('Loss (MSE) for Sx-Sy & Sxy')
plt.title('LSTM Loss on Train and Test data')
plt.legend()

plt.subplots_adjust(top=0.80, bottom=0.38, left=0.12, right=0.90, hspace=0.37, wspace=0.28)
#plt.savefig('All_Losses_history_.png')
plt.show()

Note: I don't have access to CUDA; I just have a powerful server without a graphics card. My aim is to take advantage of multiprocessing and multithreading to use the maximum capacity of the CPU instead of 30%, which means just one core while I have a quad-core! Any advice would be greatly appreciated. I have uploaded a formatted csv dataset.

Update: my HW configuration is the following:

  • CPU: AMD A8-7650K Radeon R7, 10 Compute Cores 4C+6G, 3.30 GHz
  • RAM: 16 GB
  • OS: Win 7
  • Python version 3.6.6
  • Tensorflow version 1.8.0
  • Keras version 2.2.4

Answer

It's a good thing that training one model doesn't use 100% of your CPU! Now we have space to train multiple models in parallel and speed up your overall training times.

NB: If you just want to speed up this model, look into GPUs or changing hyperparameters like batch size and number of neurons (layer size).
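As an aside, if the goal is also to make a single training run use more cores, the TF 1.x thread pools can be sized explicitly. Below is a minimal sketch, assuming the TensorFlow backend; the thread counts are illustrative, and recurrent layers process time steps sequentially, so don't expect perfect scaling:

import tensorflow as tf
from keras import backend as K

# size the TF thread pools before building any model
config = tf.ConfigProto(
    intra_op_parallelism_threads=4,  # threads used inside a single op (e.g. a matmul)
    inter_op_parallelism_threads=2,  # threads used to run independent ops concurrently
)
K.set_session(tf.Session(config=config))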

Here's how you can use multiprocessing to train multiple models at the same time (using processes running in parallel on each separate CPU core of your machine).

multiprocessing.Pool basically creates a pool of jobs that need doing. The processes will pick up these jobs and run them. When a job is finished, the process will pick up another job from the pool.

import time
import signal
import multiprocessing

def init_worker():
    ''' Add KeyboardInterrupt exception to multiprocessing workers '''
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def train_model(layer_size):
    '''
    This code is parallelised and runs on each process
    It trains a model with different layer sizes (hyperparameters)
    It saves the model and returns the score (error)
    '''
    import keras
    from keras.models import Sequential
    from keras.layers import Dense

    print(f'Training a model with layer size {layer_size}')

    # build your model here
    model_RNN = Sequential()
    model_RNN.add(Dense(layer_size))

    # fit the model (the bit that takes time!)
    model_RNN.fit(...)

    # lets demonstrate with a sleep timer
    time.sleep(5)

    # save trained model to a file
    model_RNN.save(...)

    # you can also return values eg. the eval score
    return model_RNN.evaluate(...)


num_workers = 4
hyperparams = [800, 960, 1100]

pool = multiprocessing.Pool(num_workers, init_worker)

scores = pool.map(train_model, hyperparams)

print(scores)

Output:

Training a model with layer size 800
Training a model with layer size 960
Training a model with layer size 1100
[{'size':960,'score':1.0}, {'size':800,'score':1.2}, {'size':1100,'score':0.7}]

This is easily demonstrated with a time.sleep in the code. You'll see that all 3 processes start the training job, and then they all finish at about the same time. If this were single-processed, you'd have to wait for each to finish before starting the next (yawn!).
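One detail worth spelling out: init_worker makes each worker ignore SIGINT, so a Ctrl-C is delivered only to the parent process, which can then shut the pool down cleanly. A sketch of the usual pattern around pool.map, reusing the names from the example above:

pool = multiprocessing.Pool(num_workers, init_worker)
try:
    scores = pool.map(train_model, hyperparams)
except KeyboardInterrupt:
    # workers ignore SIGINT (see init_worker), so Ctrl-C lands here;
    # terminate() stops the workers immediately
    pool.terminate()
    pool.join()
    raise
else:
    # close() means no more jobs will be submitted; join() waits for the workers to exit
    pool.close()
    pool.join()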

EDIT: OP also wanted full code. This is difficult on Stack Overflow because I can't test in your environment and with your code. I've taken the liberty of copying and pasting your code into my template above. You may need to add some imports, but this is as close as you'll get to "runnable" and "full" code.

import time
import signal
import numpy as np
import pandas as pd
import multiprocessing
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score


def init_worker():
    ''' Add KeyboardInterrupt exception to multiprocessing workers '''
    signal.signal(signal.SIGINT, signal.SIG_IGN)


def train_model(model_type):
    '''
    This code is parallelised and runs on each process
    It trains a model with different layer sizes (hyperparameters)
    It saves the model and returns the score (error)
    '''
    from keras.layers import LSTM, SimpleRNN, Dense, Activation
    from keras.models import Sequential
    from keras.callbacks import EarlyStopping, ReduceLROnPlateau
    from keras.layers.normalization import BatchNormalization

    print(f'Training a model: {model_type}')

    callbacks = [
        EarlyStopping(patience=10, verbose=1),
        ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1),
    ]

    model = Sequential()

    if model_type == 'rnn':
        model.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
    elif model_type == 'lstm':
        model.add(LSTM(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))

    model.add(Dense(480))
    model.add(BatchNormalization())
    model.add(Activation('tanh'))
    model.compile(loss='mean_squared_error', optimizer='adam')
    model.fit(
        trainX,
        trainY,
        epochs=50,
        batch_size=20,
        validation_data=(testX, testY),
        verbose=1,
        callbacks=callbacks,
    )

    # predict
    Y_Train_pred = model.predict(trainX)
    Y_Test_pred = model.predict(testX)

    train_MSE = mean_squared_error(trainY, Y_Train_pred)
    test_MSE = mean_squared_error(testY, Y_Test_pred)

    # you can also return values eg. the eval score
    return {'type': model_type, 'train_MSE': train_MSE, 'test_MSE': test_MSE}


# Your code
# ---------

df = pd.read_csv(r"D:\Train.csv", header=None)  # raw string so the backslash isn't treated as an escape

index = [i for i in list(range(1440)) if i % 3 == 2]

Y_train = df[index]
df = df.values

# making history by using look-back to prediction next
def create_dataset(dataset, data_train, look_back=1):
    dataX, dataY = [], []
    print("Len:", len(dataset) - look_back - 1)
    for i in range(len(dataset) - look_back - 1):
        a = dataset[i : (i + look_back), :]
        dataX.append(a)
        dataY.append(data_train[i + look_back, :])
    return np.array(dataX), np.array(dataY)


Y_train = np.array(Y_train)
df = np.array(df)

look_back = 10
trainX, trainY = create_dataset(df, Y_train, look_back=look_back)

# Split data into train & test
trainX, testX, trainY, testY = train_test_split(
    trainX, trainY, test_size=0.2, shuffle=False
)

# My Code
# -------

num_workers = 2
model_types = ['rnn', 'lstm']

# on Windows, multiprocessing spawns fresh interpreters, so the pool
# must be created under the __main__ guard
if __name__ == '__main__':
    pool = multiprocessing.Pool(num_workers, init_worker)

    scores = pool.map(train_model, model_types)

    print(scores)

Program output:

[{'type': 'rnn', 'train_MSE': 0.06648435491248038, 'test_MSE': 0.062323388902691866}, 
 {'type': 'lstm', 'train_MSE': 0.10114341514420684, 'test_MSE': 0.09998065769499974}]

