如何使用 Keras 在深度学习中利用多处理和多线程? [英] How can take advantage of multiprocessing and multithreading in Deep learning using Keras?
问题描述
我认为像 keras/tensorflow/... 这样的大多数框架会自动使用所有 CPU 内核,但实际上似乎并非如此.我只能找到很少的来源可以让我们在深度学习过程中使用 CPU 的全部容量.我发现了一篇文章,它是关于
I'd assume that most frameworks like keras/tensorflow/... automatically use all CPU cores but in practice it seems they are not. I just could find few sources which can lead us to use whole capacity of CPU during Deep learning process. I found an article which is written about usage of
from multiprocessing import Pool
import psutil
import ray
另一方面,基于这个 answer 在多个进程中使用 keras 模型,没有上面的跟踪 -提到的图书馆.是否有更优雅的方式来利用 Keras 的 多处理,因为它在实现中非常受欢迎.
in another hand, based on this answer for using a keras model in multiple processes there is no track of above-mentioned libraries. Is there the more elegant way to take advantage of Multiprocessing for Keras since it's very popular for implementation.
例如,在学习过程中,如何修改以下简单的 RNN 实现以实现至少 50% 的 CPU 容量?
For instance , how can modify following simple RNN implementation to achieve at least 50% capacity of CPU during learning process?
我应该像 LSTM 一样使用第二个模型作为多任务处理吗?我的意思是我们可以通过使用更多的 CPU 容量来同时管理多个模型吗?
Should I use 2nd model as multitasking like LSTM which I comment bellow? I mean can we simultaneously manage to run multi-models by using more capacity of CPU?
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from keras.layers.normalization import BatchNormalization
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from keras.layers import Dense
from keras.layers import Dropout
from keras.layers import LSTM,SimpleRNN
from keras.models import Sequential
from keras.optimizers import Adam, RMSprop
df = pd.read_csv("D:Train.csv", header=None)
index = [i for i in list(range(1440)) if i%3==2]
Y_train= df[index]
df = df.values
#making history by using look-back to prediction next
def create_dataset(dataset,data_train,look_back=1):
dataX,dataY = [],[]
print("Len:",len(dataset)-look_back-1)
for i in range(len(dataset)-look_back-1):
a = dataset[i:(i+look_back), :]
dataX.append(a)
dataY.append(data_train[i + look_back, :])
return np.array(dataX), np.array(dataY)
Y_train=np.array(Y_train)
df=np.array(df)
look_back = 10
trainX,trainY = create_dataset(df,Y_train, look_back=look_back)
#Split data into train & test
trainX, testX, trainY, testY = train_test_split(trainX,trainY, test_size=0.2 , shuffle=False)
#Shape of train and test data
trainX, testX, trainY, testY = train_test_split(trainX,trainY, test_size=0.2 , shuffle=False)
print("train size: {}".format(trainX.shape))
print("train Label size: {}".format(trainY.shape))
print("test size: {}".format(testX.shape))
print("test Label size: {}".format(testY.shape))
#train size: (23, 10, 1440)
#train Label size: (23, 960)
#test size: (6, 10, 1440)
#test Label size: (6, 960)
model_RNN = Sequential()
model_RNN.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
model_RNN.add(Dense(960))
model_RNN.add(BatchNormalization())
model_RNN.add(Activation('tanh'))
# Compile model
model_RNN.compile(loss='mean_squared_error', optimizer='adam')
callbacks = [
EarlyStopping(patience=10, verbose=1),
ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1)]
# Fit the model
hist_RNN=model_RNN.fit(trainX, trainY, epochs =50, batch_size =20,validation_data=(testX,testY),verbose=1, callbacks=callbacks)
#predict
Y_train=np.array(trainY)
Y_test=np.array(testX)
Y_RNN_Train_pred=model_RNN.predict(trainX)
Y_RNN_Test_pred=model_RNN.predict(testX)
train_MSE=mean_squared_error(trainY, Y_RNN_Train_pred)
test_MSE=mean_squared_error(testY, Y_RNN_Test_pred)
# create and fit the Simple LSTM model as 2nd model for multi-tasking
#model_LSTM = Sequential()
#model_LSTM.add(LSTM(units = 1440, input_shape=(trainX.shape[1], trainX.shape[2])))
#model_LSTM.add(Dense(units = 960))
#model_LSTM.add(BatchNormalization())
#model_LSTM.add(Activation('tanh'))
#model_LSTM.compile(loss='mean_squared_error', optimizer='adam')
#hist_LSTM=model_LSTM.fit(trainX, trainY, epochs =50, batch_size =20,validation_data=(testX,testY),verbose=1, callbacks=callbacks)
#Y_train=np.array(trainY)
#Y_test=np.array(testX)
#Y_LSTM_Train_pred=model_LSTM.predict(trainX)
#Y_LSTM_Test_pred=model_LSTM.predict(testX)
#train_MSE=mean_squared_error(trainY, Y_LSTM_Train_pred)
#test_MSE=mean_squared_error(testY, Y_LSTM_Test_pred)
#plot losses for RNN + LSTM
f, ax = plt.subplots(figsize=(20, 15))
plt.subplot(1, 2, 1)
ax=plt.plot(hist_RNN.history['loss'] ,label='Train loss')
ax=plt.plot(hist_RNN.history['val_loss'],label='Test/Validation/Prediction loss')
plt.xlabel('Training steps (Epochs = 50)')
plt.ylabel('Loss (MSE) for Sx-Sy & Sxy')
plt.title(' RNN Loss on Train and Test data')
plt.legend()
plt.subplot(1, 2, 2)
ax=plt.plot(hist_LSTM.history['loss'] ,label='Train loss')
ax=plt.plot(hist_LSTM.history['val_loss'],label='Test/Validation/Prediction loss')
plt.xlabel('Training steps (Epochs = 50)')
plt.ylabel('Loss (MSE) for Sx-Sy & Sxy')
plt.title('LSTM Loss on Train and Test data')
plt.legend()
plt.subplots_adjust(top=0.80, bottom=0.38, left=0.12, right=0.90, hspace=0.37, wspace=0.28)
#plt.savefig('All_Losses_history_.png')
plt.show()
注意我无法访问CUDA,只是访问没有VGA的强大服务器.我的目标是利用多处理和多线程来使用 CPU 的最大容量,而不是 30%,这意味着只有一个内核,而我有四核!任何建议将不胜感激.我上传了一个格式化的 csv 数据集.
Note I don't access to CUDA just I access powerful server without VGA. My aim is to take advantage of multiprocessing and multithreading for use maximum capacity of CPU instead of 30% it means just one core while I have Quad-core! Any advice would be greatly appreciated. I have uploaded a formatted csv dataset.
更新:我的硬件配置如下:
- CPU:AMD A8-7650K Radeon R7 10 计算核心 4C+6G 3.30 GHz
- 内存:16GB
- 操作系统:Win 7
- Python 3.6.6 版
- TensorFlow 1.8.0 版
- Keras 2.2.4 版
推荐答案
训练一个模型并没有 100% 使用 CPU 的所有情况是一件好事!现在我们有空间并行训练多个模型并加快您的整体训练时间.
It's a good thing that training one model doesn't use all 100% of your CPU! Now we have space to train multiple models in parallel and speed up your overall training times.
注意:如果您只想加速此模型,请查看 GPU 或更改超参数,例如批量大小和神经元数量(层大小).
以下是如何使用 multiprocessing
同时训练多个模型(使用在机器的每个独立 CPU 内核上并行运行的进程).
Here's how you can use multiprocessing
to train multiple models at the same time (using processes running in parallel on each separate CPU core of your machine).
multiprocessing.Pool
基本上创建了一个需要做的工作池.这些进程将获取这些作业并运行它们.当一个作业完成时,该进程将从池中选择另一个作业.
The multiprocessing.Pool
basically creates a pool of jobs that need doing. The processes will pick up these jobs and run them. When a job is finished, the process will pick up another job from the pool.
import time
import signal
import multiprocessing
def init_worker():
''' Add KeyboardInterrupt exception to mutliprocessing workers '''
signal.signal(signal.SIGINT, signal.SIG_IGN)
def train_model(layer_size):
'''
This code is parallelised and runs on each process
It trains a model with different layer sizes (hyperparameters)
It saves the model and returns the score (error)
'''
import keras
from keras.models import Sequential
from keras.layers import Dense
print(f'Training a model with layer size {layer_size}')
# build your model here
model_RNN = Sequential()
model_RNN.add(Dense(layer_size))
# fit the model (the bit that takes time!)
model_RNN.fit(...)
# lets demonstrate with a sleep timer
time.sleep(5)
# save trained model to a file
model_RNN.save(...)
# you can also return values eg. the eval score
return model_RNN.evaluate(...)
num_workers = 4
hyperparams = [800, 960, 1100]
pool = multiprocessing.Pool(num_workers, init_worker)
scores = pool.map(train_model, hyperparams)
print(scores)
输出:
Training a model with layer size 800
Training a model with layer size 960
Training a model with layer size 1100
[{'size':960,'score':1.0}, {'size':800,'score':1.2}, {'size':1100,'score':0.7}]
这可以通过代码中的 time.sleep
轻松演示.您将看到所有 3 个进程都开始训练作业,然后它们几乎同时完成.如果这是单次处理,则您必须等待每个完成后才能开始下一个(哈欠!).
This is easily demonstrated with a time.sleep
in the code. You'll see that all 3 processes start the training job, and then they all finish at about the same time. If this was single processed, you'd have to wait for each to finish before starting the next (yawn!).
编辑OP 还想要完整的代码.这在 Stack Overflow 上很困难,因为我无法在您的环境和代码中进行测试.我冒昧地将您的代码复制并粘贴到我上面的模板中.您可能需要添加一些导入,但这与可运行"和完整"代码一样接近.
EDIT OP also wanted full code. This is difficult on Stack Overflow because I can't test in your environment and with your code. I've taken the liberty of copying and pasting your code into my template above. You may need to add some imports but this is as close as you'll get to "runnable" and "full" code.
import time
import signal
import numpy as np
import pandas as pd
import multiprocessing
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.metrics import accuracy_score
def init_worker():
''' Add KeyboardInterrupt exception to mutliprocessing workers '''
signal.signal(signal.SIGINT, signal.SIG_IGN)
def train_model(model_type):
'''
This code is parallelised and runs on each process
It trains a model with different layer sizes (hyperparameters)
It saves the model and returns the score (error)
'''
from keras.layers import LSTM, SimpleRNN, Dense, Activation
from keras.models import Sequential
from keras.callbacks import EarlyStopping, ReduceLROnPlateau
from keras.layers.normalization import BatchNormalization
print(f'Training a model: {model_type}')
callbacks = [
EarlyStopping(patience=10, verbose=1),
ReduceLROnPlateau(factor=0.1, patience=3, min_lr=0.00001, verbose=1),
]
model = Sequential()
if model_type == 'rnn':
model.add(SimpleRNN(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
elif model_type == 'lstm':
model.add(LSTM(units=1440, input_shape=(trainX.shape[1], trainX.shape[2])))
model.add(Dense(480))
model.add(BatchNormalization())
model.add(Activation('tanh'))
model.compile(loss='mean_squared_error', optimizer='adam')
model.fit(
trainX,
trainY,
epochs=50,
batch_size=20,
validation_data=(testX, testY),
verbose=1,
callbacks=callbacks,
)
# predict
Y_Train_pred = model.predict(trainX)
Y_Test_pred = model.predict(testX)
train_MSE = mean_squared_error(trainY, Y_Train_pred)
test_MSE = mean_squared_error(testY, Y_Test_pred)
# you can also return values eg. the eval score
return {'type': model_type, 'train_MSE': train_MSE, 'test_MSE': test_MSE}
# Your code
# ---------
df = pd.read_csv("D:Train.csv", header=None)
index = [i for i in list(range(1440)) if i % 3 == 2]
Y_train = df[index]
df = df.values
# making history by using look-back to prediction next
def create_dataset(dataset, data_train, look_back=1):
dataX, dataY = [], []
print("Len:", len(dataset) - look_back - 1)
for i in range(len(dataset) - look_back - 1):
a = dataset[i : (i + look_back), :]
dataX.append(a)
dataY.append(data_train[i + look_back, :])
return np.array(dataX), np.array(dataY)
Y_train = np.array(Y_train)
df = np.array(df)
look_back = 10
trainX, trainY = create_dataset(df, Y_train, look_back=look_back)
# Split data into train & test
trainX, testX, trainY, testY = train_test_split(
trainX, trainY, test_size=0.2, shuffle=False
)
# My Code
# -------
num_workers = 2
model_types = ['rnn', 'lstm']
pool = multiprocessing.Pool(num_workers, init_worker)
scores = pool.map(train_model, model_types)
print(scores)
程序输出:
[{'type': 'rnn', 'train_MSE': 0.06648435491248038, 'test_MSE': 0.062323388902691866},
{'type': 'lstm', 'train_MSE': 0.10114341514420684, 'test_MSE': 0.09998065769499974}]
这篇关于如何使用 Keras 在深度学习中利用多处理和多线程?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!