Spark with Cassandra input/output


Question


Picture the following scenario: a Spark application (Java implementation) uses a Cassandra database to load the data, convert it to an RDD, and process it. The application also streams new data from the database, which is processed by a custom receiver. The output of the streaming process is stored back in the database. The implementation uses Spring Data Cassandra for the integration with the database.

CassandraConfig:

@Configuration
@ComponentScan(basePackages = {"org.foo"})
@PropertySource(value = { "classpath:cassandra.properties" })
public class CassandraConfig {

    @Autowired
    private Environment env;

    @Bean
    public CassandraClusterFactoryBean cluster() {
        CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
        cluster.setContactPoints(env.getProperty("cassandra.contactpoints"));
        cluster.setPort(Integer.parseInt(env.getProperty("cassandra.port")));

        return cluster;
    }

    @Bean
    public CassandraMappingContext mappingContext() {
        return new BasicCassandraMappingContext();
    }

    @Bean
    public CassandraConverter converter() {
        return new MappingCassandraConverter(mappingContext());
    }

    @Bean
    public CassandraSessionFactoryBean session() throws Exception {
        CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
        session.setCluster(cluster().getObject());
        session.setKeyspaceName(env.getProperty("cassandra.keyspace"));
        session.setConverter(converter());
        session.setSchemaAction(SchemaAction.NONE);

        return session;
    }

    @Bean
    public CassandraOperations cassandraTemplate() throws Exception {
        return new CassandraTemplate(session().getObject());
    }

}

DataProcessor.main method:

// Initialize spring application context
ApplicationContext applicationContext = new AnnotationConfigApplicationContext(CassandraConfig.class);
ApplicationContextHolder.setApplicationContext(applicationContext);
CassandraOperations cassandraOperations = applicationContext.getBean(CassandraOperations.class);
// Initialize spark context
SparkConf conf = new SparkConf().setAppName("test-spark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);

// Load data pages
List<Event> pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
// Parallelize the first page
JavaRDD<Event> rddBuffer = sc.parallelize(pagingResults);

while(pagingResults != null && !pagingResults.isEmpty()) {
    Event lastEvent = pagingResults.get(pagingResults.size() - 1);
    pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' and creation_time < " + lastEvent.getPk().getCreationTime() + " order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
    // Parallelize page and add to the existing
    rddBuffer = rddBuffer.union(sc.parallelize(pagingResults));
}

// data processing
...

A large amount of data is expected for the initial load. For this reason the data are paginated, loaded page by page, and distributed across rddBuffer.
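One thing worth noting about this pattern (an observation added here, not part of the original question): each sc.parallelize() call creates its own set of partitions, so unioning one RDD per page can leave rddBuffer with a very large number of tiny partitions. A hedged sketch of compacting them before processing, assuming the sc and rddBuffer variables from the loop above:

```java
// Repeated union() keeps every page's partitions; coalesce() merges
// them down to the context's default parallelism before processing.
rddBuffer = rddBuffer.coalesce(sc.defaultParallelism());

// Optional: keep the compacted RDD in memory if it is reused downstream.
rddBuffer.cache();
```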

There are also the following options:


  1. The Spark-Cassandra example (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala), although there is only a minimal amount of documentation for this example.
  2. The Calliope project (http://tuplejump.github.io/calliope/)


I would like to know what the best practice is for integrating Spark with Cassandra. Which would be the best option to follow in my implementation?


Apache Spark 1.0.0, Apache Cassandra 2.0.8

Answer


The easiest way to work with Cassandra and Spark is to use the official open source Cassandra driver for Spark developed by DataStax: https://github.com/datastax/spark-cassandra-connector


This driver has been built on top of the Cassandra Java Driver and provides a direct bridge between Cassandra and Spark. Unlike Calliope, it does not use the Hadoop interface. Additionally, it offers the following unique features:


  • Support for all Cassandra data types, including collections, out of the box

  • Lightweight mapping of Cassandra rows to custom classes or tuples without the need to use any implicits or other advanced Scala features

  • Saving any RDDs to Cassandra

  • Full support for Cassandra virtual nodes

  • The ability to filter/select on the server side, e.g. leveraging Cassandra clustering columns or secondary indexes
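As an illustration of how this replaces the manual paging loop from the question, here is a minimal, hedged sketch of loading the same event table through the connector's Java API. The keyspace name (my_keyspace) and the connection host are assumptions, and the exact package of the Java API helpers (CassandraJavaUtil, CassandraRow) varies between connector versions, so verify them against the version you depend on:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import com.datastax.spark.connector.japi.CassandraRow;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;

public class ConnectorExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("test-spark")
                .setMaster("local[2]")
                // Assumed contact point; in the question this comes from
                // cassandra.properties.
                .set("spark.cassandra.connection.host", "127.0.0.1");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // The connector partitions the table by token range itself, so no
        // manual LIMIT/creation_time paging loop is needed; the WHERE clause
        // is pushed down to Cassandra on the server side.
        JavaRDD<CassandraRow> events = javaFunctions(sc)
                .cassandraTable("my_keyspace", "event")
                .where("event_type = ?", "event_type1");

        // data processing
        // ...
    }
}
```

Because the connector builds one Spark partition per group of token ranges, the resulting RDD is already distributed across the cluster, which avoids the many-small-partitions issue of the page-by-page union approach.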
