Convert from DataFrame to JavaPairRDD&lt;Long, Vector&gt;


Problem Description


I'm trying to implement the LDA algorithm using Apache Spark with the Java API. The method LDA().run() accepts a JavaPairRDD of documents. I have used Scala to create an RDD[(Long, Vector)] as follows:

val countVectors = cvModel.transform(filteredTokens)
    .select("docId", "features")
    .map { case Row(docId: Long, countVector: Vector) => (docId, countVector) }
    .cache()

And then input it into LDA:

lda.run(countVectors)

But in the Java API, I build the CountVectorizerModel with the following code:

CountVectorizerModel cvModel = new CountVectorizer()
        .setInputCol("filtered").setOutputCol("features")
        .setVocabSize(vocabSize).fit(filteredTokens);

The transformed count vectors look like this (each row prints as a (docId, SparseVector) pair, i.e. (id, (size, [indices], [values]))):

(0,(22,[0,8,9,10,14,16,18],
[1.0,1.0,1.0,1.0,1.0,1.0,1.0]))
(1,(22,[0,1,2,3,4,5,6,7,11,12,13,15,17,19,20,21],
[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))

What should I do to transform the output of cvModel into a JavaPairRDD countVectors? I have tried this:

JavaPairRDD<Long, Vector> countVectors = cvModel.transform(filteredTokens)
          .select("docId", "features").toJavaRDD()
          .mapToPair(new PairFunction<Row, Long, Vector>() {
            public Tuple2<Long, Vector> call(Row row) throws Exception {
                return new Tuple2<Long, Vector>(Long.parseLong(row.getString(0)), Vectors.dense(row.getDouble(1)));
            }
          }).cache();

But it does not work. I get an exception on:

Vectors.dense(row.getDouble(1))

So, if you have any idea how to convert the DataFrame from cvModel into a JavaPairRDD, please tell me.

I am using Spark and MLlib 1.5.1 with Java 8.

Any help is highly appreciated. Thanks. Here is the exception log from my attempt to convert the DataFrame into a JavaPairRDD:

15/10/25 10:03:07 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 6)
java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
at org.apache.spark.sql.Row$class.getString(Row.scala:249)
at org.apache.spark.sql.catalyst.expressions.GenericRow.getString(rows.scala:191)
at UIT_LDA_ONLINE.LDAOnline$2.call(LDAOnline.java:88)
at UIT_LDA_ONLINE.LDAOnline$2.call(LDAOnline.java:1)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1030)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1030)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/10/25 10:03:07 WARN TaskSetManager: Lost task 0.0 in stage 7.0 (TID 6, localhost): java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
at org.apache.spark.sql.Row$class.getString(Row.scala:249)
at org.apache.spark.sql.catalyst.expressions.GenericRow.getString(rows.scala:191)
at UIT_LDA_ONLINE.LDAOnline$2.call(LDAOnline.java:88)
at UIT_LDA_ONLINE.LDAOnline$2.call(LDAOnline.java:1)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1030)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1030)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Solution

Now that we have the error stack, here is the error:

You are trying to get a String from the row whereas your field is a Long, so for starters you'll need to replace row.getString(0) with row.getLong(0).
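
For example, with docId as the first selected column, that first change is just this line inside the PairFunction:

    // docId is a Long in the schema, so read it with the Long getter
    // instead of parsing a String:
    long docId = row.getLong(0);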

Once you correct this, you'll run into other errors of the same type at different places, which I can't pinpoint from the information given, but you'll be able to solve them if you apply the following:

The Row getters are specific to each field's type, so you need to use the correct get method for each one.
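
In this case, the next failure will be Vectors.dense(row.getDouble(1)): the "features" column written by CountVectorizer holds a Vector rather than a double, so, assuming it contains an MLlib Vector as CountVectorizer's output normally does in Spark 1.5, it can be fetched with the generic getter and a cast:

    // "features" already contains a Vector; cast it instead of
    // rebuilding it from a double:
    Vector features = (Vector) row.get(1);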

If you are not sure which method you need, you can use the printSchema method on your DataFrame to check the type of each field, and then find the matching getter in the official documentation.
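
Putting both fixes together, here is a minimal sketch of the corrected conversion. It reuses cvModel and filteredTokens from the question, and assumes, based on what CountVectorizer produces in Spark 1.5, that the "features" column holds an org.apache.spark.mllib.linalg.Vector:

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.sql.Row;
import scala.Tuple2;

// docId is read with the Long getter, and the features column is cast
// to the Vector it already contains instead of being rebuilt with
// Vectors.dense.
JavaPairRDD&lt;Long, Vector&gt; countVectors = cvModel.transform(filteredTokens)
        .select("docId", "features").toJavaRDD()
        .mapToPair(new PairFunction&lt;Row, Long, Vector&gt;() {
            public Tuple2&lt;Long, Vector&gt; call(Row row) throws Exception {
                long docId = row.getLong(0);           // Long column
                Vector features = (Vector) row.get(1); // Vector column
                return new Tuple2&lt;Long, Vector&gt;(docId, features);
            }
        }).cache();

With that, countVectors should be directly usable as the documents argument of lda.run, just like the Scala version in the question.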
