Load bulk into PostgreSQL from log files using Python


Question

This is a follow-up question. Below is a piece of my Python script that reads a constantly growing log file (text) and inserts data into a PostgreSQL DB. A new log file is generated each day. What I do is commit each line, which causes a huge load and really poor performance (it needs 4 hours to insert 30 minutes of file data!). How can I improve this code to insert batches instead of single lines? And would this help improve the performance and reduce the load? I've read about copy_from but couldn't figure out how to use it in such a situation.

    import psycopg2 as psycopg
    import logging
    import time

    try:
        connectStr = "dbname='postgis20' user='postgres' password='' host='localhost'"
        cx = psycopg.connect(connectStr)
        cu = cx.cursor()
        logging.info("connected to DB")
    except psycopg.Error:
        logging.error("could not connect to the database")

    # tail the log file: re-read from the same offset until a new line appears
    file = open('textfile.log', 'r')
    while 1:
        where = file.tell()
        line = file.readline()
        if not line:
            time.sleep(1)
            file.seek(where)
        else:
            print line,  # already has newline
            dodecode(line)
            ------------
    def dodecode(fields):
        # split the comma-separated log line and dispatch on message number
        global cx
        from time import strftime, gmtime
        from calendar import timegm
        import os
        msg = fields.split(',')
        part = eval(msg[2])
        msgnum = int(msg[3:6])
        print "message#:", msgnum
        print fields

        if part == 1:
            if msgnum == 1:
                msg1 = msg_1.decode(bv)
                #print "message1 :", msg1
                Insert(msgnum, time, msg1)
            elif msgnum == 2:
                msg2 = msg_2.decode(bv)
                #print "message2 :", msg2
                Insert(msgnum, time, msg2)
            elif msgnum == 3:
                ....
                ....
                ....
        ----------------
    def Insert(msgnum, time, msg):
        global cx

        try:
            if msgnum in [1, 2, 3]:
                if msg['type'] == 0:
                    cu.execute("INSERT INTO table1 ( messageid, timestamp, userid, position, text ) SELECT "+str(msgnum)+", '"+time+"', "+str(msg['UserID'])+", ST_GeomFromText('POINT("+str(float(msg['longitude']))+" "+str(float(msg['latitude']))+")'), '"+text+"' WHERE NOT EXISTS (SELECT * FROM table1 WHERE timestamp='"+time+"' AND text='"+text+"');")
                    cu.execute("INSERT INTO table2 ( field1, field2, field3, time_stamp, pos ) SELECT "+str(msg['UserID'])+","+str(int(msg['UserName']))+","+str(int(msg['UserIO']))+", '"+time+"', ST_GeomFromText('POINT("+str(float(msg['longitude']))+" "+str(float(msg['latitude']))+")') WHERE NOT EXISTS (SELECT * FROM table2 WHERE field1="+str(msg['UserID'])+");")
                    cu.execute("UPDATE table2 SET field3='"+str(int(msg['UserIO']))+"', time_stamp='"+str(time)+"', pos=ST_GeomFromText('POINT("+str(float(msg['longitude']))+" "+str(float(msg['latitude']))+")') WHERE field1='"+str(msg['UserID'])+"' AND time_stamp < '"+str(time)+"';")
                elif msg['type'] == 1:
                    cu.execute("INSERT INTO table1 ( messageid, timestamp, userid, position, text ) SELECT "+str(msgnum)+", '"+time+"', "+str(msg['UserID'])+", ST_GeomFromText('POINT("+str(float(msg['longitude']))+" "+str(float(msg['latitude']))+")'), '"+text+"' WHERE NOT EXISTS (SELECT * FROM table1 WHERE timestamp='"+time+"' AND text='"+text+"');")
                    cu.execute("INSERT INTO table2 ( field1, field2, field3, time_stamp, pos ) SELECT "+str(msg['UserID'])+","+str(int(msg['UserName']))+","+str(int(msg['UserIO']))+", '"+time+"', ST_GeomFromText('POINT("+str(float(msg['longitude']))+" "+str(float(msg['latitude']))+")') WHERE NOT EXISTS (SELECT * FROM table2 WHERE field1="+str(msg['UserID'])+");")
                    cu.execute("UPDATE table2 SET field3='"+str(int(msg['UserIO']))+"', time_stamp='"+str(time)+"', pos=ST_GeomFromText('POINT("+str(float(msg['longitude']))+" "+str(float(msg['latitude']))+")') WHERE field1='"+str(msg['UserID'])+"' AND time_stamp < '"+str(time)+"';")
                elif msg['type'] == 2:
                    ....
                    ....
                    ....
        except Exception, err:
            #print('ERROR: %s\n' % str(err))
            logging.error('ERROR: %s\n' % str(err))
            cx.commit()

        cx.commit()


Answer

Doing multiple rows per transaction, and multiple rows per query, will make it go faster.

When faced with a similar problem I put multiple rows in the VALUES part of the insert query, but you have complicated insert queries, so you'll likely need a different approach.

I'd suggest creating a temporary table and inserting, say, 10000 rows into it with ordinary multi-row inserts:

insert into temptable values ( /* row1 data */ ) ,( /* row2 data */ ) etc...



500 rows per insert is a good starting point.
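As a sketch of that setup (the table name and the idea of copying the live table's layout are placeholders, not taken from the post), the temp table can simply mirror the live table:

    # Hypothetical sketch: a temporary table shaped like the live table;
    # it is dropped automatically when the session ends.
    cu.execute("CREATE TEMP TABLE temptable (LIKE table1 INCLUDING DEFAULTS);")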

Then join the temp table with the existing data to de-dupe it:

delete from temptable using livetable where /* join condition */ ;

And de-dupe it against itself if that is needed too:

delete from temptable where id not in 
  ( select distinct on ( /* unique columns */) id from temptable);

Then use an insert-select to copy the rows from the temporary table into the live table:

insert into livetable ( /* columns */ )
  select /* columns */ from temptable; 

It looks like you may need an update ... from as well.

And finally drop the temp table and start again.

And as you're writing to two tables, you're going to need to double up all these operations.

I'd do the insert by maintaining a count and a list of values to insert, and then at insert time building the repeating (%s,%s,%s,%s) part of the query as many times as needed, passing the list of values in separately and letting psycopg2 deal with the formatting.
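A minimal sketch of that idea, assuming it feeds the temp table above; names like batch, flush and BATCH_SIZE, and the column layout, are illustrative rather than from the original code:

    # Hypothetical sketch: accumulate parsed rows, then issue one multi-row INSERT.
    batch = []        # list of (messageid, timestamp, userid, lon, lat, text) tuples
    BATCH_SIZE = 500

    def flush(cu, batch):
        if not batch:
            return
        # one "(%s,%s,%s,ST_MakePoint(%s,%s),%s)" group per buffered row
        placeholders = ",".join(["(%s,%s,%s,ST_MakePoint(%s,%s),%s)"] * len(batch))
        flat = [v for row in batch for v in row]   # flatten the list of tuples
        cu.execute("INSERT INTO temptable (messageid, timestamp, userid, position, text) VALUES " + placeholders, flat)
        del batch[:]

Each parsed log line appends one tuple to batch, and flush runs whenever len(batch) reaches BATCH_SIZE (and once at end of input), so there is one statement and one commit per batch instead of per line.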

I'd expect making those changes could get you a speed-up of 5 times or more.
