SQLAlchemy connection pool on multiple threads

Problem description

First I create a simple table:

import threading
from sqlalchemy import create_engine
from sqlalchemy import Column, Integer, Float, String, Date, DateTime, Boolean, select
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.pool import SingletonThreadPool
from sqlalchemy.orm import scoped_session
from sqlalchemy.orm import sessionmaker

inven_schema = {'__tablename__' : 'inventory',
            'item_no'       : Column(Integer, primary_key=True, autoincrement=True),
            'desc'          : Column(String(255), nullable=False),
            'volume'        : Column(Integer, nullable=False)
            }


Base = declarative_base()

# Dynamically create Inventory for ORM
Inventory = type('Inventory', (Base,), inven_schema)

some_inventory = [{'item_no' : 0, 'desc' : 'toy crane', 'volume' : 12},
              {'item_no' : 1, 'desc' : 'puddle jumper', 'volume' : 2},
              {'item_no' : 2, 'desc' : 'pet snake', 'volume' : 1},
              {'item_no' : 3, 'desc' : 'bowling ball', 'volume' : 4},
              {'item_no' : 4, 'desc' : 'spinning top', 'volume' : 3},
              {'item_no' : 5, 'desc' : 'pumpkin', 'volume' : 2}]

But I can't get it to work when the session and the inserts are on different threads. Here's my attempt, which throws:

File "/usr/local/lib/python3.5/dist-packages/sqlalchemy/util/compat.py", line 203, in raise_from_cause
reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/usr/local/lib/python3.5/dist-packages/sqlalchemy/util/compat.py", line 186, in reraise
raise value.with_traceback(tb)
  File "/usr/local/lib/python3.5/dist-packages/sqlalchemy/engine/base.py", line 1159, in _execute_context
context)
  File "/usr/local/lib/python3.5/dist-packages/sqlalchemy/engine/default.py", line 467, in do_executemany
cursor.executemany(statement, parameters)
sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) no such table: inventory [SQL: 'INSERT INTO inventory (item_no, "desc", volume) VALUES (?, ?, ?)'] [parameters: ((0, 'toy crane', 12), (1, 'puddle jumper', 2), (2, 'pet snake', 1), (3, 'bowling ball', 4), (4, 'spinning top', 3), (5, 'pumpkin', 2))]

I've tried creating the session inside the thread call, along with a few other variations. How should I modify the following approach?

def insert_inventory(inventory, session):
    inventory_ = [Inventory(**inven) for inven in inventory]
    session.add_all(inventory_)
    session.commit()

def results(session):
    q = select([Inventory.item_no]).distinct(Inventory.item_no)
    distinct_items = session.execute(q).fetchall()
    print('{!r}'.format(distinct_items))

def demo2():
    engine = create_engine('sqlite://')
    conn = engine.connect()
    Base.metadata.create_all(conn.engine)

    session_factory = sessionmaker(bind=conn.engine)

    Session = scoped_session(session_factory)
    session = Session()

    thread_0 = threading.Thread(target=insert_inventory, args=(some_inventory, session))
    thread_0.start()

    session.close()

    results(session)

Recommended answer

I was not able to get add() or add_all() to work at the session level. If the engine is created with additional arguments, the approach below, which inserts through a shared connection rather than a session, works.

I believe this serializes calls to the SQLite database, which is not ideal but is necessary for SQLite.
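A likely explanation for the `no such table` error, offered here as a sketch rather than as part of the original answer: an in-memory SQLite database exists only inside the connection that created it, and SQLAlchemy's default pool for `sqlite://` gives each thread its own connection. A worker thread therefore sees an empty database. The example below assumes SQLAlchemy 1.4 or newer and uses `StaticPool`, which the answer itself does not use, so that every thread shares one connection. `tables_seen_by_worker` is a hypothetical helper name.

```python
import threading

from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, inspect
from sqlalchemy.pool import StaticPool

metadata = MetaData()
inventory = Table(
    "inventory",
    metadata,
    Column("item_no", Integer, primary_key=True),
    Column("desc", String(255), nullable=False),
    Column("volume", Integer, nullable=False),
)


def tables_seen_by_worker(engine):
    """Create the tables in the calling thread, then list them from a new thread."""
    metadata.create_all(engine)
    seen = []

    def worker():
        # Runs in a separate thread, so the pool decides which connection it gets.
        seen.extend(inspect(engine).get_table_names())

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    return seen


# Default pool for 'sqlite://': one connection per thread, so the worker
# thread opens its own, empty in-memory database.
per_thread_engine = create_engine("sqlite://")

# Assumption (not in the original answer): StaticPool keeps a single
# connection for all threads; check_same_thread=False lets sqlite3 accept
# calls from threads other than the one that opened the connection.
shared_engine = create_engine(
    "sqlite://",
    connect_args={"check_same_thread": False},
    poolclass=StaticPool,
)

per_thread_tables = tables_seen_by_worker(per_thread_engine)
shared_tables = tables_seen_by_worker(shared_engine)
print(per_thread_tables)
print(shared_tables)
```

With the default pool the worker should find no tables, while the shared connection exposes `inventory`. A file-based SQLite URL would avoid the issue in a different way, since every connection would then open the same database file.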

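import threading
from sqlalchemy import create_engine, MetaData, Table, Column, Integer, String

# check_same_thread=False lets the sqlite3 connection be used from threads
# other than the one that created it
engine = create_engine('sqlite://', connect_args={'check_same_thread' : False})
conn = engine.connect()

# Unbound MetaData; the engine is passed to create_all() instead, which
# also works on SQLAlchemy versions that no longer accept MetaData(engine)
metadata = MetaData()
table = Table('inventory',
              metadata,
              Column('item_no', Integer, primary_key=True, autoincrement=True),
              Column('desc', String(255), nullable=False),
              Column('volume', Integer, nullable=False)
              )

metadata.create_all(engine)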

some_inventory = [{'item_no' : 0, 'desc' : 'toy crane', 'volume' : 12},
                  {'item_no' : 1, 'desc' : 'puddle jumper', 'volume' : 2},
                  {'item_no' : 2, 'desc' : 'pet snake', 'volume' : 1},
                  {'item_no' : 3, 'desc' : 'bowling ball', 'volume' : 4},
                  {'item_no' : 4, 'desc' : 'spinning top', 'volume' : 3},
                  {'item_no' : 5, 'desc' : 'pumpkin', 'volume' : 2}]


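# insert_inventory_table is not shown in the original answer; it is expected
# to insert the given rows into `table` through the shared `conn`
thread_0 = threading.Thread(target=insert_inventory_table, args=(conn, table, some_inventory[0:3]))
thread_1 = threading.Thread(target=insert_inventory_table, args=(conn, table, some_inventory[3:]))

thread_0.start()
thread_1.start()

# Wait for both inserts to finish before reading the table
thread_0.join()
thread_1.join()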
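The snippet above does not show `insert_inventory_table`. The sketch below fills that gap with a hypothetical definition so the approach can be run end to end, assuming SQLAlchemy 1.4 or newer. The `write_lock` is also an addition that is not in the original answer: it makes the serialization explicit, so the two threads never interleave transactions on the one shared connection.

```python
import threading

from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, select

engine = create_engine("sqlite://", connect_args={"check_same_thread": False})
conn = engine.connect()  # the single connection that owns the in-memory database

metadata = MetaData()
table = Table(
    "inventory",
    metadata,
    Column("item_no", Integer, primary_key=True, autoincrement=True),
    Column("desc", String(255), nullable=False),
    Column("volume", Integer, nullable=False),
)

# Create the table on the same connection the worker threads will use.
with conn.begin():
    metadata.create_all(conn)

some_inventory = [
    {"item_no": 0, "desc": "toy crane", "volume": 12},
    {"item_no": 1, "desc": "puddle jumper", "volume": 2},
    {"item_no": 2, "desc": "pet snake", "volume": 1},
    {"item_no": 3, "desc": "bowling ball", "volume": 4},
    {"item_no": 4, "desc": "spinning top", "volume": 3},
    {"item_no": 5, "desc": "pumpkin", "volume": 2},
]

# Assumption: a lock added for this sketch; only one thread at a time may
# run a transaction on the shared connection.
write_lock = threading.Lock()


def insert_inventory_table(conn, table, inventory):
    """Hypothetical helper: bulk-insert rows through the shared connection."""
    with write_lock:
        with conn.begin():  # commits when the block exits without error
            conn.execute(table.insert(), inventory)


thread_0 = threading.Thread(target=insert_inventory_table, args=(conn, table, some_inventory[0:3]))
thread_1 = threading.Thread(target=insert_inventory_table, args=(conn, table, some_inventory[3:]))

thread_0.start()
thread_1.start()
thread_0.join()
thread_1.join()

# Read the rows back in the main thread once both workers are done.
with conn.begin():
    rows = conn.execute(select(table.c.item_no).order_by(table.c.item_no))
    item_numbers = [row[0] for row in rows]

print(item_numbers)
conn.close()
```

Passing the `Connection` object to each thread mirrors the answer's design. The trade-off is that all database work runs one thread at a time, which matches the answer's remark about serialized calls.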
