如何在触发器中为其他Cassandra表填充突变 [英] How to populate a Mutation for a different Cassandra table in a trigger

查看:132
本文介绍了如何在触发器中为其他Cassandra表填充突变的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试实现Cassandra触发器,以便在keyspace1.tableA上进行更新或删除时,触发器将向keyspace1.tableB添加一行。

I'm trying to implement a Cassandra trigger such that when there is an update or a delete on keyspace1.tableA, the trigger will add a row to keyspace1.tableB.

表B中的列名称与表A中的列完全不同。

The names of the columns in tableB are completely different than the columns in tableA.

我正在使用Cassandra 2.1,没有选择迁移到较新版本的选择版。在 https://github.com/apache/cassandra/blob/cassandra-2.1/examples/triggers/src/org/apache/cassandra/triggers/InvertedIndex.java 我可以看到添加的基础知识突变:

I'm working with Cassandra 2.1, no option to move to a more recent version. Looking at the InvertedIndex trigger example at https://github.com/apache/cassandra/blob/cassandra-2.1/examples/triggers/src/org/apache/cassandra/triggers/InvertedIndex.java I can see the basics of adding a mutation:

来自InvertedIndex示例:

From the InvertedIndex example:

    for (Cell cell : update)
    {
        // Skip the row marker and other empty values, since they lead to an empty key.
        if (cell.value().remaining() > 0)
        {
            Mutation mutation = new Mutation(properties.getProperty("keyspace"), cell.value());
            mutation.add(properties.getProperty("columnfamily"), cell.name(), key, System.currentTimeMillis());
            mutations.add(mutation);
        }
    }

挑战在于,在此示例中,单元名称被传递给mutation.add的是cell.name(),它是一个现有的对象,我们可以使用该函数使用它的名字。

The challenge is that in this example, the cell name being passed to mutation.add is cell.name() which is an existing object whose name we can just take using that function.

现在,我只是在尝试存储对表A进行更改的时间,因此表B有两列:

For now, I'm just trying to store the time a change was made to tableA, so tableB has two columns:


  • changetime timeuuid

  • operation text

我需要添加一个变量,该变量将在tableB中添加一行,其中包含更改时间和执行的操作。我如何在Cassandra 2.1.12中添加这样的行突变?

I need to add a mutation that will add a row to tableB with the changetime and operation performed. How can I add such a row mutation in Cassandra 2.1.12?

我已经尝试过了,但是在触发器中却得到了空指针异常:

I have tried this but I am getting a null pointer exception in the trigger:

...
String keycol = "changetime";
ByteBuffer uuidKey = ByteBuffer.wrap(UUIDGen.getTimeUUIDBytes()); 
ColumnIdentifier ci = new ColumnIdentifier(keycol, false);
CellName cn = CellNames.simpleSparse(ci);
mutation = new Mutation(keyspace, uuidKey);
mutation.add(tableName,cn, uuidKey, System.currentTimeMillis());
...

任何帮助将不胜感激-我不知道Cassandra内部构造

Any assistance would be much appreciated - I have no knowledge of Cassandra internals so no amount of detail is too much information.

推荐答案

答案是使用CFMetaData比较器创建Mutation所需的CellName。 。加(...)。为了提供具体的示例,我将使用Cassandra 3.0 AuditTrigger中的架构和示例,该架构和示例位于 https://github.com/apache/cassandra/tree/cassandra-3.0/examples/triggers

The answer is to use the CFMetaData comparator to create the CellName needed by Mutation.add(...). To provide a specific example I'll use the schema and example from the Cassandra 3.0 AuditTrigger available at https://github.com/apache/cassandra/tree/cassandra-3.0/examples/triggers

在这种情况下,我们将写入定义如下的test.audit表:

In this case, the table we will be writing to is the test.audit table defined as follows:

CREATE TABLE test.audit (key timeuuid, keyspace_name text,
    table_name text, primary_key text, PRIMARY KEY(key));

此表具有一个名为 key的分区键,并且没有群集列。有关定义,请参见 https://cassandra.apache.org/doc/cql3/CQL。 html#createTableStmt ,分区键和集群列部分。

This table has a partition key named "key" and no clustering columns. For definitions see https://cassandra.apache.org/doc/cql3/CQL.html#createTableStmt, section "Partition key and clustering columns).

需要注意这一点,因为对makeCellName的调用(我们将在示例中看到)后面的代码)获取变量的可变列表,其中每个变量是我们希望相应的聚类列用于将受影响的行的值,最后一个参数是文本格式的列的名称。

This is important to note because the call to makeCellName (which we will see in the sample code that follows) takes a variable list of arguments, where each argument is the value we want the corresponding clustering column to take for the row that will be affected, and the last argument is the name of the column in text format.

当没有聚类列时(在此模式中就是这种情况),那么对makeCellName的调用将使用单个参数:列的名称。

When there are no clustering columns (as is the case in this schema), then the call to makeCellName takes a single argument: the name of the column.

将所有内容放在一起,与3.0示例相同的Cassandra 2.1的AuditTrigger函数如下所示:

Putting all this together, the AuditTrigger function for Cassandra 2.1 that does the same thing as the 3.0 example looks like this:

public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
{
    CFMetaData cfm = update.metadata();

    List<Mutation> mutations = new ArrayList<>(update.getColumnCount());

    String keyspaceName = "";
    String tableName    = "";
    String keyStr       = "";

    keyspaceName = cfm.ksName;
    tableName = cfm.cfName;

    try {
        keyStr = ByteBufferUtil.string(key);
    } catch (CharacterCodingException e) {
        StringWriter errors = new StringWriter();
        e.printStackTrace(new PrintWriter(errors));
        logger.error(errors.toString());
    }

    for (Cell cell : update)
    {
        // Skip the row marker and other empty values, since they lead to an empty key.
        if (cell.value().remaining() > 0)
        {
            CFMetaData other = Schema.instance.getCFMetaData("test","audit");
            CellNameType cnt = other.comparator;

            ByteBuffer auditkey = UUIDType.instance.decompose(UUIDGen.getTimeUUID());

            // create CellName objects for each of the columns in the audit table row we are inserting
            CellName primaryKeyCellName = cnt.makeCellName("primary_key");
            CellName keyspaceCellName = cnt.makeCellName("keyspace_name");
            CellName tableCellName = cnt.makeCellName("table_name");

            try {
                // put the values we want to write to the audit table into ByteBuffer objects
                ByteBuffer ksvalbb,tablevalbb,keyvalbb;
                ksvalbb=ByteBuffer.wrap(keyspaceName.getBytes("UTF8"));
                tablevalbb=ByteBuffer.wrap(tableName.getBytes("UTF8"));
                keyvalbb=ByteBuffer.wrap(keyStr.getBytes("UTF8"));

                // create the mutation object
                Mutation mutation = new Mutation(keyspaceName, auditkey);

                // get the time which will be needed for the call to mutation.add
                long mutationTime=System.currentTimeMillis();

                // add each of the column values to the mutation
                mutation.add("audit", primaryKeyCellName, keyvalbb,  mutationTime);
                mutation.add("audit", keyspaceCellName,  ksvalbb,  mutationTime);
                mutation.add("audit", tableCellName, tablevalbb,  mutationTime);

                mutations.add(mutation);
            } catch (UnsupportedEncodingException e) {
                StringWriter errors = new StringWriter();
                e.printStackTrace(new PrintWriter(errors));
                logger.error(errors.toString());
            }
        }
    }
    return mutations;
} 

这篇关于如何在触发器中为其他Cassandra表填充突变的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆