Cassandra: How to insert a new wide row with good performance using CQL

Problem description

I am evaluating Cassandra. I am using the DataStax driver and CQL.

I would like to store some data with the following internal structure, where the names are different for each update.

+-------+-------+-------+-------+-------+-------+
|       | name1 | name2 | name3 | ...   | nameN |
| time  +-------+-------+-------+-------+-------+
|       | val1  | val2  | val3  | ...   | valN  |
+-------+-------+-------+-------|-------+-------+

So time should be the row key, and name should be the column key. The CQL statement I use to create this table is:

CREATE TABLE IF NOT EXISTS test.wide (
  time varchar,
  name varchar,
  value varchar,
  PRIMARY KEY (time,name))
  WITH COMPACT STORAGE

I want the schema to be this way for ease of querying. I also occasionally have to store updates with more than 65000 name/value pairs in a single update, so using the Cassandra list/set/map collection types is not an option (they are limited to 64K elements).
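
For context, the collection-based alternative that is ruled out here would look roughly like the sketch below. The test.wide_map table and its attrs column are hypothetical (not part of the original post); a Cassandra collection is capped at 64K entries and is always read back in its entirety, which is why it cannot hold the ~65000+ name/value pairs of a single update.

-- Hypothetical sketch only: a map-based alternative that does NOT scale to this use case
CREATE TABLE IF NOT EXISTS test.wide_map (
  time varchar,
  attrs map<varchar, varchar>,   -- name -> value pairs; collections are capped at 64K entries
  PRIMARY KEY (time))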

I have to be able to handle at least 1000 wide row inserts per second, with a varying but large (~1000) number of name/value pairs.

The problem is the following: I have written a simple benchmark that does 1000 wide row inserts of 10000 name/value pairs each. I am getting very slow performance with CQL and the DataStax driver, whereas the version that does not use CQL (using Astyanax) has good performance on the same test cluster.

I have read this related question, and the accepted answer to that question suggests that you should be able to atomically and quickly create a new wide row by using batched prepared statements, which are available in Cassandra 2.

So I tried using those, but I still get slow performance (two inserts per second for a small three-node cluster running on localhost). Am I missing something obvious, or do I have to use the lower-level Thrift API? I have implemented the same insert with a ColumnListMutation in Astyanax, and I get about 30 inserts per second.

If I have to use the lower-level Thrift API:

  • is it actually deprecated, or is it just inconvenient to use because it is lower level?

  • will I be able to query a table created with the Thrift API using CQL? (see the sketch below)
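
For reference on the last point: a column family created through the Thrift API without CQL metadata is typically exposed to CQL 3 as a COMPACT STORAGE table with generic column names (key, column1, value), so it can be read with a plain SELECT. The query below is only a sketch against the employees2 column family from the Astyanax example further down; the actual column names and types depend on the comparator/validator metadata of that column family.

-- Sketch only: reading a Thrift-created column family via CQL 3
SELECT key, column1, value FROM test1.employees2 LIMIT 10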

Below is a self-contained code example in Scala. It simply creates a batch statement for inserting a wide row with 10000 columns and repeatedly times the insertion.

I played with the options of BatchStatement and with the consistency level, but nothing could get me better performance.

The only explanation I have is that despite the batch consisting of prepared statements, the entries are added to the row one by one.

package cassandra

import com.datastax.driver.core._

object CassandraTestMinimized extends App {

  val keyspace = "test"
  val table = "wide"
  val tableName = s"$keyspace.$table"

  def createKeyspace = s"""
CREATE KEYSPACE IF NOT EXISTS ${keyspace}
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
"""

  def createWideTable = s"""
CREATE TABLE IF NOT EXISTS ${tableName} (
time varchar,
name varchar,
value varchar,
PRIMARY KEY (time,name))
WITH COMPACT STORAGE
"""

  def writeTimeNameValue(time: String) = s"""
INSERT INTO ${tableName} (time, name, value)
VALUES ('$time', ?, ?)
"""

  val cluster = Cluster.builder.addContactPoints("127.0.0.1").build
  val session = cluster.connect()

  session.execute(createKeyspace)
  session.execute(createWideTable)

  // Insert 1000 wide rows, each with 10000 name/value columns, timing every insert.
  for (i <- 0 until 1000) {
    val entries =
      for {
        j <- 0 until 10000       // renamed from `i` to avoid shadowing the row index
        name = j.toString
        value = name
      } yield name -> value
    val batchPreparedStatement = writeMap(i, entries)
    val t0 = System.nanoTime()
    session.execute(batchPreparedStatement)
    val dt = System.nanoTime() - t0
    println(i + " " + (dt / 1.0e9))
  }

  // Builds an UNLOGGED batch with one bound prepared statement per name/value pair.
  // Note: the statement is (re)prepared on every call, as in the original benchmark.
  def writeMap(time: Long, update: Seq[(String, String)]): BatchStatement = {
    val template = session
      .prepare(writeTimeNameValue(time.toString))
      .setConsistencyLevel(ConsistencyLevel.ONE)
    val batch = new BatchStatement(BatchStatement.Type.UNLOGGED)
    for ((k, v) <- update)
      batch.add(template.bind(k, v))
    batch
  }
}

Here is the Astyanax code (modified from an Astyanax example) that does essentially the same thing 15 times faster. Note that this also does not use asynchronous calls, so it is a fair comparison. It requires the column family to already exist, since I have not yet figured out how to create it using Astyanax and the example did not have any code for creating the column family.

package cassandra;

import java.util.Iterator;

import com.netflix.astyanax.ColumnListMutation;
import com.netflix.astyanax.serializers.AsciiSerializer;
import com.netflix.astyanax.serializers.LongSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;

public class AstClient {
    private static final Logger logger = LoggerFactory.getLogger(AstClient.class);

    private AstyanaxContext<Keyspace> context;
    private Keyspace keyspace;
    private ColumnFamily<Long, String> EMP_CF;
    private static final String EMP_CF_NAME = "employees2";

    public void init() {
        logger.debug("init()");

        context = new AstyanaxContext.Builder()
                .forCluster("Test Cluster")
                .forKeyspace("test1")
                .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
                        .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE)
                        .setCqlVersion("3.0.0")
                        .setTargetCassandraVersion("2.0.5"))
                .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("MyConnectionPool")
                        .setPort(9160)
                        .setMaxConnsPerHost(1)
                        .setSeeds("127.0.0.1:9160"))
                .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
                .buildKeyspace(ThriftFamilyFactory.getInstance());

        context.start();
        keyspace = context.getClient();

        EMP_CF = ColumnFamily.newColumnFamily(
                EMP_CF_NAME,
                LongSerializer.get(),
                AsciiSerializer.get());
    }

    public void insert(long time) {
        MutationBatch m = keyspace.prepareMutationBatch();

        // One wide row keyed by `time`, with 10000 name/value columns in a single mutation batch.
        ColumnListMutation<String> x = m.withRow(EMP_CF, time);
        for (int i = 0; i < 10000; i++)
            x.putColumn(Integer.toString(i), Integer.toString(i));

        try {
            @SuppressWarnings("unused")
            Object result = m.execute();
        } catch (ConnectionException e) {
            logger.error("failed to write data to C*", e);
            throw new RuntimeException("failed to write data to C*", e);
        }
        logger.debug("insert ok");
    }

    public void createCF() {
        // Intentionally empty: the column family is assumed to already exist (see the note above).
    }

    public void read(long time) {
        OperationResult<ColumnList<String>> result;
        try {
            result = keyspace.prepareQuery(EMP_CF)
                    .getKey(time)
                    .execute();

            ColumnList<String> cols = result.getResult();
            // process data

            // a) iterate over the columns of the row
            for (Iterator<Column<String>> i = cols.iterator(); i.hasNext(); ) {
                Column<String> c = i.next();
                String v = c.getStringValue();
                System.out.println(c.getName() + " " + v);
            }

        } catch (ConnectionException e) {
            logger.error("failed to read from C*", e);
            throw new RuntimeException("failed to read from C*", e);
        }
    }

    public static void main(String[] args) {
        AstClient c = new AstClient();
        c.init();
        long t00 = System.nanoTime();
        for(int i=0;i<1000;i++) {
            long t0 = System.nanoTime();
            c.insert(i);
            long dt = System.nanoTime() - t0;
            System.out.println((1.0e9/dt) + " " + i);
        }
        long dtt = System.nanoTime() - t00;

        c.read(0);
        System.out.println(dtt / 1e9);
    }

}

Update: I found this thread on the cassandra-user mailing list. It seems that there is a performance problem with CQL when doing large wide row inserts. There is a ticket CASSANDRA-6737 to track this issue.

Update 2: I have tried out the patch attached to CASSANDRA-6737, and I can confirm that it completely fixes the issue. Thanks to Sylvain Lebresne from DataStax for fixing this so quickly!

Recommended answer

You are not the only person to experience this. I wrote a blog post a while ago that focused more on the conversion between CQL and Thrift, but it links to mailing list issues from folks seeing the same thing (the performance issue with wide-row inserts was my initial motivation for investigating): http://thelastpickle.com/blog/2013/09/13/CQL3-to-Astyanax-Compatibility.html

In sum - CQL is great for removing the burdens of dealing with typing and understanding the data model for folks new to Cassandra. The DataStax driver is well written and contains lots of useful features.

However, the Thrift API is more than slightly faster for wide row inserts. The Netflix blog does not go into this specific use case much. Further, the Thrift API is not legacy so long as people are using it (many folks are). It's an ASF project, and as such it is not run by any single vendor.

In general, with any Cassandra-based application, if you find a way of doing something that meets (or often exceeds) the performance requirements of your workload, stick with it.
