Java,Spark和Cassandra java.lang.ClassCastException:com.datastax.driver.core.DefaultResultSetFuture无法投射为阴影 [英] Java, Spark and Cassandra java.lang.ClassCastException: com.datastax.driver.core.DefaultResultSetFuture cannot be cast to shade
问题描述
尝试将数据写入我的Cassandra数据库时出错。
I got an error trying to write data to my Cassandra database.
我在这里得到的内容:
1)Dictionary.java
What I got here: 1) Dictionary.java
package com.chatSparkConnactionTest;
import java.io.Serializable;
public class Dictionary implements Serializable{
private String value_id;
private String d_name;
private String d_value;
public Dictionary(){}
public Dictionary (String value_id, String d_name, String d_value) {
this.setValue_id(value_id);
this.setD_name(d_name);
this.setD_value(d_value);
}
public String getValue_id() {
return value_id;
}
public void setValue_id(String value_id) {
this.value_id = value_id;
}
public String getD_name() {
return d_name;
}
public void setD_name(String d_name) {
this.d_name = d_name;
}
public String getD_value() {
return d_value;
}
public void setD_value(String d_value) {
this.d_value = d_value;
}
}
我的主班:
package com.chatSparkConnactionTest;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import java.io.Serializable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import com.datastax.spark.connector.japi.CassandraJavaUtil;
import com.datastax.spark.connector.japi.CassandraRow;
import com.datastax.spark.connector.japi.SparkContextJavaFunctions;
import com.datastax.spark.connector.japi.rdd.CassandraJavaRDD;
import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.datastax.spark.connector.japi.CassandraRow;
import com.google.common.base.Objects;
import org.apache.avro.data.Json;
import org.apache.hadoop.util.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
//import org.apache.spark.sql.SchemaRDD;
//import org.apache.spark.sql.cassandra.CassandraSQLContext;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;
import com.datastax.spark.connector.japi.CassandraRow;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.spark.api.java.function.Function;
public class JavaDemoRDDWrite implements Serializable {
private static final long serialVersionUID = 1L;
public static void main(String[] args) {
SparkConf conf = new SparkConf().
setAppName("chat").
setMaster("local").
set("spark.cassandra.connection.host", "127.0.0.1");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Dictionary> dictionary = Arrays.asList(
new Dictionary("7", "n1", "v1"),
new Dictionary("8", "n2", "v2"),
new Dictionary("9", "n3", "v3")
);
for (Dictionary dictionaryRow : dictionary) {
System.out.println("id: " + dictionaryRow.getValue_id());
System.out.println("name: " + dictionaryRow.getD_name());
System.out.println("value: " + dictionaryRow.getD_value());
}
JavaRDD<Dictionary> rdd = sc.parallelize(dictionary);
System.out.println("Total rdd rows: " + rdd.collect().size());
javaFunctions(rdd)
.writerBuilder("chat", "dictionary",
mapToRow(Dictionary.class))
.saveToCassandra();
};
}
Pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>chat_connaction_test</groupId>
<artifactId>ChatSparkConnectionTest</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.11</artifactId>
<version>2.0.0-M3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
</project>
这是错误文本:
java.lang.ClassCastException: com.datastax.driver.core.DefaultResultSetFuture cannot be cast to shade.com.datastax.spark.connector.google.common.util.concurrent.ListenableFuture
at com.datastax.spark.connector.writer.AsyncExecutor.executeAsync(AsyncExecutor.scala:31)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:159)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:158)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:158)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110)
at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
16/10/11 17:43:03 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.ClassCastException: com.datastax.driver.core.DefaultResultSetFuture cannot be cast to shade.com.datastax.spark.connector.google.common.util.concurrent.ListenableFuture
at com.datastax.spark.connector.writer.AsyncExecutor.executeAsync(AsyncExecutor.scala:31)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:159)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:158)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:158)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110)
at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
16/10/11 17:43:03 ERROR TaskSetManager: Task 0 in stage 1.0 failed 1 times; aborting job
16/10/11 17:43:03 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/10/11 17:43:03 INFO TaskSchedulerImpl: Cancelling stage 1
16/10/11 17:43:03 INFO DAGScheduler: ResultStage 1 (runJob at RDDFunctions.scala:37) failed in 0.274 s
16/10/11 17:43:03 INFO DAGScheduler: Job 1 failed: runJob at RDDFunctions.scala:37, took 0.291592 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.ClassCastException: com.datastax.driver.core.DefaultResultSetFuture cannot be cast to shade.com.datastax.spark.connector.google.common.util.concurrent.ListenableFuture
at com.datastax.spark.connector.writer.AsyncExecutor.executeAsync(AsyncExecutor.scala:31)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:159)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:158)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:158)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110)
at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
at scala.Option.foreach(Option.scala:257)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:1904)
at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:37)
at com.datastax.spark.connector.japi.RDDJavaFunctions.saveToCassandra(RDDJavaFunctions.java:61)
at com.datastax.spark.connector.japi.RDDAndDStreamCommonJavaFunctions$WriterBuilder.saveToCassandra(RDDAndDStreamCommonJavaFunctions.java:486)
at com.chatSparkConnactionTest.JavaDemoRDDWrite.main(JavaDemoRDDWrite.java:69)
Caused by: java.lang.ClassCastException: com.datastax.driver.core.DefaultResultSetFuture cannot be cast to shade.com.datastax.spark.connector.google.common.util.concurrent.ListenableFuture
at com.datastax.spark.connector.writer.AsyncExecutor.executeAsync(AsyncExecutor.scala:31)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:159)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1$$anonfun$apply$2.apply(TableWriter.scala:158)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:158)
at com.datastax.spark.connector.writer.TableWriter$$anonfun$write$1.apply(TableWriter.scala:135)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:110)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:140)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:110)
at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:135)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:37)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
问题是系统从RDD中插入第一个值和其他两行都被忽略了。
The thing is that system inserts first value from the RDD to database table as a result even if I got an error. And other 2 rows just ignored.
但是,以防万一,这是我的Cassandra表:
But, just in case, this is my Cassandra table:
CREATE TABLE dictionary (
value_id text,
d_value text,
d_name text,
PRIMARY KEY (value_id, d_name)
) WITH comment = 'dictionary values'
AND CLUSTERING ORDER BY (d_name ASC);
更新的pom.xml:
Updated pom.xml:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>chat_connaction_test</groupId>
<artifactId>ChatSparkConnectionTest</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.0.0</version>
</dependency>
<dependency>
<groupId>com.datastax.spark</groupId>
<artifactId>spark-cassandra-connector_2.11</artifactId>
<version>2.0.0-M3</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>
</project>
推荐答案
从以下位置删除 cassandra-driver-core依赖项您的pom.xml文件,因为它引起了问题。为了与Cassandra DB进行交互,只需要 spark-cassandra-connector依赖关系以及spark依赖关系即可。
Remove below "cassandra-driver-core" dependency from your pom.xml file as it is causing the issue. You only need "spark-cassandra-connector" dependency along with spark dependencies in order to interact with Cassandra DB.
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.1.0</version>
</dependency>
这篇关于Java,Spark和Cassandra java.lang.ClassCastException:com.datastax.driver.core.DefaultResultSetFuture无法投射为阴影的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!