PyMongo’s bulk write operation features with multiprocessing and generators

Question

PyMongo supports generators for batch processing with sDB.insert(iter_something(converted)). The bulk write operation features execute write operations in batches in order to reduce the number of network round trips and increase write throughput.
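
To make the difference concrete, here is a minimal sketch (assuming PyMongo 2.x, where Collection.insert accepts any iterable of documents; the collection name and the docs generator are placeholders):

from pymongo import MongoClient

collection = MongoClient().test["abc"]
docs = ({"n": i} for i in range(5000))  # any iterable of documents

# One call per document means one network round trip per document:
# for doc in docs:
#     collection.insert(doc)

# Passing the iterable instead lets PyMongo pull documents from it and
# send them to the server in batches (up to 1000 documents or 16 MB per batch):
collection.insert(docs)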

The following code seems to work, but I do not know whether PyMongo is still able to iterate the generator together with multiprocessing until it has yielded 1000 documents or 16 MB of data, and then pause the generator while it inserts the batch into MongoDB.

#!/usr/bin/env python
from __future__ import absolute_import, division, print_function
from itertools import groupby
from pymongo import MongoClient
from multiprocessing import Process, JoinableQueue
import csv

# > use test
# switched to db test
# > db.createCollection("abc")
# { "ok" : 1 }
# > db.abc.find()


parts = [["Test", "A", "B01", 828288,  1,    7, 'C', 5],
    ["Test", "A", "B01", 828288,  1,    7, 'T', 6],
    ["Test", "A", "B01", 171878,  3,    7, 'C', 5],
    ["Test", "A", "B01", 171878,  3,    7, 'T', 6],
    ["Test", "A", "B01", 871963,  3,    9, 'A', 5],
    ["Test", "A", "B01", 871963,  3,    9, 'G', 6],
    ["Test", "A", "B01", 1932523, 1,   10, 'T', 4],
    ["Test", "A", "B01", 1932523, 1,   10, 'A', 5],
    ["Test", "A", "B01", 1932523, 1,   10, 'X', 6],
    ["Test", "A", "B01", 667214,  1,   14, 'T', 4],
    ["Test", "A", "B01", 667214,  1,   14, 'G', 5],
    ["Test", "A", "B01", 667214,  1,   14, 'G', 6]]


def iter_something(rows):
    key_names = ['type', 'name', 'sub_name', 'pos', 's_type', 'x_type']
    chr_key_names = ['letter', 'no']
    for keys, group in groupby(rows, lambda row: row[:6]):
        result = dict(zip(key_names, keys))
        result['chr'] = [dict(zip(chr_key_names, row[6:])) for row in group]
        yield result

class Loading(Process):

    def __init__(self, task_queue):
        Process.__init__(self)
        self.task_queue = task_queue
        db = MongoClient().test
        self.sDB = db["abc"]

    def run(self):
        while True:
            doc = self.task_queue.get()
            if doc is None:  # None means shutdown
                self.task_queue.task_done()
                break
            else:
                self.sDB.insert(doc)
                # Mark the item as processed so that tasks.join() can return.
                self.task_queue.task_done()

def main():
    num_cores = 2

    tasks = JoinableQueue()

    threads = [Loading(tasks) for i in range(num_cores)]

    for i, w in enumerate(threads):
        w.start()
        print('Thread ' + str(i+1) + ' has started!')

    converters = [str, str, str, int, int, int, str, int]
    with open("/home/mic/tmp/test.txt") as f:
        reader = csv.reader(f, skipinitialspace=True)
        converted = ([conv(col) for conv, col in zip(converters, row)] for row in reader)
        # sDB.insert(iter_something(converted))

        # Enqueue jobs
        for i in iter_something(converted):
            tasks.put(i)

    # Add None to kill each thread
    for i in range(num_cores):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()


if __name__ == '__main__':
    main()

Answer

In this case you are not taking advantage of batch inserts. Each call to "self.sDB.insert(doc)" immediately sends the document to MongoDB and waits for the reply from the server. You could try this:

from pymongo.errors import InvalidOperation  # needed for the except clause below

def run(self):
    def gen():
        while True:
            doc = self.task_queue.get()
            if doc is None:  # None means shutdown
                self.task_queue.task_done()
                break
            else:
                yield doc
                self.task_queue.task_done()  # mark the document as handed off

    try:
        # insert() consumes the generator and sends the documents in batches.
        self.sDB.insert(gen())
    except InvalidOperation as e:
        # Perhaps "Empty bulk write", this process received no documents.
        print(e)

Use mongosniff to verify that you're sending large batches to the server instead of inserting one document at a time. Depending on the number of documents and the number of processes, some processes might get no documents. PyMongo throws InvalidOperation if you try to insert from an empty iterator, so I "insert" with a "try / except".

By the way, you don't need to call createCollection with MongoDB: the first insert into a collection creates it automatically. createCollection is only necessary if you want special options, for example a capped collection.
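
A capped collection, for example, does have to be created explicitly. A minimal sketch using PyMongo's create_collection (the collection name and the size/max values are made up for illustration):

from pymongo import MongoClient

db = MongoClient().test

# Plain collections are created implicitly by the first insert, but a
# capped collection must be created explicitly with its options.
db.create_collection("abc_capped", capped=True, size=1048576, max=1000)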
