如何连接火花与卡桑德拉流? [英] how to connect spark streaming with cassandra?
问题描述
我用
Cassandra v2.1.12
Spark v1.4.1
Scala 2.10
和Cassandra是监听
and cassandra is listening on
rpc_address:127.0.1.1
rpc_port:9160
例如,连接卡夫卡和火花流,一边听每4秒卡夫卡,我有以下的火花工作
For example, to connect kafka and spark-streaming, while listening to kafka every 4 seconds, I have the following spark job
sc = SparkContext(conf=conf)
stream=StreamingContext(sc,4)
map1={'topic_name':1}
kafkaStream = KafkaUtils.createStream(stream, 'localhost:2181', "name", map1)
和火花流保持侦听卡夫卡经纪人每4秒,输出内容。
And spark-streaming keeps listening to kafka broker every 4 seconds and outputs the contents.
同样,我要流火花听卡桑德拉和输出指定表的内容,每例如4秒。。
如何转换上述流code,使其与卡桑德拉,而不是卡夫卡?工作
我可以明显地继续运行在一个无限循环的查询,但事实并非如此流吧?
火花的工作:
from __future__ import print_function
import time
import sys
from random import random
from operator import add
from pyspark.streaming import StreamingContext
from pyspark import SparkContext,SparkConf
from pyspark.sql import SQLContext
from pyspark.streaming import *
sc = SparkContext(appName="sparkcassandra")
while(True):
time.sleep(5)
sqlContext = SQLContext(sc)
stream=StreamingContext(sc,4)
lines = stream.socketTextStream("127.0.1.1", 9160)
sqlContext.read.format("org.apache.spark.sql.cassandra")\
.options(table="users", keyspace="keyspace2")\
.load()\
.show()
运行这样
sudo ./bin/spark-submit --packages \
datastax:spark-cassandra-connector:1.4.1-s_2.10 \
examples/src/main/python/sparkstreaming-cassandra2.py
和我得到的表中的值这rougly看起来像
and I get the table values which rougly looks like
lastname|age|city|email|firstname
那么什么是正确的方式,从卡桑德拉流的数据?
推荐答案
目前的正确的方式从C *流数据不是从C *流数据:)相反,它通常更有道理有你的消息队列(如卡夫卡)在C *的前面和流过的这一点。 C *不容易支持增量表读但如果聚集键基于插入时可以做到这一点。
Currently the "Right Way" to stream data from C* is not to Stream Data from C* :) Instead it usually makes much more sense to have your message queue (like Kafka) in front of C* and Stream off of that. C* doesn't easily support incremental table reads although this can be done if the clustering key is based on insert time.
如果您有兴趣使用C *作为流源,一定要检查和评论
https://issues.apache.org/jira/browse/CASSANDRA-8844
变更数据捕获
If you are interested in using C* as a streaming source be sure to check out and comment on https://issues.apache.org/jira/browse/CASSANDRA-8844 Change Data Capture
这是最有可能你在找什么。
Which is most likely what you are looking for.
如果您实际上只是想定期读取整个表,并做一些你可能是最好的,只是有cron作业推出一批操作你真的没有反正恢复状态的方式。
If you are actually just trying to read the full table periodically and do something you may be best off with just a cron job launching a batch operation as you really have no way of recovering state anyway.
这篇关于如何连接火花与卡桑德拉流?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!