如何使用Datastax Java驱动程序的异步/批量写入功能 [英] How to use Asynchronous/Batch writes feature with Datastax Java driver

查看:710
本文介绍了如何使用Datastax Java驱动程序的异步/批量写入功能的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我计划使用Datastax Java驱动程序写入Cassandra ..我主要感兴趣批量写入 Asycnhronous Datastax java驱动程序的功能,但我不能得到任何教程,可以解释我如何将这些功能,在我下面的代码使用Datastax Java驱动程序。

I am planning to use Datastax Java driver for writing to Cassandra.. I was mainly interested in Batch Writes and Asycnhronous features of Datastax java driver but I am not able to get any tutorials which can explain me how to incorporate these features in my below code which uses Datastax Java driver..

/**
 * Performs an upsert of the specified attributes for the specified id.
 */
public void upsertAttributes(final String userId, final Map<String, String> attributes, final String columnFamily) {

    try {

        // make a sql here using the above input parameters.

        String sql = sqlPart1.toString()+sqlPart2.toString();

        DatastaxConnection.getInstance();
        PreparedStatement prepStatement = DatastaxConnection.getSession().prepare(sql);
        prepStatement.setConsistencyLevel(ConsistencyLevel.ONE);        

        BoundStatement query = prepStatement.bind(userId, attributes.values().toArray(new Object[attributes.size()]));

        DatastaxConnection.getSession().execute(query);

    } catch (InvalidQueryException e) {
        LOG.error("Invalid Query Exception in DatastaxClient::upsertAttributes "+e);
    } catch (Exception e) {
        LOG.error("Exception in DatastaxClient::upsertAttributes "+e);
    }
}

在下面的代码中, Cassandra节点使用Datastax Java驱动程序。

In the below code, I am creating a Connection to Cassandra nodes using Datastax Java driver.

/**
 * Creating Cassandra connection using Datastax Java driver
 *
 */
private DatastaxConnection() {

    try{
        builder = Cluster.builder();
        builder.addContactPoint("some_nodes");

        builder.poolingOptions().setCoreConnectionsPerHost(
                HostDistance.LOCAL,
                builder.poolingOptions().getMaxConnectionsPerHost(HostDistance.LOCAL));

        cluster = builder
                .withRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE)
                .withReconnectionPolicy(new ConstantReconnectionPolicy(100L))
                .build();

        StringBuilder s = new StringBuilder();
        Set<Host> allHosts = cluster.getMetadata().getAllHosts();
        for (Host h : allHosts) {
            s.append("[");
            s.append(h.getDatacenter());
            s.append(h.getRack());
            s.append(h.getAddress());
            s.append("]");
        }
        System.out.println("Cassandra Cluster: " + s.toString());

        session = cluster.connect("testdatastaxks");

    } catch (NoHostAvailableException e) {
        e.printStackTrace();
        throw new RuntimeException(e);
    } catch (Exception e) {

    }
}


$ b b

任何人都可以帮助我如何添加批量写入或异步功能到我的上述代码..感谢您的帮助..

Can anybody help me on how to add Batch writes or Asynchronous features to my above code.. Thanks for the help..

我正在运行Cassandra 1.2。 9

I am running Cassandra 1.2.9

推荐答案

对于asynch来说,使用 executeAsync

For asynch it's as simple as using the executeAsync function:

...
DatastaxConnection.getSession().executeAsync(query);

对于批处理,您需要构建查询(我使用字符串,因为编译器知道如何优化字符串连接真的很好):

For the batch, you need to build the query (I use strings because the compiler knows how to optimize string concatenation really well):

String cql =  "BEGIN BATCH "
       cql += "INSERT INTO test.prepared (id, col_1) VALUES (?,?); ";
       cql += "INSERT INTO test.prepared (id, col_1) VALUES (?,?); ";
       cql += "APPLY BATCH; "

DatastaxConnection.getInstance();
PreparedStatement prepStatement = DatastaxConnection.getSession().prepare(cql);
prepStatement.setConsistencyLevel(ConsistencyLevel.ONE);        

// this is where you need to be careful
// bind expects a comma separated list of values for all the params (?) above
// so for the above batch we need to supply 4 params:                     
BoundStatement query = prepStatement.bind(userId, "col1_val", userId_2, "col1_val_2");

DatastaxConnection.getSession().execute(query);

在旁注,我认为你的语句的绑定可能看起来像这样,属性映射到映射列表,其中每个映射表示批处理内的更新/插入:

On a side note, I think your binding of the statement might look something like this, assuming you change attributes to a list of maps where each map represents an update/insert inside the batch:

BoundStatement query = prepStatement.bind(userId,
                                          attributesList.get(0).values().toArray(new Object[attributes.size()]), 
                                          userId_2,
                                          attributesList.get(1).values().toArray(new Object[attributes.size()])); 

这篇关于如何使用Datastax Java驱动程序的异步/批量写入功能的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆