UDF to check for non-zero vector not working after CountVectorizer through spark-submit


Problem Description

As per this question, I am applying a UDF to filter out empty vectors after CountVectorizer.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{CountVectorizer, MinHashLSH, RegexTokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.DataTypes

// Tokenize the pipe-delimited input string, then count-vectorize the tokens
val tokenizer = new RegexTokenizer().setPattern("\\|").setInputCol("dataString").setOutputCol("dataStringWords")
val vectorizer = new CountVectorizer().setInputCol("dataStringWords").setOutputCol("features")

val pipelineTV = new Pipeline().setStages(Array(tokenizer, vectorizer))
val modelTV = pipelineTV.fit(dataset1)

// Untyped UDF (closure plus explicit return DataType) that flags vectors with at least one non-zero entry
val isNoneZeroVector = udf({v: Vector => v.numNonzeros > 0}, DataTypes.BooleanType)

val dataset1_TV = modelTV.transform(dataset1).filter(isNoneZeroVector(col("features")))
val dataset2_TV = modelTV.transform(dataset2).filter(isNoneZeroVector(col("features")))

val lsh = new MinHashLSH().setNumHashTables(20).setInputCol("features").setOutputCol("hashValues")
val pipelineLSH = new Pipeline().setStages(Array(lsh))
val modelLSH = pipelineLSH.fit(dataset1_TV) // Fails at this line

This piece of code works perfectly through spark-shell. But when I run the same code through a driver application via spark-submit, it gives the following error:

java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1499) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1487) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1486) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) ~[scala-library-2.11.8.jar:?]
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) ~[scala-library-2.11.8.jar:?]
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1486) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:814) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at scala.Option.foreach(Option.scala:257) ~[scala-library-2.11.8.jar:?]
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:814) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1714) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1669) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1658) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:630) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2022) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2043) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2062) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:336) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2853) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2153) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.sql.Dataset.head(Dataset.scala:2160) ~[spark-sql_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.ml.feature.LSH.fit(LSH.scala:322) ~[spark-mllib_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.ml.feature.LSH.fit(LSH.scala:298) ~[spark-mllib_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:153) ~[spark-mllib_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.ml.Pipeline$$anonfun$fit$2.apply(Pipeline.scala:149) ~[spark-mllib_2.11-2.2.0.jar:2.2.0]
        at scala.collection.Iterator$class.foreach(Iterator.scala:893) ~[scala-library-2.11.8.jar:?]
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336) ~[scala-library-2.11.8.jar:?]
        at scala.collection.IterableViewLike$Transformed$class.foreach(IterableViewLike.scala:44) ~[scala-library-2.11.8.jar:?]
        at scala.collection.SeqViewLike$AbstractTransformed.foreach(SeqViewLike.scala:37) ~[scala-library-2.11.8.jar:?]
        at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:149) ~[spark-mllib_2.11-2.2.0.jar:2.2.0]
        at com.sheel.driver.job.XJob.ymethod(XJob.scala:170) ~[MyApplication-full.jar:1.0.0]
        at com.sheel.driver.job.XJob.zmethod(XJob.scala:135) [MyApplication-full.jar:1.0.0]
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD
        at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2233) ~[?:1.8.0_161]
        at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1405) ~[?:1.8.0_161]
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2284) ~[?:1.8.0_161]
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) ~[?:1.8.0_161]
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) ~[?:1.8.0_161]
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) ~[?:1.8.0_161]
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2278) ~[?:1.8.0_161]
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2202) ~[?:1.8.0_161]
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060) ~[?:1.8.0_161]
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567) ~[?:1.8.0_161]
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:427) ~[?:1.8.0_161]
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.scheduler.Task.run(Task.scala:108) ~[spark-core_2.11-2.2.0.jar:2.2.0]
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335) ~[spark-core_2.11-2.2.0.jar:2.2.0]

If I remove the isNoneZeroVector UDF from the process, then it works like a charm. But as per the MinHash criteria, the input vector must have at least one non-zero entry.

So is there a different way to write a UDF for Spark ML? I have written UDFs for Spark SQL before, and they work fine with the syntax above.

Recommended Answer

I had a similar error with the isNoneZeroVector udf as it stands in the question.

org.apache.spark.sql.AnalysisException: You're using untyped Scala UDF, which does not have the input type information. 

If you redefine it as follows, it will work (there may be a way to do this in one line, too):

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.udf

// Helper with an explicitly declared Vector input type
def isNotEmptyVector(vect: Vector): Boolean = {
  vect.numNonzeros > 0
}

// Typed UDF registration: udf[ReturnType, InputType](function)
val isNoneZeroVector = udf[Boolean, Vector](isNotEmptyVector)

df.withColumn("notEmpty", isNoneZeroVector($"features"))
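
For reference, the one-liner hinted at above would pass a typed lambda straight to udf. A minimal sketch, reusing modelTV, dataset1, and the features column from the question:

import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.{col, udf}

// Typed inline UDF: input and output types are inferred from the lambda's signature
val isNoneZeroVector = udf((v: Vector) => v.numNonzeros > 0)

val dataset1_TV = modelTV.transform(dataset1).filter(isNoneZeroVector(col("features")))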
