Convert from DataFrame to JavaPairRDD<Long, Vector>


Problem description

I'm trying to implement the LDA algorithm using Apache Spark's Java API. The method LDA().run() accepts a JavaPairRDD<Long, Vector> of documents. In Scala I can create the RDD[(Long, Vector)] as follows:

val countVectors = cvModel.transform(filteredTokens)
    .select("docId", "features")
    .map { case Row(docId: Long, countVector: Vector) => (docId, countVector) }
    .cache()

Then I feed it into LDA:

lda.run(countVectors)

But in the Java API, I have a CountVectorizerModel obtained with the following code:

CountVectorizerModel cvModel = new CountVectorizer()
        .setInputCol("filtered").setOutputCol("features")
        .setVocabSize(vocabSize).fit(filteredTokens);

The transformed count vectors look like this:

(0,(22,[0,8,9,10,14,16,18],
[1.0,1.0,1.0,1.0,1.0,1.0,1.0]))
(1,(22,[0,1,2,3,4,5,6,7,11,12,13,15,17,19,20,21],
[1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0]))

What should I do to transform the output of cvModel into a JavaPairRDD<Long, Vector> countVectors? I have tried this:

JavaPairRDD<Long, Vector> countVectors = cvModel.transform(filteredTokens)
          .select("docId", "features").toJavaRDD()
          .mapToPair(new PairFunction<Row, Long, Vector>() {
            public Tuple2<Long, Vector> call(Row row) throws Exception {
                return new Tuple2<Long, Vector>(Long.parseLong(row.getString(0)), Vectors.dense(row.getDouble(1)));
            }
          }).cache();

But it does not work. I get an exception at:

Vectors.dense(row.getDouble(1))

So, if you have any idea how to convert the DataFrame from cvModel into a JavaPairRDD<Long, Vector>, please tell me.

I am using Spark and MLlib 1.5.1 with Java 8.

Any help is highly appreciated. Thanks. Here is the exception log from when I try to convert the DataFrame into a JavaPairRDD:

15/10/25 10:03:07 ERROR Executor: Exception in task 0.0 in stage 7.0 (TID 6)
java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String
at org.apache.spark.sql.Row$class.getString(Row.scala:249)
at org.apache.spark.sql.catalyst.expressions.GenericRow.getString(rows.scala:191)
at UIT_LDA_ONLINE.LDAOnline$2.call(LDAOnline.java:88)
at UIT_LDA_ONLINE.LDAOnline$2.call(LDAOnline.java:1)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1030)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$pairFunToScalaFun$1.apply(JavaPairRDD.scala:1030)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:278)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/10/25 10:03:07 WARN TaskSetManager: Lost task 0.0 in stage 7.0 (TID 6, localhost): java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.String

Answer

Now that we have the error stack, here is the error:

You are trying to get a String from the row whereas your field is a Long, so for starters you'll need to replace row.getString(0) with row.getLong(0).

Once you correct this, you'll run into other errors of the same type at different levels, which I can't all point out with the given information, but you'll be able to solve them if you apply the following:

The Row getters are specific to each field type; you'll need to use the correct get method for each column.
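
For example, here is a minimal sketch of the two getters this schema needs (assuming docId is a long column and features holds an MLlib Vector, as the schema below suggests):

Long docId = row.getLong(0);            // LongType column -> getLong, not getString
Vector features = (Vector) row.get(1);  // the vector column has no typed getter; use get and cast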

If you're not sure which method to use, call printSchema on your DataFrame to check the type of each field; the mapping from field types to getters is described in the official documentation.
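
Putting it together, a corrected version of the conversion could look like the sketch below. This is a hedged sketch, not guaranteed to be your exact fix: it assumes docId is a long and features is an org.apache.spark.mllib.linalg.Vector, which matches the schema CountVectorizer produces in Spark 1.5.1.

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.sql.Row;
import scala.Tuple2;

// Inspect the column types first; expect docId: long, features: vector
cvModel.transform(filteredTokens).select("docId", "features").printSchema();

JavaPairRDD<Long, Vector> countVectors = cvModel.transform(filteredTokens)
        .select("docId", "features").toJavaRDD()
        .mapToPair(new PairFunction<Row, Long, Vector>() {
            public Tuple2<Long, Vector> call(Row row) throws Exception {
                // docId is a Long, so use getLong rather than getString
                Long docId = row.getLong(0);
                // features is already a Vector; cast it instead of
                // rebuilding it with Vectors.dense(row.getDouble(1))
                Vector features = (Vector) row.get(1);
                return new Tuple2<Long, Vector>(docId, features);
            }
        }).cache();

The resulting countVectors can then be passed to lda.run(countVectors), just like in the Scala version.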
