Serializing Java objects to Cassandra 1.2 via ByteBuffer & CQL 3

Question

I've cobbled together the below code that doesn't do anything complex -- just creates a byte[] variable, writes it into a blob field in Cassandra (v1.2, via the new Datastax CQL library), then reads it back out again.

The array is 3 elements long when I write it, and 84 elements long when I read it back...! This means the thing I'm actually trying to do (serialize Java objects) fails with an org.apache.commons.lang.SerializationException: java.io.StreamCorruptedException: invalid stream header: 81000008 error when trying to deserialize again.

Here's some sample code that demonstrates my problem:

import java.nio.ByteBuffer;

import org.apache.commons.lang.SerializationUtils;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Host;
import com.datastax.driver.core.Metadata;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

public class TestCassandraSerialization {


    private Cluster cluster;
    private Session session;

    public TestCassandraSerialization(String node) {
        connect(node);
    }

    private void connect(String node) {
        cluster = Cluster.builder().addContactPoint(node).build();
        Metadata metadata = cluster.getMetadata();
        System.out.printf("Connected to %s\n", metadata.getClusterName());
        for (Host host: metadata.getAllHosts()) {
            System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n",
                    host.getDatacenter(), host.getAddress(), host.getRack());
        }
        session = cluster.connect();
    }

    public void setUp() {
        session.execute("CREATE KEYSPACE test_serialization WITH replication = {'class':'SimpleStrategy', 'replication_factor':1};");

        session.execute("CREATE TABLE test_serialization.test_table (id text PRIMARY KEY, data blob)");
    }

    public void tearDown() {
        session.execute("DROP KEYSPACE test_serialization");
    }

    public void insertIntoTable(String key, byte[] data) {
        PreparedStatement statement = session.prepare("INSERT INTO test_serialization.test_table (id,data) VALUES (?, ?)");
        BoundStatement boundStatement = new BoundStatement(statement);
        session.execute(boundStatement.bind(key,ByteBuffer.wrap(data)));
    }

    public byte[] readFromTable(String key) {
        String q1 = "SELECT * FROM test_serialization.test_table WHERE id = '"+key+"';";

        ResultSet results = session.execute(q1);
        for (Row row : results) {
            ByteBuffer data = row.getBytes("data");
            return data.array();
        }
        return null;
    }


    public static boolean compareByteArrays(byte[] one, byte[] two) {
        if (one.length > two.length) {
            byte[] foo = one;
            one = two;
            two = foo;
        }

        // so now two is definitely the longer array    
        for (int i=0; i<one.length; i++) {
            //System.out.printf("%d: %s\t%s\n", i, one[i], two[i]);
            if (one[i] != two[i]) {
                return false;
            }
        }
        return true;
    }


    public static void main(String[] args) {
        TestCassandraSerialization tester = new TestCassandraSerialization("localhost");

        try {
            tester.setUp();
            byte[] dataIn = new byte[]{1,2,3};
            tester.insertIntoTable("123", dataIn);
            byte[] dataOut = tester.readFromTable("123");

            System.out.println(dataIn);
            System.out.println(dataOut);

            System.out.println(dataIn.length); // prints "3"
            System.out.println(dataOut.length); // prints "84"

            System.out.println(compareByteArrays(dataIn, dataOut)); // prints false         

            String toSave = "Hello, world!";
            dataIn = SerializationUtils.serialize(toSave);
            tester.insertIntoTable("toSave", dataIn);
            dataOut = tester.readFromTable("toSave");

            System.out.println(dataIn.length); // prints "20"
            System.out.println(dataOut.length); // prints "104"


            // The below throws org.apache.commons.lang.SerializationException: java.io.StreamCorruptedException: invalid stream header: 81000008
            String hasLoaded = (String) SerializationUtils.deserialize(dataOut); 
            System.out.println(hasLoaded);

        } finally {
            tester.tearDown();
        }
    }
}

It looks like the right stuff makes it into the database:

cqlsh:flight_cache> select * from test_serialization.test_table;

 id     | data
--------+--------------------------------------------
    123 |                                   0x010203
 toSave | 0xaced000574000d48656c6c6f2c20776f726c6421

cqlsh:flight_cache> 

So it looks like an error when reading, rather than writing, the binary data. Can anyone give me any pointers as to what I'm doing wrong?

Answer

The problem is almost certainly because the array returned by ByteBuffer.array() is the full backing array, but the data may only be contained within a portion of it.
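To see the effect without a running cluster, here is a minimal sketch that imitates what the driver presumably hands back: a small view onto a larger array. The 84-byte frame and the offset of 40 are made-up values chosen to mirror the numbers in the question, and simulatedBlob is a hypothetical helper, not part of the DataStax driver.

```java
import java.nio.ByteBuffer;

public class BackingArrayDemo {

    // Hypothetical stand-in for row.getBytes("data"): the 3-byte payload
    // sits in the middle of a larger 84-byte array (sizes are assumptions).
    static ByteBuffer simulatedBlob() {
        byte[] frame = new byte[84];
        frame[40] = 1;
        frame[41] = 2;
        frame[42] = 3;
        // slice() yields a buffer whose position 0 maps to index 40 of frame
        return ByteBuffer.wrap(frame, 40, 3).slice();
    }

    public static void main(String[] args) {
        ByteBuffer data = simulatedBlob();
        System.out.println(data.array().length); // whole backing array
        System.out.println(data.arrayOffset());  // where the view starts in it
        System.out.println(data.remaining());    // number of readable bytes
    }
}
```

The buffer only exposes 3 bytes, yet array() still returns all 84, which is the same shape of mismatch the question reports.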

The valid data starts at index ByteBuffer.arrayOffset() + ByteBuffer.position() of that array and is ByteBuffer.remaining() bytes long. To get a byte array containing just the valid data, use this code in readFromTable:

byte[] result = new byte[data.remaining()];
data.get(result);

then your data is in result and you can return that.
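As a rough end-to-end check, the sketch below wraps that copy in a helper and round-trips a serialized String through a simulated blob. It uses plain java.io serialization instead of commons-lang so that it runs on its own. The names toByteArray, serialize and deserialize are hypothetical helpers, and the 84 bytes of padding in front of the payload are an assumption standing in for whatever else shares the backing array.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.ByteBuffer;

public class BlobRoundTrip {

    // Copies only the readable bytes. duplicate() shares the content but has
    // its own position, so the caller's buffer is left unconsumed.
    static byte[] toByteArray(ByteBuffer data) {
        byte[] result = new byte[data.remaining()];
        data.duplicate().get(result);
        return result;
    }

    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] payload = serialize("Hello, world!");

        // Hypothetical stand-in for row.getBytes("data"): the payload is
        // preceded by 84 unrelated bytes in the same backing array.
        byte[] frame = new byte[payload.length + 84];
        System.arraycopy(payload, 0, frame, 84, payload.length);
        ByteBuffer data = ByteBuffer.wrap(frame, 84, payload.length).slice();

        byte[] copy = toByteArray(data);
        System.out.println(copy.length == payload.length);
        System.out.println(deserialize(copy));
    }
}
```

In readFromTable, the equivalent change would be to return toByteArray(row.getBytes("data")) instead of data.array(). Calling deserialize(data.array()) on the simulated buffer would instead hit the padding first and fail with a StreamCorruptedException, much like the error in the question.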
