spark.ml StringIndexer 在 fit() 上抛出“Unseen label" [英] spark.ml StringIndexer throws 'Unseen label' on fit()

查看:78
本文介绍了spark.ml StringIndexer 在 fit() 上抛出“Unseen label"的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在准备一个玩具 spark.ml 示例.Spark 1.6.0 版,运行在 Oracle JDK 1.8.0_65 版、pyspark、ipython notebook 之上.

I'm preparing a toy spark.ml example. Spark version 1.6.0, running on top of Oracle JDK version 1.8.0_65, pyspark, ipython notebook.

首先,它几乎与 Spark、ML、StringIndexer:处理看不见的标签.将管道拟合到数据集时抛出异常,而不是转换它.在这里抑制异常可能不是解决方案,因为恐怕在这种情况下数据集会变得非常糟糕.

First, it hardly has anything to do with Spark, ML, StringIndexer: handling unseen labels. The exception is thrown while fitting a pipeline to a dataset, not transforming it. And suppressing the exception might not be a solution here, since, I'm afraid, the dataset gets messed pretty bad in this case.

我的数据集未压缩约为 800Mb,因此可能难以重现(较小的子集似乎可以避免此问题).

My dataset is about 800Mb uncompressed, so it might be hard to reproduce (smaller subsets seem to dodge this issue).

数据集如下所示:

+--------------------+-----------+-----+-------+-----+--------------------+
|                 url|         ip|   rs|   lang|label|                 txt|
+--------------------+-----------+-----+-------+-----+--------------------+
|http://3d-detmold...|217.160.215|378.0|     de|  0.0|homwillkommskip c...|
|   http://3davto.ru/| 188.225.16|891.0|     id|  1.0|оформить заказ пе...|
| http://404.szm.com/|  85.248.42| 58.0|     cs|  0.0|kliknite tu alebo...|
|  http://404.xls.hu/| 212.52.166|168.0|     hu|  0.0|honlapkészítés404...|
|http://a--m--a--t...|    66.6.43|462.0|     en|  0.0|back top archiv r...|
|http://a-wrf.ru/c...|  78.108.80|126.0|unknown|  1.0|                    |
|http://a-wrf.ru/s...|  78.108.80|214.0|     ru|  1.0|установк фаркопна...|
+--------------------+-----------+-----+-------+-----+--------------------+

被预测的值是label.整个管道应用于它:

The value being predicted is label. The whole pipeline applied to it:

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, Tokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression

train, test = munge(src_dataframe).randomSplit([70., 30.], seed=12345)

pipe_stages = [
    StringIndexer(inputCol='lang', outputCol='lang_idx'),
    OneHotEncoder(inputCol='lang_idx', outputCol='lang_onehot'),
    Tokenizer(inputCol='ip', outputCol='ip_tokens'),
    HashingTF(numFeatures=2**10, inputCol='ip_tokens', outputCol='ip_vector'),
    Tokenizer(inputCol='txt', outputCol='txt_tokens'),
    HashingTF(numFeatures=2**18, inputCol='txt_tokens', outputCol='txt_vector'),
    VectorAssembler(inputCols=['lang_onehot', 'ip_vector', 'txt_vector'], outputCol='features'),
    LogisticRegression(labelCol='label', featuresCol='features')
]

pipe = Pipeline(stages=pipe_stages)
pipemodel = pipe.fit(train)

这是堆栈跟踪:

Py4JJavaError: An error occurred while calling o10793.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in stage 627.0 failed 1 times, most recent failure: Lost task 18.0 in stage 627.0 (TID 23259, localhost): org.apache.spark.SparkException: Unseen label: pl-PL.
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:157)
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:153)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr2$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:282)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952)
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1113)
    at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:271)
    at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:159)
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:90)
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:71)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Unseen label: pl-PL.
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:157)
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:153)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr2$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:282)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more

最有趣的一行是:

org.apache.spark.SparkException: Unseen label: pl-PL.

不知道,来自 lang 列的值 pl-PL 怎么会在 label 列中混淆,这是一个 float,而不是 string 一些草率的结论,感谢@zero323

No idea, how pl-PL which is a value from lang column could have gotten mixed up in the label column, which is a float, not string edited: some hasty coclusions, corrected thanks to @zero323

我进一步研究发现,pl-PL 是来自数据集测试部分的值,而不是训练值.所以现在我什至不知道去哪里寻找罪魁祸首:它很可能是 randomSplit 代码,而不是 StringIndexer,谁知道还有什么.

I've looked further into it and found, that pl-PL is a value from the testing part of the dataset, not training. So now I don't even know where to look for the culprit: it might easily be the randomSplit code, not StringIndexer, and who knows what else.

我该如何调查?

推荐答案

好吧,我想我明白了.至少我有这个工作.

Okay I think I got this. At least I got this working.

缓存数据帧(包括训练/测试部分)解决了这个问题.这就是我在这个 JIRA 问题中发现的:https://issues.apache.org/jira/浏览/SPARK-12590.

Caching the dataframe(including train/test partes) solves the problem. That's what I found in this JIRA issue: https://issues.apache.org/jira/browse/SPARK-12590.

所以这不是错误,只是 randomSample 可能在相同但不同分区的数据集上产生不同的结果.显然,我的一些处理函数(或 Pipeline)涉及重新分区,因此,根据其定义重新计算训练集的结果可能会有所不同.

So it's not a bug, just the fact that randomSample might yield a different result on the same, but differently partitioned dataset. And apparently, some of my munging functions (or Pipeline) involve repartition, therefore, results of the trainset recomputation from its definition might diverge.

我仍然感兴趣的是 - 它是可重复性:它总是混合在数据集的错误部分中的pl-PL"行,即它不是随机重新分区.它是确定性的,只是不一致.我想知道它究竟是怎么发生的.

What still interests me - it's the reproducibility: it's always 'pl-PL' row that gets mixed in the wrong part of the dataset, i.e. it's not random repartition. It's deterministic, just inconsistent. I wonder how exactly it happens.

这篇关于spark.ml StringIndexer 在 fit() 上抛出“Unseen label"的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆