kafka spark-streaming data not getting written into cassandra. zero rows inserted


Problem description


While writing data to Cassandra from Spark, no data is getting written.
The background is:
I am doing a Kafka - Spark Streaming - Cassandra integration.
I am reading Kafka messages and trying to insert them into a Cassandra table created with CREATE TABLE TEST_TABLE(key INT PRIMARY KEY, value TEXT).
Kafka to Spark Streaming runs fine, but from Spark to Cassandra there is some issue: the data is not getting written to the table.
I am able to create a connection to Cassandra, but the data is not getting inserted into the Cassandra table. The output shows it connecting and then disconnecting the next second.
All of the System.out.print() strings show up in the output:

+++++++++++cassandra connector created++++++++++++++++++++++++++++
+++++++++++++streaming Connection done!+++++++++++++++++++++++++++
++++++++++++++++JavaDStream<TestTable> created++++++++++++++++++++++++++++
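
For reference, the target keyspace and table would be created along these lines in cqlsh (a sketch: the replication settings are an assumption, while the keyspace and table names match the writerBuilder call in the code below):

CREATE KEYSPACE IF NOT EXISTS testkeyspace
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

USE testkeyspace;

CREATE TABLE IF NOT EXISTS test_table (key INT PRIMARY KEY, value TEXT);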


The Cassandra shell shows 0 rows.
The full code, logs, and dependencies are below:

// Imports are not shown in the original post; these are the ones this
// listing needs with Spark 1.1 and spark-cassandra-connector-java 1.1.
import java.io.Serializable;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;

import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

import com.datastax.spark.connector.cql.CassandraConnector;

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

public class SparkStream {
    static int key=0;
    public static void main(String args[]) throws Exception
    {

        if(args.length != 3)
        {
            System.out.println("parameters not given properly");
            System.exit(1);
        }

        Logger.getLogger("org").setLevel(Level.OFF);
        Logger.getLogger("akka").setLevel(Level.OFF);
        Map<String,Integer> topicMap = new HashMap<String,Integer>();
        String[] topic = args[2].split(",");
        for(String t: topic)
        {
            topicMap.put(t, new Integer(3));
        }

        /* Connection to Spark */
        SparkConf conf = new SparkConf();
        conf.set("spark.cassandra.connection.host", "localhost");
        JavaSparkContext sc = new JavaSparkContext("local[4]", "SparkStream",conf);
        JavaStreamingContext jssc = new JavaStreamingContext(sc, new Duration(5000));


        /* connection to cassandra */
        CassandraConnector connector = CassandraConnector.apply(sc.getConf());
        System.out.println("+++++++++++cassandra connector created++++++++++++++++++++++++++++");


        /* Receive Kafka streaming inputs */
        JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1], topicMap );
        System.out.println("+++++++++++++streaming Connection done!+++++++++++++++++++++++++++");


        /* Create DStream */                
        JavaDStream<TestTable> data = messages.map(new Function< Tuple2<String,String>, TestTable >() 
        {
            public TestTable call(Tuple2<String, String> message)
            {
                return new TestTable(new Integer(++key), message._2() );
            }
        }
        );
        System.out.println("++++++++++++++++JavaDStream<TestTable> created++++++++++++++++++++++++++++");


        /* Write to cassandra */
        javaFunctions(data).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra();


        jssc.start();
        jssc.awaitTermination();

    }
}

class TestTable implements Serializable
{
    Integer key;
    String value;

    public TestTable() {}

    public TestTable(Integer k, String v)
    {
        key=k;
        value=v;
    }

    public Integer getKey(){
        return key;
    }

    public void setKey(Integer k){
        key=k;
    }

    public String getValue(){
        return value;
    }

    public void setValue(String v){
        value=v;
    }

    public String toString(){
        return MessageFormat.format("TestTable'{'key={0}, value={1}'}'", key, value);

    }
}
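
To separate connector/mapping problems from Kafka/streaming problems, a one-off batch write with the same mapper can help (a minimal, hypothetical sketch: it assumes the same sc and the same static imports as the listing above, plus java.util.Arrays and org.apache.spark.api.java.JavaRDD):

/* Probe: write a single row from a plain RDD to the same table,
   bypassing Kafka and the streaming context entirely. */
JavaRDD<TestTable> probe = sc.parallelize(Arrays.asList(new TestTable(0, "probe")));
javaFunctions(probe).writerBuilder("testkeyspace", "test_table", mapToRow(TestTable.class)).saveToCassandra();

If the column mapper cannot access TestTable, this write fails as well, which points at the mapping rather than the streaming pipeline.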

The logs are:

+++++++++++cassandra connector created++++++++++++++++++++++++++++
+++++++++++++streaming Connection done!+++++++++++++++++++++++++++
++++++++++++++++JavaDStream<TestTable> created++++++++++++++++++++++++++++
14/12/09 12:07:33 INFO core.Cluster: New Cassandra host localhost/127.0.0.1:9042 added
14/12/09 12:07:33 INFO cql.CassandraConnector: Connected to Cassandra cluster: Test Cluster
14/12/09 12:07:33 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/12/09 12:07:33 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/12/09 12:07:34 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster

14/12/09 12:07:45 INFO core.Cluster: New Cassandra host localhost/127.0.0.1:9042 added
14/12/09 12:07:45 INFO cql.CassandraConnector: Connected to Cassandra cluster: Test Cluster
14/12/09 12:07:45 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/12/09 12:07:45 INFO cql.LocalNodeFirstLoadBalancingPolicy: Adding host 127.0.0.1 (datacenter1)
14/12/09 12:07:46 INFO cql.CassandraConnector: Disconnected from Cassandra cluster: Test Cluster


The pom.xml dependencies are:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.1.0</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.1.0</version>
</dependency>

<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.10</artifactId>
    <version>1.1.0</version>
</dependency>

<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector-java_2.10</artifactId>
    <version>1.1.0</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.1.1</version>
</dependency>

<dependency>
    <groupId>com.msiops.footing</groupId>
    <artifactId>footing-tuple</artifactId>
    <version>0.2</version>
</dependency>

<dependency>
    <groupId>com.datastax.cassandra</groupId>
    <artifactId>cassandra-driver-core</artifactId>
    <version>2.1.3</version>
</dependency>


Is there something wrong with the code, or with the Cassandra configuration?

Recommended answer


Solved the issue: the column mapper wasn't able to access the getters and setters of the TestTable class, so I changed the access modifier to public. But that left two public classes in one file, which is a compile error, so I created another Java file, TestTable.java, with the class declared as:

public class TestTable implements Serializable { 
//code
}
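
For completeness, a sketch of the resulting TestTable.java (the body is unchanged from the original listing; only the public modifier on the class and the move to its own file are new):

import java.io.Serializable;
import java.text.MessageFormat;

// TestTable.java -- moved to its own file so the class can be public,
// which lets the connector's column mapper reach the getters and setters.
public class TestTable implements Serializable
{
    Integer key;
    String value;

    public TestTable() {}

    public TestTable(Integer k, String v)
    {
        key = k;
        value = v;
    }

    public Integer getKey() { return key; }
    public void setKey(Integer k) { key = k; }

    public String getValue() { return value; }
    public void setValue(String v) { value = v; }

    public String toString() {
        return MessageFormat.format("TestTable'{'key={0}, value={1}'}'", key, value);
    }
}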


Now the messages are being read from Kafka and stored in the Cassandra table.

