spark.ml StringIndexer在fit()上抛出“看不见的标签" [英] spark.ml StringIndexer throws 'Unseen label' on fit()

查看:101
本文介绍了spark.ml StringIndexer在fit()上抛出“看不见的标签"的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在准备玩具spark.ml的示例. Spark version 1.6.0,在Oracle JDK version 1.8.0_65,pyspark和ipython笔记本上运行.

I'm preparing a toy spark.ml example. Spark version 1.6.0, running on top of Oracle JDK version 1.8.0_65, pyspark, ipython notebook.

首先,它与 Spark,ML,StringIndexer几乎没有关系:处理看不见的东西标签.在将管道拟合到数据集而不进行转换时引发异常.而且抑制异常可能不是解决方案,因为在这种情况下,恐怕数据集会变得很糟.

First, it hardly has anything to do with Spark, ML, StringIndexer: handling unseen labels. The exception is thrown while fitting a pipeline to a dataset, not transforming it. And suppressing the exception might not be a solution here, since, I'm afraid, the dataset gets messed pretty bad in this case.

我的数据集未压缩约为800Mb,因此可能难以复制(较小的子集似乎可以避免此问题).

My dataset is about 800Mb uncompressed, so it might be hard to reproduce (smaller subsets seem to dodge this issue).

数据集如下:

+--------------------+-----------+-----+-------+-----+--------------------+
|                 url|         ip|   rs|   lang|label|                 txt|
+--------------------+-----------+-----+-------+-----+--------------------+
|http://3d-detmold...|217.160.215|378.0|     de|  0.0|homwillkommskip c...|
|   http://3davto.ru/| 188.225.16|891.0|     id|  1.0|оформить заказ пе...|
| http://404.szm.com/|  85.248.42| 58.0|     cs|  0.0|kliknite tu alebo...|
|  http://404.xls.hu/| 212.52.166|168.0|     hu|  0.0|honlapkészítés404...|
|http://a--m--a--t...|    66.6.43|462.0|     en|  0.0|back top archiv r...|
|http://a-wrf.ru/c...|  78.108.80|126.0|unknown|  1.0|                    |
|http://a-wrf.ru/s...|  78.108.80|214.0|     ru|  1.0|установк фаркопна...|
+--------------------+-----------+-----+-------+-----+--------------------+

要预测的值是label.整个管道都适用于它:

The value being predicted is label. The whole pipeline applied to it:

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoder, Tokenizer, HashingTF
from pyspark.ml.classification import LogisticRegression

train, test = munge(src_dataframe).randomSplit([70., 30.], seed=12345)

pipe_stages = [
    StringIndexer(inputCol='lang', outputCol='lang_idx'),
    OneHotEncoder(inputCol='lang_idx', outputCol='lang_onehot'),
    Tokenizer(inputCol='ip', outputCol='ip_tokens'),
    HashingTF(numFeatures=2**10, inputCol='ip_tokens', outputCol='ip_vector'),
    Tokenizer(inputCol='txt', outputCol='txt_tokens'),
    HashingTF(numFeatures=2**18, inputCol='txt_tokens', outputCol='txt_vector'),
    VectorAssembler(inputCols=['lang_onehot', 'ip_vector', 'txt_vector'], outputCol='features'),
    LogisticRegression(labelCol='label', featuresCol='features')
]

pipe = Pipeline(stages=pipe_stages)
pipemodel = pipe.fit(train)

这是堆栈跟踪:

Py4JJavaError: An error occurred while calling o10793.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in stage 627.0 failed 1 times, most recent failure: Lost task 18.0 in stage 627.0 (TID 23259, localhost): org.apache.spark.SparkException: Unseen label: pl-PL.
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:157)
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:153)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr2$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:282)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952)
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1136)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1113)
    at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:271)
    at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:159)
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:90)
    at org.apache.spark.ml.Predictor.fit(Predictor.scala:71)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:209)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Unseen label: pl-PL.
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:157)
    at org.apache.spark.ml.feature.StringIndexerModel$$anonfun$4.apply(StringIndexer.scala:153)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.evalExpr2$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:282)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:268)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more

最有趣的一行是:

org.apache.spark.SparkException: Unseen label: pl-PL.

不知道,如何将pl-PL(来自lang列的值)混入label列(是float而不是string )中:一些草率的结论,通过@ zero323

No idea, how pl-PL which is a value from lang column could have gotten mixed up in the label column, which is a float, not string edited: some hasty coclusions, corrected thanks to @zero323

我进一步研究发现,pl-PL是数据集测试部分的值,而不是训练的值.因此,现在我什至不知道在哪里寻找罪魁祸首:可能很容易是randomSplit代码,而不是StringIndexer,还有谁知道呢.

I've looked further into it and found, that pl-PL is a value from the testing part of the dataset, not training. So now I don't even know where to look for the culprit: it might easily be the randomSplit code, not StringIndexer, and who knows what else.

我该如何调查?

推荐答案

好的,我想我明白了.至少我能做到这一点.

Okay I think I got this. At least I got this working.

缓存数据框(包括训练/测试部分)可以解决此问题.这就是我在JIRA问题中发现的: https://issues.apache.org/jira/浏览/SPARK-12590 .

Caching the dataframe(including train/test partes) solves the problem. That's what I found in this JIRA issue: https://issues.apache.org/jira/browse/SPARK-12590.

所以这不是错误,只是randomSample可能在相同但分区不同的数据集上产生不同结果的事实.显然,我的某些调整功能(或Pipeline)涉及分区,因此,根据其定义对火车对进行重新计算的结果可能会有所不同.

So it's not a bug, just the fact that randomSample might yield a different result on the same, but differently partitioned dataset. And apparently, some of my munging functions (or Pipeline) involve repartition, therefore, results of the trainset recomputation from its definition might diverge.

我仍然感兴趣的是-可重复性:总是"pl-PL"行混入数据集的错误部分,即不是随机分区.它是确定性的,只是不一致的.我想知道它到底是怎么发生的.

What still interests me - it's the reproducibility: it's always 'pl-PL' row that gets mixed in the wrong part of the dataset, i.e. it's not random repartition. It's deterministic, just inconsistent. I wonder how exactly it happens.

这篇关于spark.ml StringIndexer在fit()上抛出“看不见的标签"的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆