How to connect Spark Streaming with Cassandra?


Problem description

I am using

Cassandra v2.1.12
Spark v1.4.1
Scala 2.10

and Cassandra is listening on

rpc_address:127.0.1.1
rpc_port:9160

For example, to connect Kafka and Spark Streaming, polling Kafka every 4 seconds, I have the following Spark job:

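from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils  # receiver-based Kafka API (Spark 1.x)

sc = SparkContext(conf=SparkConf().setAppName("kafka-stream"))
stream = StreamingContext(sc, 4)  # 4-second batch interval
map1 = {'topic_name': 1}  # topic -> number of consumer threads
kafkaStream = KafkaUtils.createStream(stream, 'localhost:2181', "name", map1)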

Spark Streaming then keeps polling the Kafka broker every 4 seconds and outputs the contents.

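In the same way, I want Spark Streaming to listen to Cassandra and output the contents of a given table every, say, 4 seconds.

How do I convert the streaming code above so that it works with Cassandra instead of Kafka?

I could obviously keep running the query in an infinite loop, but that is not really streaming, is it?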

Spark job:

from __future__ import print_function
import time

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="sparkcassandra")
sqlContext = SQLContext(sc)  # create once, outside the loop

# Re-read and print the whole table every 5 seconds. This is repeated
# batch polling: each iteration scans the full table again.
while True:
    time.sleep(5)
    sqlContext.read.format("org.apache.spark.sql.cassandra") \
        .options(table="users", keyspace="keyspace2") \
        .load() \
        .show()

I run it like this:

sudo ./bin/spark-submit --packages \
datastax:spark-cassandra-connector:1.4.1-s_2.10 \
examples/src/main/python/sparkstreaming-cassandra2.py

and I get the table values, which roughly look like

lastname|age|city|email|firstname

So what is the right way to stream data from Cassandra?

Recommended answer

Currently the "Right Way" to stream data from C* is not to stream data from C* :) Instead, it usually makes much more sense to put your message queue (such as Kafka) in front of C* and stream off of that. C* does not easily support incremental table reads, although this can be done if the clustering key is based on insert time.
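To illustrate what "incremental reads via an insert-time clustering key" means, here is a minimal sketch under stated assumptions. The `keyspace2.events` table, its `day`/`inserted_at`/`payload` columns, and the `IncrementalReader` class are all hypothetical, not from the question. The reader keeps a watermark (the newest `inserted_at` it has returned) and each poll asks only for rows newer than that, which Cassandra can serve as a range slice within one partition. The in-memory `poll` stands in for actually executing the query. Two caveats apply: a row written late with an `inserted_at` older than the watermark is never picked up, and rows sharing the watermark's exact timestamp can be skipped (a `timeuuid` clustering column narrows the second problem).

```python
from typing import List, Optional, Tuple

# Hypothetical table (an assumption, not the asker's "users" table): rows are
# partitioned by day and clustered by insertion time, so within one partition
# "rows newer than X" is a single contiguous slice.
#
#   CREATE TABLE keyspace2.events (
#       day text, inserted_at timestamp, payload text,
#       PRIMARY KEY ((day), inserted_at));
#
# Query an incremental poller would run against one partition; %s are bind
# placeholders. The first poll would omit the inserted_at bound.
INCREMENTAL_CQL = ("SELECT inserted_at, payload FROM keyspace2.events "
                   "WHERE day = %s AND inserted_at > %s")

Row = Tuple[int, str]  # (inserted_at as epoch millis, payload)


class IncrementalReader:
    """Tracks a watermark: the newest inserted_at already returned."""

    def __init__(self) -> None:
        self.watermark: Optional[int] = None

    def poll(self, partition_rows: List[Row]) -> List[Row]:
        # In-memory stand-in for running INCREMENTAL_CQL with the current
        # watermark: return only rows newer than it, in clustering order.
        new_rows = sorted(r for r in partition_rows
                          if self.watermark is None or r[0] > self.watermark)
        if new_rows:
            self.watermark = new_rows[-1][0]  # advance past what we returned
        return new_rows
```

Each call to `poll` returns only the rows inserted since the previous call, so a loop calling it every few seconds behaves like a crude stream. The watermark lives only in this process, though, so it is lost on restart unless you persist it yourself.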

If you are interested in using C* as a streaming source, be sure to check out and comment on CASSANDRA-8844, Change Data Capture (https://issues.apache.org/jira/browse/CASSANDRA-8844), which is most likely what you are looking for.

If you are actually just trying to read the full table periodically and do something with it, you may be best off with a cron job that launches a batch operation, since you have no way of recovering state anyway.
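A rough sketch of the cron approach, under assumptions: `run_batch` is the question's read with the `while` loop removed, so each launch reads the table once and exits. `cron_entry` is a hypothetical helper that formats the crontab line, and the Spark and script paths in the usage below are placeholders. Note that cron's finest granularity is one minute, so this suits minute-scale refreshes rather than the 4-second interval in the question.

```python
import shlex
from typing import List


def cron_entry(every_minutes: int, argv: List[str]) -> str:
    """Build a crontab line that runs argv every `every_minutes` minutes."""
    if not 1 <= every_minutes <= 59:
        raise ValueError("cron's minute field supports steps of 1..59")
    # Quote each argument so spaces or shell metacharacters survive cron's shell.
    command = " ".join(shlex.quote(a) for a in argv)
    return "*/%d * * * * %s" % (every_minutes, command)


def run_batch() -> None:
    """One-shot batch job: read the whole table once, then exit (no state kept)."""
    # Imported lazily so this module can be loaded without PySpark installed.
    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    sc = SparkContext(appName="cassandra-batch")
    try:
        df = (SQLContext(sc).read.format("org.apache.spark.sql.cassandra")
              .options(table="users", keyspace="keyspace2")
              .load())
        df.show()  # replace with whatever processing the job needs
    finally:
        sc.stop()
```

For example, `cron_entry(5, ["/opt/spark/bin/spark-submit", "--packages", "datastax:spark-cassandra-connector:1.4.1-s_2.10", "/jobs/cassandra_batch.py"])` produces a line you can paste into `crontab -e`, where the hypothetical `/jobs/cassandra_batch.py` simply calls `run_batch()`.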
