How best to parallelize grakn queries with Python?

Problem Description

I run Windows 10, Python 3.7, and have a 6-core CPU. A single Python thread on my machine submits 1,000 inserts per second to grakn. I'd like to parallelize my code to insert and match even faster. How are people doing this?

My only experience with parallelization is on another project, where I submit a custom function to a dask distributed client to generate thousands of tasks. Right now, this same approach fails whenever the custom function receives or generates a grakn transaction object/handle. I get errors like:

Traceback (most recent call last):
  File "C:\Users\dvyd\.conda\envs\activefiction\lib\site-packages\distributed\protocol\pickle.py", line 41, in dumps
    return cloudpickle.dumps(x, protocol=pickle.HIGHEST_PROTOCOL)
...
  File "stringsource", line 2, in grpc._cython.cygrpc.Channel.__reduce_cython__
TypeError: no default __reduce__ due to non-trivial __cinit__

I've never used Python's multiprocessing module directly. What are other people doing to parallelize their queries to grakn?
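
The traceback comes from grpc refusing to pickle its channel object: a process-based scheduler such as dask distributed (or the multiprocessing module) pickles whatever a task function receives, and a grakn session or transaction wraps an unpicklable gRPC channel. One way to sidestep this, sketched below assuming the grakn-client API shown in the answer, is to open the client inside the worker function so that only plain query strings ever cross the process boundary:

from grakn.client import GraknClient

def write_batch_in_worker(batch, uri="localhost:48555", keyspace="grakn"):
    # Hypothetical worker: it opens its own client and session, so no gRPC
    # channel is ever pickled -- only the list of query strings is shipped
    with GraknClient(uri=uri) as client:
        with client.session(keyspace) as session:
            tx = session.transaction().write()
            for query in batch:
                tx.query(query)
            tx.commit()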

Recommended Answer

The easiest approach that I've found to execute a batch of queries is to pass a Grakn session to each thread in a ThreadPool. Within each thread you can manage transactions and of course do some more complex logic:

from grakn.client import GraknClient
from multiprocessing.dummy import Pool as ThreadPool
from functools import partial

def write_query_batch(session, batch):
    # Run a whole batch of queries inside a single write transaction
    tx = session.transaction().write()
    for query in batch:
        tx.query(query)
    tx.commit()

def multi_thread_write_query_batches(session, query_batches, num_threads=8):
    # Fan the batches out across a thread pool; every thread shares the
    # same session but opens its own transactions
    pool = ThreadPool(num_threads)
    pool.map(partial(write_query_batch, session), query_batches)
    pool.close()
    pool.join()

def generate_query_batches(my_data_entries_list, batch_size):
    # Yield successive batches of exactly batch_size entries
    batch = []
    for index, data_entry in enumerate(my_data_entries_list):
        batch.append(data_entry)
        if (index + 1) % batch_size == 0:
            yield batch
            batch = []
    if batch:
        yield batch


# (Part 2) Somewhere in your application open a client and a session
client = GraknClient(uri="localhost:48555")
session = client.session(keyspace="grakn")

query_batches_iterator = generate_query_batches(my_data_entries_list, batch_size)
multi_thread_write_query_batches(session, query_batches_iterator, num_threads=8)

session.close()
client.close()
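
The same session-per-pool pattern covers the match side of the question. A minimal sketch, assuming the same API, of a worker that runs a batch of read queries:

def read_query_batch(session, batch):
    # Read transactions parallelize the same way; collect the answers
    # for each query before closing the transaction
    tx = session.transaction().read()
    results = [list(tx.query(query)) for query in batch]
    tx.close()
    return results

Passing partial(read_query_batch, session) to pool.map as above returns one list of results per batch.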

The above is a generic method. As a concrete example, you can use the above (omitting part 2) to parallelise batches of insert statements from two files. Appending this to the above should work:

import time

files = [
    {
        "file_path": "/path/to/your/file.gql",
    },
    {
        "file_path": "/path/to/your/file2.gql",
    }
]

KEYSPACE = "grakn"
URI = "localhost:48555"
BATCH_SIZE = 10
NUM_BATCHES = 1000

# Entry point where migration starts
def migrate_graql_files():
    start_time = time.time()

    for file in files:
        print('==================================================')
        print(f'Loading from {file["file_path"]}')
        print('==================================================')

        # Here we are assuming you have 1 Graql query per line!
        with open(file["file_path"], "r") as graql_file:
            lines = graql_file.readlines()
        batches = generate_query_batches(lines, BATCH_SIZE)

        with GraknClient(uri=URI) as client:  # Using `with` auto-closes the client
            with client.session(KEYSPACE) as session:  # Using `with` auto-closes the session
                multi_thread_write_query_batches(session, batches, num_threads=16)  # Pick `num_threads` according to your machine

        elapsed = time.time() - start_time
        print(f'Time elapsed {elapsed:.1f} seconds')

    elapsed = time.time() - start_time
    print(f'Time elapsed {elapsed:.1f} seconds')

if __name__ == "__main__":
    migrate_graql_files()

You should also be able to see how to load from a csv or any other file type in this way, taking the values you find in that file and substituting them into Graql query string templates. Take a look at the migration example in the docs for more on that.
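
As a rough illustration of that substitution step (the person entity and the name/age columns here are hypothetical), each string yielded below can be fed straight into generate_query_batches() above:

import csv

def csv_to_queries(csv_path):
    # Substitute each row's values into a Graql insert template;
    # the "name" and "age" columns are hypothetical placeholders
    with open(csv_path) as csv_file:
        for row in csv.DictReader(csv_file):
            yield (
                f'insert $p isa person, has name "{row["name"]}", '
                f'has age {row["age"]};'
            )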
