How to commit model instances and remove them from working memory a few at a time

Question

I have a pyramid view that is used for loading data from a large file into a database. For each line in the file it does a little processing, then creates some model instances and adds them to the session. This works fine except when the files are big. For large files the view slowly eats up all my RAM until everything effectively grinds to a halt.
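Before reaching for a per-line session, it may help to see the usual pattern for bounding memory in this situation: commit in fixed-size batches and expunge the committed objects from the session so Python can garbage-collect them. A minimal sketch follows; the model, the in-memory SQLite URL, and the batch size are illustrative assumptions, not taken from the question.

```python
# Sketch: commit in batches and expunge committed objects so they can be
# garbage-collected, keeping memory use roughly constant per batch.
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class DataEntry(Base):
    __tablename__ = 'data_entry'
    id = Column(Integer, primary_key=True)
    payload = Column(String)

engine = create_engine('sqlite://')          # in-memory DB for the demo
Base.metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()

BATCH_SIZE = 100
for i, line in enumerate('line %d' % n for n in range(250)):
    session.add(DataEntry(payload=line))
    if (i + 1) % BATCH_SIZE == 0:
        session.commit()
        session.expunge_all()                # drop committed objects from the session

session.commit()                             # flush the final partial batch
print(session.query(DataEntry).count())      # → 250
```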

So my idea is to process each line individually with a function that creates a session, creates the necessary model instances and adds them to the current session, then commits.

def commit_line(lTitles,lLine,oStartDate,oEndDate,iDS,dSettings):
    from sqlalchemy.orm import (
            scoped_session,
            sessionmaker,
    )
    from sqlalchemy import engine_from_config
    from pyramidapp.models import Base, DataEntry
    from zope.sqlalchemy import ZopeTransactionExtension
    import transaction

    oCurrentDBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))
    engine = engine_from_config(dSettings, 'sqlalchemy.')
    oCurrentDBSession.configure(bind=engine)
    Base.metadata.bind = engine

    oEntry = DataEntry()
    oCurrentDBSession.add(oEntry)
    ...
    transaction.commit()

My requirements for this function are as follows:

  1. Create the session (check)
  2. Make a bunch of model instances (check)
  3. Add those instances to the session (check)
  4. Commit those models to the database
  5. Get rid of the session (so that it and the objects created in 2 are garbage collected)
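A hedged sketch of those five steps using a plain sessionmaker, instead of the scoped ZopeTransactionExtension session from the snippet above; the `commit_line` signature and the model fields here are illustrative assumptions.

```python
# Sketch: a self-contained per-call session that is created, committed,
# and closed, so its objects are eligible for garbage collection afterwards.
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

class DataEntry(Base):
    __tablename__ = 'data_entry'
    id = Column(Integer, primary_key=True)
    title = Column(String)

def commit_line(title, engine):
    Session = sessionmaker(bind=engine)
    session = Session()                 # 1. create the session
    try:
        entry = DataEntry(title=title)  # 2. make the model instance(s)
        session.add(entry)              # 3. add them to the session
        session.commit()                # 4. commit them to the database
    finally:
        session.close()                 # 5. release the session so it and its
                                        #    objects can be garbage collected

engine = create_engine('sqlite://')     # in-memory DB for the demo
Base.metadata.create_all(engine)
commit_line('first', engine)
commit_line('second', engine)
```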

I've made sure that the newly created session is passed as an argument whenever necessary in order to stop errors to do with multiple sessions blah blah. But alas! I can't get database connections to go away and stuff isn't being committed.

I tried separating the function out into a celery task so the view executes to completion and does what it needs to, but I'm getting an error in celery about having too many MySQL connections no matter what I try in terms of committing, closing, and disposing, and I'm not sure why. And yes, I restart the celery server when I make changes.
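One plausible cause of the "too many connections" error (an assumption, not something the question confirms) is that each task builds a fresh engine whose default connection pool keeps its connections open. Creating the engine with `NullPool`, or calling `engine.dispose()` when the task finishes, closes them; the SQLite URL here stands in for the real MySQL one.

```python
# Sketch: disable pooling so every connection is closed as soon as it is
# released, and dispose the engine at the end of the task for good measure.
from sqlalchemy import create_engine, text
from sqlalchemy.pool import NullPool

engine = create_engine('sqlite://', poolclass=NullPool)

with engine.connect() as conn:
    result = conn.execute(text('SELECT 1')).scalar()

engine.dispose()   # drop any remaining checked-in connections
print(result)      # → 1
```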

Surely there is a simple way to do this? All I want to do is make a session commit then go away and leave me alone.

Answer

So I tried a bunch of things and, although using SQLAlchemy's built-in functionality to solve this was probably possible, I could not find any way of pulling it off.

So here's an outline of what I did:

  1. Split the lines to be processed into batches
  2. For each batch of lines, queue up a celery task to process them
  3. In the celery task, launch a separate process that does what needs to be done to the lines

Reasoning:

  1. Batching the stuff is obvious
  2. Celery was used because processing the whole file takes a very long time, so queueing it up makes sense
  3. The task launches a separate process because, if it didn't, I'd run into the same memory problem as the pyramid application
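The batching in step 1 is plain list slicing; a tiny helper (illustrative, not taken from the answer) makes the intent explicit:

```python
# Split a list of line dicts into fixed-size batches, mirroring the slicing
# idiom used in the view code. Pure Python, no dependencies.
def batch(items, size):
    """Yield successive slices of `items`, each at most `size` long."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

line_data = [{'stuff': 'lots'}] * 7
batches = list(batch(line_data, 3))
print([len(b) for b in batches])   # → [3, 3, 1]
```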

Some code:

The celery task:

def commit_lines(lLineData, dSettings, cwd):
    """
    Writes the line data to a file, then calls a process that reads the file
    and creates the necessary data entries. Finally clears the file.
    """
    import lockfile
    import subprocess
    import os

    sFileName = "/home/sheena/tmp/cid_line_buffer"
    lock = lockfile.FileLock("{0}_lock".format(sFileName))
    with lock:
        # append, in case the process was at any point interrupted
        with open(sFileName, 'a') as f:
            for d in lLineData:
                f.write('{0}\n'.format(d))

    # now call the external process
    sConnectionString = dSettings.get('sqlalchemy.url')
    lArgs = [
        'python', os.path.join(cwd, 'commit_line_file.py'),
        '-c', sConnectionString,
        '-f', sFileName,
    ]
    # run the subprocess and wait for it to complete; raises on a non-zero exit
    subprocess.check_call(lArgs, shell=False)

    # and clear the file
    lock = lockfile.FileLock("{0}_lock".format(sFileName))
    with lock:
        open(sFileName, 'w').close()

The external process:

"""
this script goes through all lines in a file and creates data entries from the lines
"""
def main():
    from optparse import OptionParser
    from sqlalchemy import create_engine
    from pyramidapp.models import Base,DBSession

    import ast
    import transaction

    #get options

    oParser = OptionParser()
    oParser.add_option('-c','--connection_string',dest='connection_string')
    oParser.add_option('-f','--input_file',dest='input_file')
    (oOptions, lArgs) = oParser.parse_args()

    #set up connection

    #engine = engine_from_config(dSettings, 'sqlalchemy.')
    engine = create_engine(
        oOptions.connection_string,
        echo=False)
    DBSession.configure(bind=engine)
    Base.metadata.bind = engine

    #commit stuffs
    import lockfile
    lock = lockfile.FileLock("{0}_lock".format(oOptions.input_file))
    with lock:
        for sLine in open(oOptions.input_file,'r'):
            dLine = ast.literal_eval(sLine)
            create_entry(**dLine)

    transaction.commit()

def create_entry(iDS,oStartDate,oEndDate,lTitles,lValues):
    #import stuff
    oEntry = DataEntry()
    #do some other stuff, make more model instances...
    DBSession.add(oEntry)


if __name__ == "__main__":
    main()

In the view:

lLineData = []
for line in big_giant_csv_file_handler:
    lLineData.append({'stuff': 'lots'})

if lLineData:
    lLineSets = [lLineData[i:i + iBatchSize]
                 for i in range(0, len(lLineData), iBatchSize)]
    for l in lLineSets:
        commit_lines.delay(l, dSettings, sCWD)  # queue it for celery
