Apache Spark 中的数据集 [英] Datasets in Apache Spark
问题描述
Dataset<Tweet> ds = sc.read().json("path").as(Encoders.bean(Tweet.class));
ds.show();
JavaRDD<Tweet> dstry = ds.toJavaRDD();
System.out.println(dstry.first().getClass());
Caused by: java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 50, Column 16: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 50, Column 16: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void sparkSQL.Tweet.setId(long)"
at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
at org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
at org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1369)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:197)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:36)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1325)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1322)
at org.apache.spark.sql.execution.DeserializeToObjectExec$$anonfun$2.apply(objects.scala:90)
at org.apache.spark.sql.execution.DeserializeToObjectExec$$anonfun$2.apply(objects.scala:89)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 50, Column 16: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 50, Column 16: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void sparkSQL.Tweet.setId(long)"
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1435)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1497)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1494)
at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
当我仔细观察时,我唯一提出疑问的是:
When I looked closely the only thing I raised my doubts is this:
未找到适用于实际参数org.apache.spark.unsafe.types.UTF8String"的构造函数/方法;候选人是:public void sparkSQL.Tweet.setId(long)"
No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void sparkSQL.Tweet.setId(long)"
推荐答案
正如@user9718686 所写,id 字段有不同的类型:json 文件中的 String
和 long
在您的类定义中.当您将其读入 Dataset
时,Spark 会从文件中推断架构并检测到 id 的类型为 String
,这就是为什么它在您尝试时起作用的原因打印它(正如您在评论之一中所要求的那样).如果要将数据帧设为 Dataset
,则必须更改 json 文件以使用 long
ids 而不是 String
或当您尝试执行任何 操作时,您可以让 Spark 投射此 ID 对您的数据帧进行操作.
As @user9718686 wrote, you have different types for the id field: String
in your json file, and long
in your class definition. When you read it into Dataset<Row>
, then Spark infers the schema from the file and detects that the id is of type String
and that is why it worked when you tried to print it (as you asked for this in one of your comments). If you want to have the dataframe as Dataset<Tweet>
, then you have to change your json files to use long
ids instead of String
or you can let Spark cast this id when you try to perform any action operation on your dataframe.
Dataset<Row> rowDataset = sc.read().json("path");
Dataset<Tweet> tweetDataset = rowDataset
.withColumn("id", rowDataset.col("id").cast(DataTypes.LongType))
.as(Encoders.bean(Tweet.class));
tweetDataset.printSchema();
System.out.println(tweetDataset.head().getId());
这篇关于Apache Spark 中的数据集的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!