Error while trying to connect to a Cassandra database using Spark Streaming


Problem description


I'm working in a project which uses Spark streaming, Apache kafka and Cassandra. I use streaming-kafka integration. In kafka I have a producer which sends data using this configuration:

props.put("metadata.broker.list", KafkaProperties.ZOOKEEPER);
props.put("bootstrap.servers", KafkaProperties.SERVER);
props.put("client.id", "DemoProducer");
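As a side note, the snippet above will not compile as pasted, since the property keys and the client id must be string literals. A minimal compilable sketch, with the `KafkaProperties` constants inlined as assumptions (that class is not shown in the question):

```java
import java.util.Properties;

public class ProducerConfigSketch {
    // Assumed values, mirroring the KafkaProperties constants described in the question
    static final String ZOOKEEPER = "localhost:2181";
    static final String SERVER = "localhost:9092";

    // Builds the producer configuration with properly quoted keys
    static Properties buildProducerProps() {
        Properties props = new Properties();
        props.put("metadata.broker.list", ZOOKEEPER);
        props.put("bootstrap.servers", SERVER);
        props.put("client.id", "DemoProducer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProducerProps();
        System.out.println(props.getProperty("bootstrap.servers")); // localhost:9092
    }
}
```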

where ZOOKEEPER = localhost:2181 and SERVER = localhost:9092.


Once I send data I can receive it with Spark, and I can consume it too. My Spark configuration is:

SparkConf sparkConf = new SparkConf().setAppName("org.kakfa.spark.ConsumerData").setMaster("local[4]");
sparkConf.set("spark.cassandra.connection.host", "localhost");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));


After that I try to store this data in a Cassandra database. But when I try to open a session using this:

CassandraConnector connector = CassandraConnector.apply(jssc.sparkContext().getConf());
Session session = connector.openSession();

I get the following error:

Exception in thread "main" com.datastax.driver.core.exceptions.NoHostAvailableException: All host(s) tried for query failed (tried: localhost/127.0.0.1:9042 (com.datastax.driver.core.exceptions.InvalidQueryException: unconfigured table schema_keyspaces))
at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:220)
at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:78)
at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1231)
at com.datastax.driver.core.Cluster.getMetadata(Cluster.java:334)
at com.datastax.spark.connector.cql.CassandraConnector$.com$datastax$spark$connector$cql$CassandraConnector$$createSession(CassandraConnector.scala:182)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:161)
at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$2.apply(CassandraConnector.scala:161)
at com.datastax.spark.connector.cql.RefCountedCache.createNewValueAndKeys(RefCountedCache.scala:36)
at com.datastax.spark.connector.cql.RefCountedCache.acquire(RefCountedCache.scala:61)
at com.datastax.spark.connector.cql.CassandraConnector.openSession(CassandraConnector.scala:70)
at org.kakfa.spark.ConsumerData.main(ConsumerData.java:80)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)


Regarding Cassandra, I'm using the default configuration:

start_native_transport: true
native_transport_port: 9042
- seeds: "127.0.0.1"
cluster_name: 'Test Cluster'
rpc_address: localhost
rpc_port: 9160
start_rpc: true


I can connect to Cassandra from the command line using cqlsh localhost, getting the following message:

Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.0.5 | CQL spec 3.4.0 | Native protocol v4]
Use HELP for help.
cqlsh>


I used nodetool status too, which shows me this:

http://pastebin.com/ZQ5YyDyB


To run Cassandra I invoke bin/cassandra -f. This is the code where I try to create the keyspace and a table:

try (Session session = connector.openSession()) {
        System.out.println("dentro del try");
        session.execute("DROP KEYSPACE IF EXISTS test");
        System.out.println("dentro del try - 1");
        session.execute("CREATE KEYSPACE test WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}");
        System.out.println("dentro del try - 2");
        session.execute("CREATE TABLE test.users (id TEXT PRIMARY KEY, name TEXT)");
        System.out.println("dentro del try - 3");
    }


My pom.xml file looks like this:

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.6.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.6.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.1</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.10</artifactId>
        <version>1.6.0-M1</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.6.0-M2</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.10</artifactId>
        <version>1.1.0-alpha2</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector-java_2.10</artifactId>
        <version>1.1.0-alpha2</version>
    </dependency>

    <dependency>
        <groupId>org.json</groupId>
        <artifactId>json</artifactId>
        <version>20160212</version>
    </dependency>
</dependencies>


I have no idea why I can't connect to Cassandra using Spark. Is my configuration bad, or what am I doing wrong?

Thanks!

Answer


com.datastax.driver.core.exceptions.InvalidQueryException: unconfigured table schema_keyspaces

That error indicates an old driver being used against a new Cassandra version. Looking at the POM file, we find the spark-cassandra-connector dependency declared twice: once with version 1.6.0-M2 (good) and once with 1.1.0-alpha2 (old).


Remove the references to the old 1.1.0-alpha2 dependencies from your POM:

<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.10</artifactId>
    <version>1.1.0-alpha2</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector-java_2.10</artifactId>
    <version>1.1.0-alpha2</version>
</dependency>
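After dropping those, the POM should carry a single connector version. A sketch of the remaining connector entries, keeping the versions already present in the original POM (1.6.0-M2 for the Scala artifact, 1.6.0-M1 for the Java artifact), assuming you stay on Spark 1.6.1 / Scala 2.10:

```xml
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.10</artifactId>
    <version>1.6.0-M2</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector-java_2.10</artifactId>
    <version>1.6.0-M1</version>
</dependency>
```

To confirm that only one connector version ends up on the classpath, `mvn dependency:tree -Dincludes=com.datastax.spark` lists which versions Maven actually resolves.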
