Cassandra: How to insert a new wide row with good performance using CQL

Question

I am evaluating cassandra. I am using the datastax driver and CQL.

I would like to store some data with the following internal structure, where the names are different for each update.

+-------+-------+-------+-------+-------+-------+
|       | name1 | name2 | name3 | ...   | nameN |
| time  +-------+-------+-------+-------+-------+
|       | val1  | val2  | val3  | ...   | valN  |
+-------+-------+-------+-------+-------+-------+

So time should be the row key, and name should be the column key. The CQL statement I use to create this table is:

CREATE TABLE IF NOT EXISTS test.wide (
  time varchar,
  name varchar,
  value varchar,
  PRIMARY KEY (time,name))
  WITH COMPACT STORAGE

I want the schema to be this way for ease of querying. I also have to occasionally store updates with more than 65000 rows. So using the cassandra list/set/map data types is not an option.
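
For illustration only (this is not part of the benchmark below), reading data back with this schema is a single-partition query; here is a minimal sketch with the DataStax driver, assuming the test.wide table above and an arbitrary example time key:

import com.datastax.driver.core.Cluster
import scala.collection.JavaConverters._

object ReadWideRow extends App {
  val cluster = Cluster.builder.addContactPoints("127.0.0.1").build
  val session = cluster.connect()

  // all name/value pairs stored under one time key live in one partition,
  // so a single SELECT brings back the whole logical wide row
  val readRow = session.prepare("SELECT name, value FROM test.wide WHERE time = ?")
  for (row <- session.execute(readRow.bind("42")).asScala) // "42" is an arbitrary example key
    println(row.getString("name") + " -> " + row.getString("value"))

  cluster.close()
}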

I have to be able to handle at least 1000 wide row inserts per second, with a varying but large (~1000) number of name/value pairs.

The problem is the following: I have written a simple benchmark that does 1000 wide row inserts of 10000 name/value pairs each. I am getting very slow performance with CQL and the datastax driver, whereas the version that does not use CQL (using astyanax) has good performance on the same test cluster.

I have read this related question, and its accepted answer suggests that you should be able to atomically and quickly create a new wide row by using batch prepared statements, which are available in cassandra 2.

So I tried using those, but I still get slow performance (two inserts per second for a small three-node cluster running on localhost). Am I missing something obvious, or do I have to use the lower level thrift API? I have implemented the same insert with a ColumnListMutation in astyanax, and I get about 30 inserts per second.

If I have to use the lower level thrift API:

  • is it actually deprecated, or is it just inconvenient to use because it is lower level?

  • will I be able to query a table created with the thrift api with CQL?

Below is a self-contained code example in scala. It simply creates a batch statement for inserting a wide row with 10000 columns and times the insertion performance repeatedly.

I played with the options of BatchStatement and with the consistency level, but nothing could get me better performance.

The only explanation I have is that despite the batch consisting of prepared statements, the entries are added to the row one by one.


package cassandra

import com.datastax.driver.core._

object CassandraTestMinimized extends App {

  val keyspace = "test"
  val table = "wide"
  val tableName = s"$keyspace.$table"

  def createKeyspace = s"""
CREATE KEYSPACE IF NOT EXISTS ${keyspace}
WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }
"""

  def createWideTable = s"""
CREATE TABLE IF NOT EXISTS ${tableName} (
time varchar,
name varchar,
value varchar,
PRIMARY KEY (time,name))
WITH COMPACT STORAGE
"""

  def writeTimeNameValue(time: String) = s"""
INSERT INTO ${tableName} (time, name, value)
VALUES ('$time', ?, ?)
"""

  val cluster = Cluster.builder.addContactPoints("127.0.0.1").build
  val session = cluster.connect()

  session.execute(createKeyspace)
  session.execute(createWideTable)

  for(i<-0 until 1000) {
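    // each iteration writes one wide row keyed by the outer `i` (its time key);
    // the inner `i` below shadows it and only generates the 10000 name/value pairs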
    val entries =
      for {
        i <- 0 until 10000
        name = i.toString
        value = name
      } yield name -> value
    val batchPreparedStatement = writeMap(i, entries)
    val t0 = System.nanoTime()
    session.execute(batchPreparedStatement)
    val dt = System.nanoTime() - t0
    println(i + " " + (dt/1.0e9))
  }

  def writeMap(time: Long, update: Seq[(String, String)]) : BatchStatement = {
    val template = session
      .prepare(writeTimeNameValue(time.toString))
      .setConsistencyLevel(ConsistencyLevel.ONE)
    val batch = new BatchStatement(BatchStatement.Type.UNLOGGED)
    for ((k, v) <- update)
      batch.add(template.bind(k, v))
    batch
  }
}
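
A variation I have not benchmarked here would be to drop the batch entirely and fire the same prepared insert asynchronously, once per name/value pair. A sketch only, reusing session and writeTimeNameValue from the listing above and the driver's Session.executeAsync:

  // Sketch only, not measured: one asynchronous statement per name/value pair
  // instead of a single unlogged batch, then wait for all acknowledgements.
  def writeMapAsync(time: Long, update: Seq[(String, String)]): Unit = {
    val template = session
      .prepare(writeTimeNameValue(time.toString))
      .setConsistencyLevel(ConsistencyLevel.ONE)
    val futures = for ((k, v) <- update) yield session.executeAsync(template.bind(k, v))
    futures.foreach(_.getUninterruptibly()) // blocks until every insert has completed
  }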


Here is the astyanax code (modified from an astyanax example) that does essentially the same thing 15 times faster. Note that this also does not use asynchronous calls, so it is a fair comparison. It requires the column family to already exist, since I have not yet figured out how to create it using astyanax and the example did not have any code for creating the column family.

package cassandra;

import java.util.Iterator;

import com.netflix.astyanax.ColumnListMutation;
import com.netflix.astyanax.serializers.AsciiSerializer;
import com.netflix.astyanax.serializers.LongSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;

public class AstClient {
    private static final Logger logger = LoggerFactory.getLogger(AstClient.class);

    private AstyanaxContext<Keyspace> context;
    private Keyspace keyspace;
    private ColumnFamily<Long, String> EMP_CF;
    private static final String EMP_CF_NAME = "employees2";

    public void init() {
        logger.debug("init()");

        context = new AstyanaxContext.Builder()
                .forCluster("Test Cluster")
                .forKeyspace("test1")
                .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
                        .setDiscoveryType(NodeDiscoveryType.RING_DESCRIBE)
                )
                .withConnectionPoolConfiguration(new ConnectionPoolConfigurationImpl("MyConnectionPool")
                        .setPort(9160)
                        .setMaxConnsPerHost(1)
                        .setSeeds("127.0.0.1:9160")
                )
                .withAstyanaxConfiguration(new AstyanaxConfigurationImpl()
                        .setCqlVersion("3.0.0")
                        .setTargetCassandraVersion("2.0.5"))
                .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
                .buildKeyspace(ThriftFamilyFactory.getInstance());

        context.start();
        keyspace = context.getClient();

        EMP_CF = ColumnFamily.newColumnFamily(
                EMP_CF_NAME,
                LongSerializer.get(),
                AsciiSerializer.get());
    }

    public void insert(long time) {
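        // one call inserts one wide row: all 10000 columns go under the single row key `time`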
        MutationBatch m = keyspace.prepareMutationBatch();

        ColumnListMutation<String> x =
                m.withRow(EMP_CF, time);
        for(int i=0;i<10000;i++)
            x.putColumn(Integer.toString(i), Integer.toString(i));

        try {
            @SuppressWarnings("unused")
            Object result = m.execute();
        } catch (ConnectionException e) {
            logger.error("failed to write data to C*", e);
            throw new RuntimeException("failed to write data to C*", e);
        }
        logger.debug("insert ok");
    }

    public void createCF() {
    }

    public void read(long time) {
        OperationResult<ColumnList<String>> result;
        try {
            result = keyspace.prepareQuery(EMP_CF)
                    .getKey(time)
                    .execute();

            ColumnList<String> cols = result.getResult();
            // process data

            // a) iterate over columns
            for (Iterator<Column<String>> i = cols.iterator(); i.hasNext(); ) {
                Column<String> c = i.next();
                String v = c.getStringValue();
                System.out.println(c.getName() + " " + v);
            }

        } catch (ConnectionException e) {
            logger.error("failed to read from C*", e);
            throw new RuntimeException("failed to read from C*", e);
        }
    }

    public static void main(String[] args) {
        AstClient c = new AstClient();
        c.init();
        long t00 = System.nanoTime();
        for(int i=0;i<1000;i++) {
            long t0 = System.nanoTime();
            c.insert(i);
            long dt = System.nanoTime() - t0;
            System.out.println((1.0e9/dt) + " " + i);
        }
        long dtt = System.nanoTime() - t00;

        c.read(0);
        System.out.println(dtt / 1e9);
    }

}

Update: I found this thread on the cassandra-user mailing list. It seems that there is a performance problem with CQL when doing large wide row inserts. There is a ticket CASSANDRA-6737 to track this issue.

Update2: I have tried out the patch that is attached to CASSANDRA-6737, and I can confirm that this patch completely fixes the issue. Thanks to Sylvain Lebresne from DataStax for fixing this so quickly!

Solution

You are not the only person to experience this. I wrote a blog post a while ago focused more on the conversion between CQL and thrift, but it links to mailing list threads from folks seeing the same thing (the performance of wide-row inserts was my initial motivation for investigating): http://thelastpickle.com/blog/2013/09/13/CQL3-to-Astyanax-Compatibility.html

In sum - CQL is great for removing the burdens of dealing with typing and understanding the data model for folks new to Cassandra. The DataStax driver is well written and contains lots of useful features.

However, the Thrift API is more than slightly faster for wide row inserts. The Netflix blog does not go into this specific use case much. Further, the Thrift API is not legacy so long as people are using it (many folks are). It's an ASF project and as such is not run by any single vendor.

In general, with any Cassandra-based application, if you find a way of doing something that meets (or often exceeds) the performance requirements of your workload, stick with it.
