创建DB连接并维护多个进程(多进程) [英] Create DB connection and maintain on multiple processes (multiprocessing)

查看:178
本文介绍了创建DB连接并维护多个进程(多进程)的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

类似我做的另一篇文章,这回答了那个帖子并创建了一个新问题。

Similar to another post I made, this answers that post and creates a new question.

重述:我需要更新空间数据库中的每个记录,具有覆盖多边形数据集的点的数据集。对于每个点要素,我要分配一个键以将其与其所在的面要素相关联。所以如果我的点'纽约市'在多边形美国和美国多边形'GID = 1',我会指定'gid_fkey = 1'为我的点纽约市。

Recap: I need to update every record in a spatial database in which I have a data set of points that overlay data set of polygons. For each point feature I want to assign a key to relate it to the polygon feature that it lies within. So if my point 'New York City' lies within polygon USA and for the USA polygon 'GID = 1' I will assign 'gid_fkey = 1' for my point New York City.

好,所以这是使用多重处理实现的。我注意到,使用这个速度增加150%,所以它的工作。但我认为有一堆不必要的开销,因为每个记录需要一个数据库连接。

Okay so this has been achieved using multiprocessing. I have noticed a 150% increase in speed using this so it does work. But I think there is a bunch of unecessary overhead as one DB connection is required for each record.

这里是代码:

import multiprocessing, time, psycopg2

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                print 'Tasks Complete'
                self.task_queue.task_done()
                break            
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class Task(object):
    def __init__(self, a):
        self.a = a

    def __call__(self):        
        pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
        pyConn.set_isolation_level(0)
        pyCursor1 = pyConn.cursor()

        procQuery = 'UPDATE city SET gid_fkey = gid FROM country  WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a)

        pyCursor1.execute(procQuery)
        print 'What is self?'
        print self.a

        return self.a

    def __str__(self):
        return 'ARC'
    def run(self):
        print 'IN'

if __name__ == '__main__':
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    num_consumers = multiprocessing.cpu_count() * 2
    consumers = [Consumer(tasks, results) for i in xrange(num_consumers)]
    for w in consumers:
        w.start()

    pyConnX = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
    pyConnX.set_isolation_level(0)
    pyCursorX = pyConnX.cursor()

    pyCursorX.execute('SELECT count(*) FROM cities WHERE gid_fkey IS NULL')    
    temp = pyCursorX.fetchall()    
    num_job = temp[0]
    num_jobs = num_job[0]

    pyCursorX.execute('SELECT city_id FROM city WHERE gid_fkey IS NULL')    
    cityIdListTuple = pyCursorX.fetchall()    

    cityIdListList = []

    for x in cityIdListTuple:
        cityIdList.append(x[0])


    for i in xrange(num_jobs):
        tasks.put(Task(cityIdList[i - 1]))

    for i in xrange(num_consumers):
        tasks.put(None)

    while num_jobs:
        result = results.get()
        print result
        num_jobs -= 1

看起来每个连接在0.3到1.5秒之间它与时间模块。

It looks to be between 0.3 and 1.5 seconds per connection as I have measure it with 'time' module.

有一种方法,使每个进程的DB连接,然后只是使用city_id信息作为一个变量,我可以馈入一个查询对于在这个打开的游标?这种方式我说,四个过程每个与一个数据库连接,然后drop me city_id以某种方式来处理。

Is there a way to make a DB connection per process and then just use the city_id info as a variable that I can feed into a query for the cursor in this open? This way I make say four processes each with a DB connection and then drop me city_id in somehow to process.

推荐答案

在Consumer构造函数中创建您的连接,然后将其赋给执行的任务:

Try to isolate the creation of your connection in the Consumer constructor, then give it to the executed Task :

import multiprocessing, time, psycopg2

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue
        self.pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'")
        self.pyConn.set_isolation_level(0)


    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                print 'Tasks Complete'
                self.task_queue.task_done()
                break            
            answer = next_task(connection=self.pyConn)
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class Task(object):
    def __init__(self, a):
        self.a = a

    def __call__(self, connection=None):        
        pyConn = connection
        pyCursor1 = pyConn.cursor()

        procQuery = 'UPDATE city SET gid_fkey = gid FROM country  WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a)

        pyCursor1.execute(procQuery)
        print 'What is self?'
        print self.a

        return self.a

    def __str__(self):
        return 'ARC'
    def run(self):
        print 'IN'

这篇关于创建DB连接并维护多个进程(多进程)的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆