How To Do Model Predict Using Distributed Dask With a Pre-Trained Keras Model?

Question

I am loading a pre-trained Keras model and trying to parallelize prediction over a large amount of input data with Dask. Unfortunately, I'm running into issues that seem related to how I create my Dask array. Any guidance would be greatly appreciated!

Setup:

First, I cloned this repo: https://github.com/sanchit2843/dlworkshop.git

Reproducible code example:

import numpy as np
import pandas as pd
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.pipeline import Pipeline, FeatureUnion
from sklearn.model_selection import train_test_split
from keras.models import load_model
import keras
from keras.models import Sequential
from keras.layers import Dense
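from keras import backend as K  # used as K.* inside contrastive_loss
from dask.distributed import Client
import dask.array as da  # referenced as `da` further down
import mlflow
import mlflow.keras
import warnings
warnings.filterwarnings('ignore')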

dataset = pd.read_csv('data/train.csv')
X = dataset.drop(['price_range'], axis=1).values
y = dataset[['price_range']].values

# scale data
sc = StandardScaler()
X = sc.fit_transform(X)
ohe = OneHotEncoder()
y = ohe.fit_transform(y).toarray()

X_train,X_test,y_train,y_test = train_test_split(X,y,test_size = 0.2)

# Neural network
model = Sequential()
model.add(Dense(16, input_dim=20, activation="relu"))
model.add(Dense(12, activation="relu"))
model.add(Dense(4, activation="softmax"))
model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
model.fit(X_train, y_train, epochs=100, batch_size=64)

# Use dask
client = Client()
def load_and_predict(input_data_chunk):

    def contrastive_loss(y_true, y_pred):
        margin = 1
        square_pred = K.square(y_pred)
        margin_square = K.square(K.maximum(margin - y_pred, 0))
        return K.mean(y_true * square_pred + (1 - y_true) * margin_square)

    mlflow.set_tracking_uri('<uri>')
    mlflow.set_experiment('clean_parties_ml')
    runs = mlflow.search_runs()
    artifact_uri = runs.loc[runs['start_time'].idxmax()]['artifact_uri']
    model = mlflow.keras.load_model(artifact_uri + '/model', custom_objects={'contrastive_loss': contrastive_loss})
    y_pred = model.predict(input_data_chunk)
    return y_pred

da_input_data = da.from_array(X_test, chunks=(100, None))
prediction_results = da_input_data.map_blocks(load_and_predict, dtype=X_test.dtype).compute()

The error I get:

AttributeError: '_thread._local' object has no attribute 'value'
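For context, this message is what Python raises when an attribute of a `threading.local` object is read in a thread that never set it. The standard-library sketch below only reproduces the message. The names `state` and `read_in_thread` are illustrative, and the idea that Keras/TensorFlow state initialised in one thread is being read from a Dask worker thread is an assumption, not something confirmed in this thread.

```python
import threading

# Thread-local storage: each thread sees its own independent set of attributes.
state = threading.local()
state.value = "set in the main thread"


def read_in_thread():
    """Read state.value from a fresh thread and report what happens."""
    result = {}

    def worker():
        try:
            result["out"] = state.value
        except AttributeError as exc:
            # The new thread never assigned state.value, so the lookup fails.
            result["out"] = f"AttributeError: {exc}"

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    return result["out"]


print(state.value)        # visible in the thread that set it
print(read_in_thread())   # not visible in a different thread
```

The main thread can still read `state.value`, while the second thread cannot, which is why state created at import or load time can go missing when work is moved onto other threads.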

Recommended answer

Keras/TensorFlow don't play nicely with other threaded systems such as Dask's worker threads. There is an ongoing issue on this topic here: https://github.com/dask/dask-examples/issues/35
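One workaround that is sometimes tried is to keep each model on a single thread: load the model inside the task, cache it per process, and run Dask without multi-threaded workers. The sketch below illustrates that pattern and is not a confirmed fix from the linked issue. `get_model`, `predict_chunk`, `predict_with_dask`, `load_model_fn` and `n_outputs` are hypothetical names, and `load_model_fn` stands in for whatever call loads your model (for example the `mlflow.keras.load_model(...)` call in the question).

```python
import numpy as np
import dask.array as da

# Per-process cache so each worker process loads the model at most once.
_MODEL_CACHE = {}


def get_model(load_model_fn):
    """Load the model on first use in this process, then reuse it."""
    if "model" not in _MODEL_CACHE:
        _MODEL_CACHE["model"] = load_model_fn()
    return _MODEL_CACHE["model"]


def predict_chunk(chunk, load_model_fn=None):
    """Predict one NumPy block; the model is obtained inside the task."""
    model = get_model(load_model_fn)
    return np.asarray(model.predict(chunk))


def predict_with_dask(X, load_model_fn, n_outputs, chunk_rows=100, scheduler=None):
    """Split X by rows, predict each block, and gather a NumPy result.

    n_outputs is the number of columns the model returns (assumed known).
    scheduler=None uses whatever scheduler is active, e.g. a distributed Client.
    """
    # Keep all columns in one chunk so every block is a full set of features.
    darr = da.from_array(X, chunks=(chunk_rows, X.shape[1]))
    out = darr.map_blocks(
        predict_chunk,
        load_model_fn=load_model_fn,
        dtype=np.float32,
        # Output keeps the row chunking but has n_outputs columns.
        chunks=(darr.chunks[0], (n_outputs,)),
        # Passing meta stops Dask from calling predict_chunk on an empty array
        # while building the graph.
        meta=np.empty((0, 0), dtype=np.float32),
    )
    kwargs = {} if scheduler is None else {"scheduler": scheduler}
    return out.compute(**kwargs)
```

With a distributed cluster, this could be combined with single-threaded worker processes, for example `Client(processes=True, threads_per_worker=1)`, and called as `predict_with_dask(X_test, load_model_fn, n_outputs=4)`. Passing `scheduler="synchronous"` runs everything in the calling thread, which is useful for checking whether the error is thread-related at all.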
