Apache Spark中的数据集 [英] Datasets in Apache Spark

查看:820
本文介绍了Apache Spark中的数据集的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

Dataset<Tweet> ds = sc.read().json("path").as(Encoders.bean(Tweet.class));
ds.show();
JavaRDD<Tweet> dstry = ds.toJavaRDD();
System.out.println(dstry.first().getClass());





Caused by: java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 50, Column 16: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 50, Column 16: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void sparkSQL.Tweet.setId(long)"
    at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
    at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
    at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
    at org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
    at org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
    at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
    at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
    at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
    at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
    at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
    at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1369)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:197)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:36)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1325)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1322)
    at org.apache.spark.sql.execution.DeserializeToObjectExec$$anonfun$2.apply(objects.scala:90)
    at org.apache.spark.sql.execution.DeserializeToObjectExec$$anonfun$2.apply(objects.scala:89)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 50, Column 16: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 50, Column 16: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void sparkSQL.Tweet.setId(long)"
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1435)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1497)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1494)
    at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
    at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)

当我仔细观察时,我唯一提出疑问的是:

When I looked closely the only thing I raised my doubts is this:


找不到适用的构造函数/方法实际参数org.apache.spark.unsafe.types.UTF8String;候选人是:public void sparkSQL.Tweet.setId(long)

No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void sparkSQL.Tweet.setId(long)"


推荐答案

As @ user9718686写道,你有id字段的不同类型:json文件中的 String ,以及类定义中的 long 。当您将其读入数据集< Row> 时,Spark会从文件中推断出架构,并检测到该ID的类型为 String 这就是为什么当你试图打印时它起作用的原因(正如你在其中一条评论中提到的那样)。如果您希望将数据框设置为数据集< Tweet> ,则必须更改您的json文件以使用 long ids而不是 String 或者当你尝试执行任何操作操作

As @user9718686 wrote, you have different types for the id field: String in your json file, and long in your class definition. When you read it into Dataset<Row>, then Spark infers the schema from the file and detects that the id is of type String and that is why it worked when you tried to print it (as you asked for this in one of your comments). If you want to have the dataframe as Dataset<Tweet>, then you have to change your json files to use long ids instead of String or you can let Spark cast this id when you try to perform any action operation on your dataframe.

Dataset<Row> rowDataset = sc.read().json("path");
Dataset<Tweet> tweetDataset = rowDataset
                .withColumn("id", rowDataset.col("id").cast(DataTypes.LongType))
                .as(Encoders.bean(Tweet.class));
tweetDataset.printSchema();
System.out.println(tweetDataset.head().getId());

这篇关于Apache Spark中的数据集的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆