Spark Pipeline error


Problem description

I am trying to run a Multinomial Logistic Regression model

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('prepare_data').getOrCreate()

from pyspark.sql.types import *
spark.sql("DROP TABLE IF EXISTS customers")
spark.sql("""CREATE TABLE customers (
            Customer_ID DOUBLE,
            Name STRING,
            Gender STRING,
            Address STRING,
            Nationality DOUBLE,
            Account_Type STRING,
            Age DOUBLE,
            Education STRING,
            Employment STRING,
            Salary DOUBLE,
            Employer_Stability STRING,
            Customer_Loyalty DOUBLE,
            Balance DOUBLE,
            Residential_Status STRING,
            Service_Level STRING)""")
spark.sql("""LOAD DATA LOCAL INPATH '../datasets/dummyTrain.csv' INTO TABLE
            customers""")

dataset = spark.table("customers")
cols = dataset.columns
display(dataset)

from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler

categoricalColumns = ["Education", "Employment", "Employer_Stability", 
                      "Residential_Status"]
stages = [] 

for categoricalCol in categoricalColumns:
    # Index the string column, then one-hot encode the resulting index.
    stringIndexer = StringIndexer(inputCol=categoricalCol,
        outputCol=categoricalCol + "Index")
    encoder = OneHotEncoder(inputCol=categoricalCol + "Index",
        outputCol=categoricalCol + "classVec")
    stages += [stringIndexer, encoder]

label_stringIdx = StringIndexer(inputCol="Service_Level", outputCol="label")
stages += [label_stringIdx]

numericCols = ["Age", "Salary", "Customer_Loyalty", "Balance"]
# map() returns an iterator in Python 3 and cannot be added to a list,
# so build the list with a comprehension instead.
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(dataset)
dataset = pipelineModel.transform(dataset)
selectedcols = ["label", "features"] + cols
dataset = dataset.select(selectedcols)
display(dataset)

I get the following error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-31-07d2fb5cecc8> in <module>()
      4 # - fit() computes feature statistics as needed
      5 # - transform() actually transforms the features
----> 6 pipelineModel = pipeline.fit(dataset)
      7 dataset = pipelineModel.transform(dataset)
      8 

/srv/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
     62                 return self.copy(params)._fit(dataset)
     63             else:
---> 64                 return self._fit(dataset)
     65         else:
     66                 raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/srv/spark/python/pyspark/ml/pipeline.py in _fit(self, dataset)
    109                     transformers.append(model)
    110                     if i < indexOfLastEstimator:
--> 111                         dataset = model.transform(dataset)
    112             else:
    113                 transformers.append(stage)

/srv/spark/python/pyspark/ml/base.py in transform(self, dataset, params)
    103                 return self.copy(params)._transform(dataset)
    104             else:
--> 105                 return self._transform(dataset)
    106         else:
    107             raise ValueError("Params must be a param map but got %s." % type(params))

/srv/spark/python/pyspark/ml/wrapper.py in _transform(self, dataset)
    250     def _transform(self, dataset):
    251         self._transfer_params_to_java()
--> 252         return DataFrame(self._java_obj.transform(dataset._jdf), dataset.sql_ctx)
    253 
    254 

/srv/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/srv/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/srv/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling o798.transform.
: java.lang.NullPointerException at 

I have failed to figure out what I have done wrong; it seems the issue may be in the transform() method. Any help would be appreciated.

Recommended answer

You need to make sure there are no missing values in your data -- that's why you get the NullPointerException. Also, make sure that all your input features to the VectorAssembler are numeric.

BTW, when you create the encoder you might consider specifying the inputCol as StringIndexer.getOutputCol().
