Spark with Cassandra input/output


Problem description

Picture the following scenario: a Spark application (implemented in Java) uses a Cassandra database to load data, convert it to an RDD, and process it. The application also streams new data from the database, which is processed by a custom receiver. The output of the streaming process is stored back in the database. The implementation uses Spring Data Cassandra for the integration with the database.

CassandraConfig:

@Configuration
@ComponentScan(basePackages = {"org.foo"})
@PropertySource(value = { "classpath:cassandra.properties" })
public class CassandraConfig {

    @Autowired
    private Environment env;

    // Connects to the Cassandra cluster using the contact points and port
    // defined in cassandra.properties.
    @Bean
    public CassandraClusterFactoryBean cluster() {
        CassandraClusterFactoryBean cluster = new CassandraClusterFactoryBean();
        cluster.setContactPoints(env.getProperty("cassandra.contactpoints"));
        cluster.setPort(Integer.parseInt(env.getProperty("cassandra.port")));

        return cluster;
    }

    @Bean
    public CassandraMappingContext mappingContext() {
        return new BasicCassandraMappingContext();
    }

    @Bean
    public CassandraConverter converter() {
        return new MappingCassandraConverter(mappingContext());
    }

    // Opens a session bound to the configured keyspace; SchemaAction.NONE
    // leaves the existing schema untouched.
    @Bean
    public CassandraSessionFactoryBean session() throws Exception {
        CassandraSessionFactoryBean session = new CassandraSessionFactoryBean();
        session.setCluster(cluster().getObject());
        session.setKeyspaceName(env.getProperty("cassandra.keyspace"));
        session.setConverter(converter());
        session.setSchemaAction(SchemaAction.NONE);

        return session;
    }

    // Template the application uses to execute CQL queries.
    @Bean
    public CassandraOperations cassandraTemplate() throws Exception {
        return new CassandraTemplate(session().getObject());
    }

}
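
For reference, a minimal cassandra.properties on the classpath would supply the three keys read by the config above (the values here are placeholders):

cassandra.contactpoints=127.0.0.1
cassandra.port=9042
cassandra.keyspace=my_keyspace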

DataProcessor.main method:

// Initialize spring application context
ApplicationContext applicationContext = new AnnotationConfigApplicationContext(CassandraConfig.class);
ApplicationContextHolder.setApplicationContext(applicationContext);
CassandraOperations cassandraOperations = applicationContext.getBean(CassandraOperations.class);
// Initialize spark context
SparkConf conf = new SparkConf().setAppName("test-spark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);

// Load data pages
List<Event> pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
// Parallelize the first page
JavaRDD<Event> rddBuffer = sc.parallelize(pagingResults);

while(pagingResults != null && !pagingResults.isEmpty()) {
    Event lastEvent = pagingResults.get(pagingResults.size() - 1);
    pagingResults = cassandraOperations.select("select * from event where event_type = 'event_type1' and creation_time < " + lastEvent.getPk().getCreationTime() + " order by creation_time desc limit " + DATA_PAGE_SIZE, Event.class);
    // Parallelize page and add to the existing
    rddBuffer = rddBuffer.union(sc.parallelize(pagingResults));
}

// data processing
...

A large amount of data is expected for the initial load. For this reason the data is paginated, loaded page by page, and distributed across the cluster in rddBuffer.
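
One optional follow-up, not part of the original code: each sc.parallelize call in the loop creates its own small set of partitions, so the unioned rddBuffer accumulates one partition set per page. A single repartition after the loop rebalances the data before the heavy processing, e.g.:

// Hypothetical cleanup step: collapse the per-page partitions into a balanced layout
rddBuffer = rddBuffer.repartition(sc.defaultParallelism());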

The following options are also available:

  1. The Spark-Cassandra example (https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala), although the documentation for this example is minimal.
  2. The Calliope project (http://tuplejump.github.io/calliope/)

I would like to know what the best practice is for integrating Spark with Cassandra. What would be the best option to follow in my implementation?

Apache Spark 1.0.0, Apache Cassandra 2.0.8

Answer

The easiest way to work with Cassandra and Spark is to use the official open-source Cassandra driver for Spark developed by DataStax: https://github.com/datastax/spark-cassandra-connector

This driver is built on top of the Cassandra Java Driver and provides a direct bridge between Cassandra and Spark. Unlike Calliope, it does not use the Hadoop interface. Additionally, it offers the following unique features (a usage sketch follows the list):

  • Out-of-the-box support for all Cassandra data types, including collections
  • Lightweight mapping of Cassandra rows to custom classes or tuples, without the need for implicits or other advanced Scala features
  • Saving any RDD to Cassandra
  • Full support for Cassandra virtual nodes
  • The ability to filter/select on the server side, e.g. leveraging Cassandra clustering columns or secondary indexes
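
As a rough illustration of the read and write paths described above, here is a minimal sketch against the connector's Java API (the com.datastax.spark.connector.japi package; package layout and method names vary across connector versions, so treat this as an outline rather than a drop-in implementation). The keyspace my_keyspace, the output table event_out, and the assumption that Event is a JavaBean the connector can map are illustrative only:

import static com.datastax.spark.connector.japi.CassandraJavaUtil.javaFunctions;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapRowTo;
import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapToRow;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class ConnectorSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("test-spark")
                .setMaster("local[2]")
                // The connector picks up its contact points from the Spark config.
                .set("spark.cassandra.connection.host", "127.0.0.1");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // Read the table as a distributed RDD. No manual paging loop is needed:
        // the connector splits the table by token range across the workers.
        // The where clause is pushed down to Cassandra (server-side filtering
        // on a clustering column, as listed above). Event is the question's
        // mapped entity class; 0L is a placeholder lower bound.
        JavaRDD<Event> events = javaFunctions(sc)
                .cassandraTable("my_keyspace", "event", mapRowTo(Event.class))
                .where("creation_time > ?", 0L);

        // ... transform and process the RDD here ...

        // Save any RDD back to Cassandra.
        javaFunctions(events)
                .writerBuilder("my_keyspace", "event_out", mapToRow(Event.class))
                .saveToCassandra();

        sc.stop();
    }
}

With this approach the manual paging loop from the question disappears entirely: the table scan, the server-side filter, and the final write are all expressed directly against the RDD, and the connector takes care of distributing the reads.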
