Spark Cassandra Connector Error


Problem Description


Trying to connect to Cassandra from both spark-shell and spark-submit, but both throw the same error.

Spark version: 1.2.0

Apache Cassandra 2.1.1 is connected to Spark 1.2.0 using the DataStax Cassandra driver and connector (versions are listed in the POM file). Apart from the Cassandra access, the Scala and Java programs work fine. Could someone please help resolve this error?

Error:

java.lang.AbstractMethodError
    at org.apache.spark.Logging$class.log(Logging.scala:52)
    at com.datastax.spark.connector.cql.CassandraConnector$.log(CassandraConnector.scala:144)
    at org.apache.spark.Logging$class.logDebug(Logging.scala:63)
    at com.datastax.spark.connector.cql.CassandraConnector$.logDebug(CassandraConnector.scala:144)
    at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:154)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:151)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:151)
    at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:36)
    at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:61)
    at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:73)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:98)
    at com.datastax.spark.connector.cql.CassandraConnector.withClusterDo(CassandraConnector.scala:109)
    at com.datastax.spark.connector.cql.Schema$.fromCassandra(Schema.scala:131)
    at com.datastax.spark.connector.rdd.CassandraRDD.tableDef$lzycompute(CassandraRDD.scala:206)
    at com.datastax.spark.connector.rdd.CassandraRDD.tableDef(CassandraRDD.scala:205)
    at com.datastax.spark.connector.rdd.CassandraRDD.<init>(CassandraRDD.scala:212)
    at com.datastax.spark.connector.SparkContextFunctions.cassandraTable(SparkContextFunctions.scala:48)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:38)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
    at $iwC$$iwC$$iwC.<init>(<console>:42)
    at $iwC$$iwC.<init>(<console>:44)
    at $iwC.<init>(<console>:46)
    at <init>(<console>:48)
    at .<init>(<console>:52)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)
    at $print(<console>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:674)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:705)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:669)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:828)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:873)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:785)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:628)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:636)
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:641)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:968)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:916)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:916)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1011)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Commands I tried in spark-shell:

scala> import com.datastax.spark.connector._
scala> val conf = new SparkConf()
scala> conf.set("cassandra.connection.host", "node1.pc.datastax.com")
scala> val sc = new SparkContext("local[2]", "Cassandra Connector Test", conf)
scala> val table = sc.cassandraTable("keyspace", "table")
scala> table.count
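
Note that the property key set above is cassandra.connection.host, while the key the connector documents is spark.cassandra.connection.host. A minimal sketch of the same session with the documented key, assuming the connector jar is already on the spark-shell classpath:

scala> import com.datastax.spark.connector._
scala> import org.apache.spark.{SparkConf, SparkContext}
scala> // Same session as above, but with the fully qualified property key
scala> val conf = new SparkConf().set("spark.cassandra.connection.host", "node1.pc.datastax.com")
scala> val sc = new SparkContext("local[2]", "Cassandra Connector Test", conf)
scala> val table = sc.cassandraTable("keyspace", "table")
scala> table.count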

Java Code:

package com.madhes;

import com.datastax.driver.core.Session;
import com.datastax.spark.connector.cql.CassandraConnector;
import com.google.common.base.Optional;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;

import java.io.Serializable;
import java.math.BigDecimal;
import java.text.MessageFormat;
import java.util.*;

import static com.datastax.spark.connector.CassandraJavaUtil.*;

public class App implements Serializable {
    private transient SparkConf conf;

    private App(SparkConf conf) {
        this.conf = conf;
    }

    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        generateData(sc);
        compute(sc);
        showResults(sc);
        sc.stop();
    }

    private void generateData(JavaSparkContext sc) {
        CassandraConnector connector = CassandraConnector.apply(sc.getConf());

        // Prepare the schema
        try (Session session = connector.openSession()) {
           // session.execute("DROP KEYSPACE IF EXISTS java_api");
         //   session.execute("CREATE KEYSPACE java_api WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
         //   session.execute("CREATE TABLE java_api.products (id INT PRIMARY KEY, name TEXT, parents LIST<INT>)");
         //   session.execute("CREATE TABLE java_api.sales (id UUID PRIMARY KEY, product INT, price DECIMAL)");
         //   session.execute("CREATE TABLE java_api.summaries (product INT PRIMARY KEY, summary DECIMAL)");
        }

        // Prepare the products hierarchy
        List<Product> products = Arrays.asList(
                new Product(0, "All products", Collections.<Integer>emptyList()),
                new Product(1, "Product A", Arrays.asList(0)),
                new Product(4, "Product A1", Arrays.asList(0, 1)),
                new Product(5, "Product A2", Arrays.asList(0, 1)),
                new Product(2, "Product B", Arrays.asList(0)),
                new Product(6, "Product B1", Arrays.asList(0, 2)),
                new Product(7, "Product B2", Arrays.asList(0, 2)),
                new Product(3, "Product C", Arrays.asList(0)),
                new Product(8, "Product C1", Arrays.asList(0, 3)),
                new Product(9, "Product C2", Arrays.asList(0, 3))
        );

        JavaRDD<Product> productsRDD = sc.parallelize(products);
        javaFunctions(productsRDD, Product.class).saveToCassandra("java_api", "products");

        JavaRDD<Sale> salesRDD = productsRDD.filter(new Function<Product, Boolean>() {
            @Override
            public Boolean call(Product product) throws Exception {
                return product.getParents().size() == 2;
            }
        }).flatMap(new FlatMapFunction<Product, Sale>() {
            @Override
            public Iterable<Sale> call(Product product) throws Exception {
                Random random = new Random();
                List<Sale> sales = new ArrayList<>(1000);
                for (int i = 0; i < 1000; i++) {
                    sales.add(new Sale(UUID.randomUUID(), product.getId(), BigDecimal.valueOf(random.nextDouble())));
                }
                return sales;
            }
        });

        javaFunctions(salesRDD, Sale.class).saveToCassandra("java_api", "sales");
    }

    private void compute(JavaSparkContext sc) {
        JavaPairRDD<Integer, Product> productsRDD = javaFunctions(sc)
                .cassandraTable("java_api", "products", Product.class)
                .keyBy(new Function<Product, Integer>() {
                    @Override
                    public Integer call(Product product) throws Exception {
                        return product.getId();
                    }
                });

        JavaPairRDD<Integer, Sale> salesRDD = javaFunctions(sc)
                .cassandraTable("java_api", "sales", Sale.class)
                .keyBy(new Function<Sale, Integer>() {
                    @Override
                    public Integer call(Sale sale) throws Exception {
                        return sale.getProduct();
                    }
                });

        JavaPairRDD<Integer, Tuple2<Sale, Product>> joinedRDD = salesRDD.join(productsRDD);

        JavaPairRDD<Integer, BigDecimal> allSalesRDD = joinedRDD.flatMapToPair(new PairFlatMapFunction<Tuple2<Integer, Tuple2<Sale, Product>>, Integer, BigDecimal>() {
            @Override
            public Iterable<Tuple2<Integer, BigDecimal>> call(Tuple2<Integer, Tuple2<Sale, Product>> input) throws Exception {
                Tuple2<Sale, Product> saleWithProduct = input._2();
                List<Tuple2<Integer, BigDecimal>> allSales = new ArrayList<>(saleWithProduct._2().getParents().size() + 1);
                allSales.add(new Tuple2<>(saleWithProduct._1().getProduct(), saleWithProduct._1().getPrice()));
                for (Integer parentProduct : saleWithProduct._2().getParents()) {
                    allSales.add(new Tuple2<>(parentProduct, saleWithProduct._1().getPrice()));
                }
                return allSales;
            }
        });

        JavaRDD<Summary> summariesRDD = allSalesRDD.reduceByKey(new Function2<BigDecimal, BigDecimal, BigDecimal>() {
            @Override
            public BigDecimal call(BigDecimal v1, BigDecimal v2) throws Exception {
                return v1.add(v2);
            }
        }).map(new Function<Tuple2<Integer, BigDecimal>, Summary>() {
            @Override
            public Summary call(Tuple2<Integer, BigDecimal> input) throws Exception {
                return new Summary(input._1(), input._2());
            }
        });

        javaFunctions(summariesRDD, Summary.class).saveToCassandra("java_api", "summaries");
    }

    private void showResults(JavaSparkContext sc) {
        JavaPairRDD<Integer, Summary> summariesRdd = javaFunctions(sc)
                .cassandraTable("java_api", "summaries", Summary.class)
                .keyBy(new Function<Summary, Integer>() {
                    @Override
                    public Integer call(Summary summary) throws Exception {
                        return summary.getProduct();
                    }
                });

        JavaPairRDD<Integer, Product> productsRdd = javaFunctions(sc)
                .cassandraTable("java_api", "products", Product.class)
                .keyBy(new Function<Product, Integer>() {
                    @Override
                    public Integer call(Product product) throws Exception {
                        return product.getId();
                    }
                });

        List<Tuple2<Product, Optional<Summary>>> results = productsRdd.leftOuterJoin(summariesRdd).values().toArray();

        for (Tuple2<Product, Optional<Summary>> result : results) {
            System.out.println(result);
        }
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
            System.exit(1);
        }

        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster(args[0]);
        conf.set("spark.cassandra.connection.host", args[1]);

        App app = new App(conf);
        app.run();
    }

    public static class Product implements Serializable {
        private Integer id;
        private String name;
        private List<Integer> parents;

        public Product() { }

        public Product(Integer id, String name, List<Integer> parents) {
            this.id = id;
            this.name = name;
            this.parents = parents;
        }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }

        public String getName() { return name; }
        public void setName(String name) { this.name = name; }

        public List<Integer> getParents() { return parents; }
        public void setParents(List<Integer> parents) { this.parents = parents; }

        @Override
        public String toString() {
            return MessageFormat.format("Product'{'id={0}, name=''{1}'', parents={2}'}'", id, name, parents);
        }
    }

    public static class Sale implements Serializable {
        private UUID id;
        private Integer product;
        private BigDecimal price;

        public Sale() { }

        public Sale(UUID id, Integer product, BigDecimal price) {
            this.id = id;
            this.product = product;
            this.price = price;
        }

        public UUID getId() { return id; }
        public void setId(UUID id) { this.id = id; }

        public Integer getProduct() { return product; }
        public void setProduct(Integer product) { this.product = product; }

        public BigDecimal getPrice() { return price; }
        public void setPrice(BigDecimal price) { this.price = price; }

        @Override
        public String toString() {
            return MessageFormat.format("Sale'{'id={0}, product={1}, price={2}'}'", id, product, price);
        }
    }

    public static class Summary implements Serializable {
        private Integer product;
        private BigDecimal summary;

        public Summary() { }

        public Summary(Integer product, BigDecimal summary) {
            this.product = product;
            this.summary = summary;
        }

        public Integer getProduct() { return product; }
        public void setProduct(Integer product) { this.product = product; }

        public BigDecimal getSummary() { return summary; }
        public void setSummary(BigDecimal summary) { this.summary = summary; }

        @Override
        public String toString() {
            return MessageFormat.format("Summary'{'product={0}, summary={1}'}'", product, summary);
        }
    }

}
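
The post mentions spark-submit but does not show the command; a hypothetical invocation matching the main() signature above could look like the following (the jar names, paths, and contact point are assumptions, not values from the post):

# Hypothetical spark-submit launch for the App class above; main() reads the
# master URL from args[0] and the Cassandra contact point from args[1].
./bin/spark-submit \
  --class com.madhes.App \
  --jars spark-cassandra-connector_2.10-1.0.0.jar,spark-cassandra-connector-java_2.10-1.0.0.jar,cassandra-driver-core-2.1.0.jar \
  target/App-1.0-SNAPSHOT.jar \
  local[2] node1.pc.datastax.com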

pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.madhes</groupId>
    <artifactId>App</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <!--Spark Cassandra Connector-->
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.10</artifactId>
            <version>1.0.0</version>
        </dependency>
        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector-java_2.10</artifactId>
            <version>1.0.0</version>
        </dependency>

        <!--Spark-->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.2.0</version>
        </dependency>


        <dependency>
            <groupId>net.jpountz.lz4</groupId>
            <artifactId>lz4</artifactId>
            <version>1.3.0</version>
        </dependency>

        <dependency>
            <groupId>com.datastax.cassandra</groupId>
            <artifactId>cassandra-driver-core</artifactId>
            <version>2.1.0</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.apache.thrift</groupId>
                    <artifactId>libthrift</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.thrift</groupId>
            <artifactId>libthrift</artifactId>
            <version>0.9.1</version>
        </dependency>

    </dependencies>
</project>

Solution

I faced the same issue and solved it by adding the following Maven dependency:

<dependency>
    <groupId>org.apache.cassandra</groupId>
    <artifactId>cassandra-all</artifactId>
    <version>1.2.6</version>
    <exclusions>
        <!--
        <exclusion>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
        </exclusion>
        -->
        <exclusion>
            <groupId>com.googlecode.concurrentlinkedhashmap</groupId>
            <artifactId>concurrentlinkedhashmap-lru</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.ning</groupId>
            <artifactId>compress-lzf</artifactId>
        </exclusion>
        <exclusion>
            <groupId>io.netty</groupId>
            <artifactId>netty</artifactId>
        </exclusion>
        <exclusion>
            <groupId>jline</groupId>
            <artifactId>jline</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.cassandra.deps</groupId>
            <artifactId>avro</artifactId>
        </exclusion>
    </exclusions>
</dependency>
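
Whether the exclusions take effect can be checked by printing the resolved dependency tree with standard Maven; duplicate artifacts resolved at conflicting versions (for example netty or guava pulled in twice) are a common trigger for java.lang.AbstractMethodError:

# Print the resolved dependency tree; look for conflicting versions of
# cassandra, netty, or guava pulled in transitively.
mvn dependency:tree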
