Python Cassandra driver: same insert performance as COPY

Question

I'm trying to use Python async with Cassandra to see if I can write records to Cassandra faster than the CQL COPY command.

My Python code looks like this:

from cassandra.cluster import Cluster
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement
cluster = Cluster(['1.2.1.4'])

session = cluster.connect('test')

with open('dataImport.txt') as f:
    for line in f:
        query = SimpleStatement (
            "INSERT INTO tstTable (id, accts, info) VALUES (%s) " %(line),
            consistency_level=ConsistencyLevel.ONE)
        session.execute_async (query)

But it's giving me the same performance as the COPY command, around 2,700 rows/sec. Shouldn't it be faster with async?

Do I need to use multithreading in Python? I've just been reading about it, but I'm not sure how it fits into this.

So I found something online that I'm trying to modify, but I can't quite get it to work. This is what I have so far. I also split the file into 3 files in the /Data/toImport/ directory:

import multiprocessing
import time
import os
from cassandra.cluster import Cluster
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement


cluster = Cluster(['1.2.1.4'])

session = cluster.connect('test')

def mp_worker(inputArg):
        with open(inputArg[0]) as f:
            for line in f:
                query = SimpleStatement (
                    "INSERT INTO CustInfo (cust_id, accts, offers) values (%s)" %(line),
                    consistency_level=ConsistencyLevel.ONE)
                session.execute_async (query)


def mp_handler(inputData, nThreads = 8):
    p = multiprocessing.Pool(nThreads)
    p.map(mp_worker, inputData, chunksize=1)
    p.close()
    p.join()

if __name__ == '__main__':
    temp_in_data = file_list
    start = time.time()
    in_dir = '/Data/toImport/'
    N_Proc = 8
    file_data = [(in_dir) for i in temp_in_data]

    print '----------------------------------Start Working!!!!-----------------------------'
    print 'Number of Processes using: %d' %N_Proc
    mp_handler(file_data, N_Proc)
    end = time.time()
    time_elapsed = end - start
    print '----------------------------------All Done!!!!-----------------------------'
    print "Time elapsed: {} seconds".format(time_elapsed)

But I get this error:

Traceback (most recent call last):
  File "multiCass.py", line 27, in <module>
    temp_in_data = file_list
NameError: name 'file_list' is not defined

Recommended answer

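Got it working like this. Compared with the attempt in the question, each worker process now creates its own Cluster and session, and the list of input files is defined explicitly: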

import multiprocessing
import time
from cassandra.cluster import Cluster
from cassandra import ConsistencyLevel
from cassandra.query import SimpleStatement


def mp_worker(inputArg):
    # Each worker process opens its own connection instead of sharing a
    # module-level session created in the parent process.
    cluster = Cluster(['1.2.1.4'])
    session = cluster.connect('poc')

    with open(inputArg[0]) as f:
        for line in f:
            # Each line is expected to hold the comma-separated CQL values.
            query = SimpleStatement(
                "INSERT INTO testTable (cust_id, accts, offers) values (%s)" % line,
                consistency_level=ConsistencyLevel.ONE)
            # The returned future is not checked, so failed writes go unnoticed.
            session.execute_async(query)


def mp_handler(inputData, nThreads=8):
    # Despite the parameter name, the pool runs processes, not threads.
    p = multiprocessing.Pool(nThreads)
    p.map(mp_worker, inputData, chunksize=1)
    p.close()
    p.join()


if __name__ == '__main__':
    temp_in_data = ['/toImport/part-00000', '/toImport/part-00001', '/toImport/part-00002']
    start = time.time()
    N_Proc = 3
    # One single-element tuple per file, because mp_worker reads inputArg[0].
    file_data = [(i,) for i in temp_in_data]

    print('----------------------------------Start Working!!!!-----------------------------')
    print('Number of Processes using: %d' % N_Proc)
    mp_handler(file_data, N_Proc)
    end = time.time()
    time_elapsed = end - start
    print('----------------------------------All Done!!!!-----------------------------')
    print("Time elapsed: {} seconds".format(time_elapsed))
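
The worker above fires `execute_async` for every line and never looks at the returned futures. As a possible variation (a sketch, not part of the original answer), the per-file loader below prepares the INSERT once and sends rows through the driver's `execute_concurrent_with_args`, which caps the number of in-flight requests and reports per-row success. Several things here are assumptions: the helper names `list_input_files`, `parse_line`, `chunked` and `load_file` are hypothetical, the input lines are assumed to look like `42, 'acct-1', 'offer-9'` with an integer `cust_id` and two text columns, and the batch size and concurrency values are arbitrary starting points rather than tuned numbers.

```python
import csv
import glob
import itertools
import os

# Assumed schema: cust_id is an int, accts and offers are text columns.
INSERT_CQL = "INSERT INTO testTable (cust_id, accts, offers) VALUES (?, ?, ?)"


def list_input_files(in_dir, pattern="part-*"):
    """Return matching files in in_dir as sorted 1-tuples (the shape mp_worker takes)."""
    return [(path,) for path in sorted(glob.glob(os.path.join(in_dir, pattern)))]


def parse_line(line):
    """Turn one line such as "42, 'acct-1', 'offer-9'" into a parameter tuple.

    The single-quoted, comma-separated layout is an assumption about the files.
    """
    row = next(csv.reader([line], quotechar="'", skipinitialspace=True))
    cust_id, accts, offers = row
    return (int(cust_id), accts, offers)


def chunked(iterable, size):
    """Yield lists of at most `size` items so a whole file is never held in memory."""
    it = iter(iterable)
    while True:
        batch = list(itertools.islice(it, size))
        if not batch:
            return
        yield batch


def load_file(input_arg, contact_points=("1.2.1.4",), keyspace="poc",
              concurrency=100, batch_size=10000):
    """Load one file and return the number of failed inserts."""
    # Imported here so the helpers above can be used without the driver installed.
    from cassandra import ConsistencyLevel
    from cassandra.cluster import Cluster
    from cassandra.concurrent import execute_concurrent_with_args

    # As in the answer, each worker process opens its own connection.
    cluster = Cluster(list(contact_points))
    session = cluster.connect(keyspace)
    try:
        insert = session.prepare(INSERT_CQL)  # parsed once, reused for every row
        insert.consistency_level = ConsistencyLevel.ONE
        failed = 0
        with open(input_arg[0]) as f:
            rows = (parse_line(line) for line in f)
            for batch in chunked(rows, batch_size):
                results = execute_concurrent_with_args(
                    session, insert, batch,
                    concurrency=concurrency, raise_on_first_error=False)
                failed += sum(1 for success, _ in results if not success)
        return failed
    finally:
        cluster.shutdown()
```

`load_file` takes the same single-element tuple as `mp_worker`, so it could be passed to `p.map` in `mp_handler` in its place, with `list_input_files('/toImport')` supplying the file list. Whether this is actually faster than the plain `execute_async` loop depends on the cluster and the data, so it is worth timing both.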
