How to use a scikit-learn model in a structured query?
Question
I'm trying to apply a scikit-learn model, loaded from a pickle, to every row of a Structured Streaming DataFrame.
I tried using pandas_udf (code version 1), which gives me this error:
AttributeError: 'numpy.ndarray' object has no attribute 'isnull'
Code:
inputPath = "/FileStore/df_training/streaming_df_1_nh_nd/"
from pyspark.sql import functions as f
from pyspark.sql.types import *
data_schema = data_spark_ts.schema
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, PandasUDFType  # User-defined functions for pandas DataFrames
from pyspark.sql.types import LongType
get_prediction = pandas_udf(lambda x: gb2.predict(x), IntegerType())
streamingInputDF = (
    spark
    .readStream
    .schema(data_schema)               # Set the schema of the CSV data
    .option("maxFilesPerTrigger", 1)   # Treat a sequence of files as a stream by picking one file at a time
    .csv(inputPath)
    .fillna(0)
    .withColumn("prediction", get_prediction(f.struct([col(x) for x in data_spark.columns])))
)
display(streamingInputDF.select("prediction"))
I also tried a normal udf instead of the pandas_udf, which gives me this error:
ValueError: Expected 2D array, got 1D array instead:
[.. ... .. ..]
Reshape your data either using array.reshape(-1, 1) if your data has a single feature or array.reshape(1, -1) if it contains a single sample.
I don't know how to reshape my data.
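For reference, the reshape that the error message asks for can be sketched as below. This is a minimal illustration, assuming a plain udf receives one row as a flat sequence of feature values; the four numbers in `row` are made up for the example and are not from the original data.

```python
import numpy as np

# Hypothetical single row of four feature values, as a plain UDF might receive it.
row = [5.1, 3.5, 1.4, 0.2]

# A flat sequence becomes a 1D array, which scikit-learn estimators reject.
flat = np.asarray(row)

# reshape(1, -1) turns it into one sample with len(row) features.
single_sample = flat.reshape(1, -1)

print(flat.shape)           # (4,)
print(single_sample.shape)  # (1, 4)
```

Inside a plain udf, the 2D array would then be passed to gb2.predict, and the first element of the result converted to a plain Python value (for example with int or str) before returning it, since Spark expects Python types rather than NumPy scalars.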
The model I'm trying to apply is loaded this way:
# Load the pickled model
import pickle
gb2 = None
with open('pickle_modello_unico.p', 'rb') as fp:
    gb2 = pickle.load(fp)
Its specification is:
GradientBoostingClassifier(criterion='friedman_mse', init=None,
learning_rate=0.1, loss='deviance', max_depth=3,
max_features=None, max_leaf_nodes=None,
min_impurity_decrease=0.0, min_impurity_split=None,
min_samples_leaf=1, min_samples_split=2,
min_weight_fraction_leaf=0.0, n_estimators=300,
n_iter_no_change=None, presort='auto', random_state=None,
subsample=1.0, tol=0.0001, validation_fraction=0.1,
verbose=0, warm_start=False)
Any help solving this?
Recommended answer
I solved the issue by returning a pd.Series from the pandas_udf.
This is the working code:
inputPath = "/FileStore/df_training/streaming_df_1_nh_nd/"
from pyspark.sql import functions as f
from pyspark.sql.types import *
data_schema = data_spark_ts.schema
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, PandasUDFType  # User-defined functions for pandas DataFrames
from pyspark.sql.types import LongType
# Wrap the NumPy array returned by predict in a pd.Series
get_prediction = pandas_udf(lambda x: pd.Series(gb2.predict(x)), StringType())
streamingInputDF = (
    spark
    .readStream
    .schema(data_schema)               # Set the schema of the CSV data
    .option("maxFilesPerTrigger", 1)   # Treat a sequence of files as a stream by picking one file at a time
    .csv(inputPath)
    .withColumn("prediction", get_prediction(f.struct([col(x) for x in data_spark.columns])))
)
display(streamingInputDF.select("prediction"))
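To see why the pd.Series wrapper matters, the sketch below reproduces the difference outside Spark. The AttributeError in the question appears to come from isnull being called on the UDF's return value, which a NumPy array does not provide but a pandas Series does. StubModel and predict_batch are hypothetical names introduced only for this illustration; StubModel stands in for gb2 so the example runs without the pickle, and the batch is assumed to arrive as a pandas DataFrame.

```python
import numpy as np
import pandas as pd


class StubModel:
    """Hypothetical stand-in for gb2; predict returns a NumPy array, as scikit-learn does."""

    def predict(self, X):
        # Label each row by whether its feature sum is positive.
        return (np.asarray(X).sum(axis=1) > 0).astype(int)


def predict_batch(model, batch):
    # Wrap the ndarray so the caller receives a pandas Series.
    return pd.Series(model.predict(batch))


# Made-up batch of two rows with two features each.
batch = pd.DataFrame({"a": [1.0, -2.0], "b": [0.5, 0.5]})
raw = StubModel().predict(batch)
wrapped = predict_batch(StubModel(), batch)

print(hasattr(raw, "isnull"))      # False
print(hasattr(wrapped, "isnull"))  # True
print(wrapped.tolist())            # [1, 0]
```

The wrapped result has one prediction per input row, which is the shape a scalar pandas_udf is expected to return.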