Apache Spark 中的数据集 [英] Datasets in Apache Spark

查看:30
本文介绍了Apache Spark 中的数据集的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

Dataset<Tweet> ds = sc.read().json("path").as(Encoders.bean(Tweet.class));
ds.show();
JavaRDD<Tweet> dstry = ds.toJavaRDD();
System.out.println(dstry.first().getClass());

Caused by: java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 50, Column 16: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 50, Column 16: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void sparkSQL.Tweet.setId(long)"
    at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.getValue(AbstractFuture.java:306)
    at org.spark_project.guava.util.concurrent.AbstractFuture$Sync.get(AbstractFuture.java:293)
    at org.spark_project.guava.util.concurrent.AbstractFuture.get(AbstractFuture.java:116)
    at org.spark_project.guava.util.concurrent.Uninterruptibles.getUninterruptibly(Uninterruptibles.java:135)
    at org.spark_project.guava.cache.LocalCache$Segment.getAndRecordStats(LocalCache.java:2410)
    at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2380)
    at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
    at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
    at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
    at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
    at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1369)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:197)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateSafeProjection$.create(GenerateSafeProjection.scala:36)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1325)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1322)
    at org.apache.spark.sql.execution.DeserializeToObjectExec$$anonfun$2.apply(objects.scala:90)
    at org.apache.spark.sql.execution.DeserializeToObjectExec$$anonfun$2.apply(objects.scala:89)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndexInternal$1$$anonfun$apply$24.apply(RDD.scala:818)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 50, Column 16: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 50, Column 16: No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void sparkSQL.Tweet.setId(long)"
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1435)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1497)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1494)
    at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
    at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)

当我仔细观察时,我唯一提出疑问的是:

When I looked closely the only thing I raised my doubts is this:

未找到适用于实际参数org.apache.spark.unsafe.types.UTF8String"的构造函数/方法;候选人是:public void sparkSQL.Tweet.setId(long)"

No applicable constructor/method found for actual parameters "org.apache.spark.unsafe.types.UTF8String"; candidates are: "public void sparkSQL.Tweet.setId(long)"

推荐答案

正如@user9718686 所写,id 字段有不同的类型:json 文件中的 Stringlong 在您的类定义中.当您将其读入 Dataset 时,Spark 会从文件中推断架构并检测到 id 的类型为 String,这就是为什么它在您尝试时起作用的原因打印它(正如您在评论之一中所要求的那样).如果要将数据帧设为 Dataset,则必须更改 json 文件以使用 long ids 而不是 String 或当您尝试执行任何 操作时,您可以让 Spark 投射此 ID 对您的数据帧进行操作.

As @user9718686 wrote, you have different types for the id field: String in your json file, and long in your class definition. When you read it into Dataset<Row>, then Spark infers the schema from the file and detects that the id is of type String and that is why it worked when you tried to print it (as you asked for this in one of your comments). If you want to have the dataframe as Dataset<Tweet>, then you have to change your json files to use long ids instead of String or you can let Spark cast this id when you try to perform any action operation on your dataframe.

Dataset<Row> rowDataset = sc.read().json("path");
Dataset<Tweet> tweetDataset = rowDataset
                .withColumn("id", rowDataset.col("id").cast(DataTypes.LongType))
                .as(Encoders.bean(Tweet.class));
tweetDataset.printSchema();
System.out.println(tweetDataset.head().getId());

这篇关于Apache Spark 中的数据集的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆