Spark Datastax Java API Select statements


Question


I'm following the tutorial in this GitHub repository to run Spark on Cassandra from a Java Maven project: https://github.com/datastax/spark-cassandra-connector.

I've figured out how to use direct CQL statements, as I previously asked a question about that here: Querying Data in Cassandra via Spark in a Java Maven Project.

However, now I'm trying to use the DataStax Java API, fearing that the code from my original question will not work with the DataStax versions of Spark and Cassandra. For some strange reason, it won't let me use .where() even though the documentation states that I can use that exact statement. Here is my code:

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.io.Serializable;

import static com.datastax.spark.connector.CassandraJavaUtil.*;


public class App implements Serializable
{

    // firstly, we define a bean class
    public static class Person implements Serializable {
        private Integer id;
        private String fname;
        private String lname;
        private String role;

        // Remember to declare no-args constructor
        public Person() { }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }

        public String getfname() { return fname; }
        public void setfname(String fname) { this.fname = fname; }

        public String getlname() { return lname; }
        public void setlname(String lname) { this.lname = lname; }

        public String getrole() { return role; }
        public void setrole(String role) { this.role = role; }

        // other methods, constructors, etc.
    }

    private transient SparkConf conf;
    private App(SparkConf conf) {
        this.conf = conf;
    }


    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        createSchema(sc);


        sc.stop();
    }

    private void createSchema(JavaSparkContext sc) {

        JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
                .where("role=?", "IT Engineer").map(new Function<Person, String>() {
                    @Override
                    public String call(Person person) throws Exception {
                        return person.toString();
                    }
                });
        System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray()));
       }



    public static void main( String[] args )
    {
        if (args.length != 2) {
            System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
            System.exit(1);
        }

        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster(args[0]);
        conf.set("spark.cassandra.connection.host", args[1]);

        App app = new App(conf);
        app.run();
    }
}

Here is the error:

14/09/23 13:46:53 ERROR executor.Executor: Exception in task ID 0
java.io.IOException: Exception during preparation of SELECT "role", "id", "fname", "lname" FROM "tester"."empbyrole" WHERE token("role") > -5709068081826432029 AND token("role") <= -5491279024053142424 AND role=? ALLOW FILTERING: role cannot be restricted by more than one relation if it includes an Equal
    at com.datastax.spark.connector.rdd.CassandraRDD.createStatement(CassandraRDD.scala:310)
    at com.datastax.spark.connector.rdd.CassandraRDD.com$datastax$spark$connector$rdd$CassandraRDD$$fetchTokenRange(CassandraRDD.scala:317)
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338)
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:10)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608)
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:205)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: role cannot be restricted by more than one relation if it includes an Equal
    at com.datastax.driver.core.exceptions.InvalidQueryException.copy(InvalidQueryException.java:35)
    at com.datastax.driver.core.DefaultResultSetFuture.extractCauseFromExecutionException(DefaultResultSetFuture.java:256)
    at com.datastax.driver.core.AbstractSession.prepare(AbstractSession.java:91)
    at com.datastax.spark.connector.cql.PreparedStatementCache$.prepareStatement(PreparedStatementCache.scala:45)
    at com.datastax.spark.connector.cql.SessionProxy.invoke(SessionProxy.scala:28)
    at com.sun.proxy.$Proxy8.prepare(Unknown Source)
    at com.datastax.spark.connector.rdd.CassandraRDD.createStatement(CassandraRDD.scala:293)
    ... 27 more
Caused by: com.datastax.driver.core.exceptions.InvalidQueryException: role cannot be restricted by more than one relation if it includes an Equal
    at com.datastax.driver.core.Responses$Error.asException(Responses.java:97)
    at com.datastax.driver.core.SessionManager$1.apply(SessionManager.java:156)
    at com.datastax.driver.core.SessionManager$1.apply(SessionManager.java:131)
    at com.google.common.util.concurrent.Futures$1.apply(Futures.java:711)
    at com.google.common.util.concurrent.Futures$ChainingListenableFuture.run(Futures.java:849)
    ... 3 more
14/09/23 13:46:53 WARN scheduler.TaskSetManager: Lost TID 0 (task 0.0:0)
14/09/23 13:46:53 WARN scheduler.TaskSetManager: Loss was due to java.io.IOException
java.io.IOException: Exception during preparation of SELECT "role", "id", "fname", "lname" FROM "tester"."empbyrole" WHERE token("role") > -5709068081826432029 AND token("role") <= -5491279024053142424 AND role=? ALLOW FILTERING: role cannot be restricted by more than one relation if it includes an Equal
    at com.datastax.spark.connector.rdd.CassandraRDD.createStatement(CassandraRDD.scala:310)
    at com.datastax.spark.connector.rdd.CassandraRDD.com$datastax$spark$connector$rdd$CassandraRDD$$fetchTokenRange(CassandraRDD.scala:317)
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338)
    at com.datastax.spark.connector.rdd.CassandraRDD$$anonfun$13.apply(CassandraRDD.scala:338)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at com.datastax.spark.connector.util.CountingIterator.hasNext(CountingIterator.scala:10)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608)
    at org.apache.spark.rdd.RDD$$anonfun$4.apply(RDD.scala:608)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:884)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:109)
    at org.apache.spark.scheduler.Task.run(Task.scala:53)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:205)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
14/09/23 13:46:53 ERROR scheduler.TaskSetManager: Task 0.0:0 failed 1 times; aborting job
14/09/23 13:46:53 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
14/09/23 13:46:53 INFO scheduler.DAGScheduler: Failed to run toArray at App.java:65
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task 0.0:0 failed 1 times (most recent failure: Exception failure: java.io.IOException: Exception during preparation of SELECT "role", "id", "fname", "lname" FROM "tester"."empbyrole" WHERE token("role") > -5709068081826432029 AND token("role") <= -5491279024053142424 AND role=? ALLOW FILTERING: role cannot be restricted by more than one relation if it includes an Equal)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/09/23 13:46:53 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster

I know that my error is specifically in this section:

JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
                .where("role=?", "IT Engineer").map(new Function<Person, String>() {
                    @Override
                    public String call(Person person) throws Exception {
                        return person.toString();
                    }
                });

When I remove the .where(), it works. But the GitHub documentation specifically says that you should be able to execute the .where() and .map() functions together. Does anyone have any reasoning for this, or a solution? Thanks.

Edit: the error goes away when I use this statement instead:

JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
                .where("id=?", "1").map(new Function<Person, String>() {
                    @Override
                    public String call(Person person) throws Exception {
                        return person.toString();
                    }
                });

I have no idea why this variant works but the rest of my variations don't. Here are the CQL statements I ran, so that you know what my keyspace looks like:

    session.execute("DROP KEYSPACE IF EXISTS tester");
    session.execute("CREATE KEYSPACE tester WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}");
    session.execute("CREATE TABLE tester.emp (id INT PRIMARY KEY, fname TEXT, lname TEXT, role TEXT)");
    session.execute("CREATE TABLE tester.empByRole (id INT, fname TEXT, lname TEXT, role TEXT, PRIMARY KEY (role,id))");
    session.execute("CREATE TABLE tester.dept (id INT PRIMARY KEY, dname TEXT)");       

    session.execute(
              "INSERT INTO tester.emp (id, fname, lname, role) " +
              "VALUES (" +
                  "0001," +
                  "'Angel'," +
                  "'Pay'," +
                  "'IT Engineer'" +
                  ");");
    session.execute(
              "INSERT INTO tester.emp (id, fname, lname, role) " +
              "VALUES (" +
                  "0002," +
                  "'John'," +
                  "'Doe'," +
                  "'IT Engineer'" +
                  ");");
    session.execute(
              "INSERT INTO tester.emp (id, fname, lname, role) " +
              "VALUES (" +
                  "0003," +
                  "'Jane'," +
                  "'Doe'," +
                  "'IT Analyst'" +
                  ");");
    session.execute(
          "INSERT INTO tester.empByRole (id, fname, lname, role) " +
          "VALUES (" +
              "0001," +
              "'Angel'," +
              "'Pay'," +
              "'IT Engineer'" +
              ");");
    session.execute(
              "INSERT INTO tester.empByRole (id, fname, lname, role) " +
              "VALUES (" +
                  "0002," +
                  "'John'," +
                  "'Doe'," +
                  "'IT Engineer'" +
                  ");");
    session.execute(
              "INSERT INTO tester.empByRole (id, fname, lname, role) " +
              "VALUES (" +
                  "0003," +
                  "'Jane'," +
                  "'Doe'," +
                  "'IT Analyst'" +
                  ");");
        session.execute(
              "INSERT INTO tester.dept (id, dname) " +
              "VALUES (" +
                  "1553," +
                  "'Commerce'" +
                  ");");

Solution

The where method adds ALLOW FILTERING to your query under the covers. This is not a magic bullet, as it still doesn't support arbitrary fields as query predicates. In general, the field must either be indexed or be a clustering column. If this isn't practical for your data model, you can simply use the filter method on the RDD. The downside is that the filter takes place in Spark rather than in Cassandra.
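For example, here is a minimal sketch of the filter approach, reusing the Person bean and the javaFunctions(sc) setup from the question (variable names are hypothetical; the predicate runs in Spark, so it can test any bean field, at the cost of reading the whole table from Cassandra):

JavaRDD<String> names = javaFunctions(sc)
        .cassandraTable("tester", "empbyrole", Person.class)
        // Runs in Spark after the rows are fetched, so any bean field works here.
        .filter(new Function<Person, Boolean>() {
            @Override
            public Boolean call(Person person) throws Exception {
                return "IT Engineer".equals(person.getrole());
            }
        })
        .map(new Function<Person, String>() {
            @Override
            public String call(Person person) throws Exception {
                return person.toString();
            }
        });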

So the id field works because it's supported in a CQL WHERE clause, whereas I'm assuming role is just a regular field. Please note that I am NOT suggesting that you index your field or change it to a clustering column, as I don't know your data model.
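To make the contrast concrete, here is a hedged sketch based on the empByRole schema posted in the question, where PRIMARY KEY (role, id) makes role the partition key and id the clustering column:

// Works: id is a clustering column, so an equality predicate can be
// appended to the token-range query the connector generates.
JavaRDD<Person> byId = javaFunctions(sc)
        .cassandraTable("tester", "empbyrole", Person.class)
        .where("id=?", 1);

// Fails: the generated query already restricts the partition key with
// token("role") > ? AND token("role") <= ?, so the extra role=? relation
// is rejected ("role cannot be restricted by more than one relation
// if it includes an Equal").
JavaRDD<Person> byRole = javaFunctions(sc)
        .cassandraTable("tester", "empbyrole", Person.class)
        .where("role=?", "IT Engineer");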
