Is there a way to use FORALL to insert data from an array?


Problem description


I am running Oracle 19c and I want to get the best insert performance I can. Currently I insert using INSERT /*+APPEND */ ..., which is fine, but not the speed I wanted.
I read that using FORALL is a lot faster, but I couldn't really find any examples. Here is the code snippet (Python 3):

connection = pool.acquire()
cursor = connection.cursor()
cursor.executemany("INSERT /*+APPEND*/ INTO RANDOM VALUES (:1, :2, :3)", list(random))
connection.commit()
cursor.close()
connection.close()

Solution

I got really interested in what would be faster, so I've tested some possible ways to compare them:

  • simple executemany with no tricks.
  • the same with the APPEND_VALUES hint inside the statement.
  • the union all approach you've tried in another question. This should be slower than the above, since it generates a really large statement (which can potentially require more network transfer than the data itself). It then has to be parsed at the DB side, which also consumes a lot of time and negates all the benefits (not to mention a potential size limit). I then executemany'ed it in chunks rather than building a single statement for 100k records. I didn't concatenate the values into the statement itself, because I wanted to keep it safe.
  • insert all. The same downsides, but without the unions. Compare it with the union version.
  • serialize the data as JSON and deserialize at the DB side with json_table. Potentially good performance: a single short statement and a single data transfer, with little JSON overhead.
  • your suggested FORALL in a PL/SQL wrapper procedure. Should be the same as executemany since it does the same thing, but at the database side, plus the overhead of transforming the data into the collection.
  • the same FORALL, but with a columnar approach to passing the data: simple lists of column values instead of a complex type. Should be much faster than FORALL with a collection, since there's no need to serialize the data into the collection's type.
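The columnar variant in the last bullet boils down to transposing the row-wise input into per-column lists, which in Python is a one-liner with zip (values here are illustrative):

```python
# Row-wise input, one tuple per row, as you would pass it to executemany.
rows = [(1, "Test 1", 0.5), (2, "Test 2", 0.25), (3, "Test 3", 0.75)]

# Transpose into per-column lists for the columnar FORALL call.
ids, strs, vals = map(list, zip(*rows))

print(ids)   # [1, 2, 3]
print(strs)  # ['Test 1', 'Test 2', 'Test 3']
print(vals)  # [0.5, 0.25, 0.75]
```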

I've used Oracle Autonomous Database in Oracle Cloud with a free account. Each method was executed 10 times in a loop with the same input dataset of 100k records, and the table was recreated before each test. This is the result I got. The preparation and execution times here are the data transformation at the client side and the DB call itself, respectively.

>>> t = PerfTest(100000)
>>> t.run("exec_many", 10)
Method:  exec_many.
    Duration, avg: 2.3083874 s
    Preparation time, avg: 0.0 s
    Execution time, avg: 2.3083874 s
>>> t.run("exec_many_append", 10)
Method: exec_many_append.
    Duration, avg: 2.6031369 s
    Preparation time, avg: 0.0 s
    Execution time, avg: 2.6031369 s
>>> t.run("union_all", 10, 10000)
Method:  union_all.
    Duration, avg: 27.9444233 s
    Preparation time, avg: 0.0408773 s
    Execution time, avg: 27.8457551 s
>>> t.run("insert_all", 10, 10000)
Method: insert_all.
    Duration, avg: 70.6442494 s
    Preparation time, avg: 0.0289269 s
    Execution time, avg: 70.5541995 s
>>> t.run("json_table", 10)
Method: json_table.
    Duration, avg: 10.4648237 s
    Preparation time, avg: 9.7907693 s
    Execution time, avg: 0.621006 s
>>> t.run("forall", 10)
Method:     forall.
    Duration, avg: 5.5622837 s
    Preparation time, avg: 1.8972456000000002 s
    Execution time, avg: 3.6650380999999994 s
>>> t.run("forall_columnar", 10)
Method: forall_columnar.
    Duration, avg: 2.6702698000000002 s
    Preparation time, avg: 0.055710800000000005 s
    Execution time, avg: 2.6105702 s
>>> 
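As a back-of-envelope check, the averages above can be turned into rough throughput figures (100k rows per run; numbers copied from the output above):

```python
rows = 100_000  # size of the input dataset per run

# (method, average duration in seconds) taken from the test output above.
for method, avg_s in [("exec_many", 2.3083874),
                      ("forall", 5.5622837),
                      ("insert_all", 70.6442494)]:
    print(f"{method}: {rows / avg_s:,.0f} rows/s")
```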

The fastest way is just executemany, which is not much of a surprise. Interesting here is that APPEND_VALUES does not improve the query and takes more time on average, so this needs more investigation.

About FORALL: as expected, an individual array per column takes less time since there's no data preparation for it. It is more or less comparable with executemany, but I think the PL/SQL overhead plays some role here.

Another interesting part for me is JSON: most of the time was spent on serialization and on writing the LOB into the database, but the query itself was very fast. Maybe the write operation can be improved in some way with the chunk size, or there is some other way to pass the LOB data into the select statement, but as my code stands it is far from the very simple and straightforward executemany approach.
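One direction hinted at above is writing the CLOB in chunks instead of a single write call. A minimal sketch of the idea, with an in-memory stand-in for the LOB (cx_Oracle's LOB.write takes the data plus a 1-based offset; the chunk size here is an illustrative guess, not a tuned value, and any actual speedup is untested):

```python
import io

def write_in_chunks(lob_write, payload, chunk=32768):
    """Feed payload to a LOB-style write(data, offset) callable,
    using 1-based offsets as cx_Oracle's LOB.write expects."""
    for pos in range(0, len(payload), chunk):
        lob_write(payload[pos:pos + chunk], pos + 1)

# Demo with a StringIO standing in for the CLOB variable.
sink = io.StringIO()
write_in_chunks(lambda data, offset: (sink.seek(offset - 1), sink.write(data)),
                "x" * 100_000)
print(len(sink.getvalue()))  # 100000
```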

There are also possible approaches without Python, using native tools for external data, that should be faster, but I didn't test them.

Below is the code I've used for testing.

import cx_Oracle as db
import os, random, json
import datetime as dt


class PerfTest:
  
  def __init__(self, size):
    self._con = db.connect(
      os.environ["ora_cloud_usr"],
      os.environ["ora_cloud_pwd"],
      "test_low",
      encoding="UTF-8"
    )
    self._cur = self._con.cursor()
    self.inp = [(i, "Test {i}".format(i=i), random.random()) for i in range(size)]
  
  def __del__(self):
    if self._con:
      self._con.rollback()
      self._con.close()
 
#Create objects
  def setup(self):
    try:
      self._cur.execute("drop table rand")
      #print("table dropped")
    except db.DatabaseError:
      pass
  
    self._cur.execute("""create table rand(
      id int,
      str varchar2(100),
      val number
    )""")
    
    self._cur.execute("""create or replace package pkg_test as
  type ts_test is record (
    id rand.id%type,
    str rand.str%type,
    val rand.val%type
  );
  type tt_test is table of ts_test index by pls_integer;
  
  type tt_ids is table of rand.id%type index by pls_integer;
  type tt_strs is table of rand.str%type index by pls_integer;
  type tt_vals is table of rand.val%type index by pls_integer;
  
  procedure write_data(p_data in tt_test);
  procedure write_data_columnar(
    p_ids in tt_ids,
    p_strs in tt_strs,
    p_vals in tt_vals
  );

end;""")
    self._cur.execute("""create or replace package body pkg_test as
  procedure write_data(p_data in tt_test)
  as
  begin
    forall i in indices of p_data
      insert into rand(id, str, val)
      values (p_data(i).id, p_data(i).str, p_data(i).val)
    ;
    
    commit;

  end;
  
  procedure write_data_columnar(
    p_ids in tt_ids,
    p_strs in tt_strs,
    p_vals in tt_vals
  ) as
  begin
    forall i in indices of p_ids
      insert into rand(id, str, val)
      values (p_ids(i), p_strs(i), p_vals(i))
    ;
    
    commit;
    
  end;

end;
""")

 
  def build_union(self, size):
      return """insert into rand(id, str, val)
    select id, str, val from rand where 1 = 0 union all
    """ + """ union all """.join(
      ["select :{}, :{}, :{} from dual".format(i*3+1, i*3+2, i*3+3)
        for i in range(size)]
    )
 
 
  def build_insert_all(self, size):
      return """
      """.join(
      ["into rand(id, str, val) values (:{}, :{}, :{})".format(i*3+1, i*3+2, i*3+3)
        for i in range(size)]
    )


#Test case with executemany
  def exec_many(self):
    start = dt.datetime.now()
    self._cur.executemany("insert into rand(id, str, val) values (:1, :2, :3)", self.inp)
    self._con.commit()
    
    return (dt.timedelta(0), dt.datetime.now() - start)
 
 
#The same as above, but with the APPEND_VALUES hint
  def exec_many_append(self):
    start = dt.datetime.now()
    self._cur.executemany("insert /*+APPEND_VALUES*/ into rand(id, str, val) values (:1, :2, :3)", self.inp)
    self._con.commit()
    
    return (dt.timedelta(0), dt.datetime.now() - start)


#Union All approach (chunked). Should have large parse time
  def union_all(self, size):
##Chunked list of big tuples
    start_prepare = dt.datetime.now()
    new_inp = [
      tuple([item for t in r for item in t])
      for r in list(zip(*[iter(self.inp)]*size))
    ]
    new_stmt = self.build_union(size)
    
    dur_prepare = dt.datetime.now() - start_prepare
    
    #Execute unions
    start_exec = dt.datetime.now()
    self._cur.executemany(new_stmt, new_inp)
    dur_exec = dt.datetime.now() - start_exec

##In case the size is not a divisor
    remainder = len(self.inp) % size
    if remainder > 0 :
      start_prepare = dt.datetime.now()
      new_stmt = self.build_union(remainder)
      new_inp = tuple([
        item for t in self.inp[-remainder:] for item in t
      ])
      dur_prepare += dt.datetime.now() - start_prepare
      
      start_exec = dt.datetime.now()
      self._cur.execute(new_stmt, new_inp)
      dur_exec += dt.datetime.now() - start_exec

    self._con.commit()
    
    return (dur_prepare, dur_exec)


#The same as union all, but with no need to union something
  def insert_all(self, size):
##Chunked list of big tuples
    start_prepare = dt.datetime.now()
    new_inp = [
      tuple([item for t in r for item in t])
      for r in list(zip(*[iter(self.inp)]*size))
    ]
    new_stmt = """insert all
    {}
    select * from dual"""
    dur_prepare = dt.datetime.now() - start_prepare
    
    #Execute
    start_exec = dt.datetime.now()
    self._cur.executemany(
      new_stmt.format(self.build_insert_all(size)),
      new_inp
    )
    dur_exec = dt.datetime.now() - start_exec

##In case the size is not a divisor
    remainder = len(self.inp) % size
    if remainder > 0 :
      start_prepare = dt.datetime.now()
      new_inp = tuple([
        item for t in self.inp[-remainder:] for item in t
      ])
      dur_prepare += dt.datetime.now() - start_prepare
      
      start_exec = dt.datetime.now()
      self._cur.execute(
        new_stmt.format(self.build_insert_all(remainder)),
        new_inp
      )
      dur_exec += dt.datetime.now() - start_exec

    self._con.commit()
    
    return (dur_prepare, dur_exec)

    
#Serialize at the client side and deserialize at the DB side
  def json_table(self):
    start_prepare = dt.datetime.now()
    new_inp = json.dumps([
      { "id":t[0], "str":t[1], "val":t[2]} for t in self.inp
    ])
    
    lob_var = self._con.createlob(db.DB_TYPE_CLOB)
    lob_var.write(new_inp)
    
    start_exec = dt.datetime.now()
    self._cur.execute("""
    insert into rand(id, str, val)
    select id, str, val
    from json_table(
      to_clob(:json), '$[*]'
      columns
        id int,
        str varchar2(100),
        val number
    )
    """, json=lob_var)
    dur_exec = dt.datetime.now() - start_exec
    
    self._con.commit()
    
    return (start_exec - start_prepare, dur_exec)


#PL/SQL with FORALL
  def forall(self):
    start_prepare = dt.datetime.now()
    collection_type = self._con.gettype("PKG_TEST.TT_TEST")
    record_type = self._con.gettype("PKG_TEST.TS_TEST")
    
    def recBuilder(x):
      rec = record_type.newobject()
      rec.ID = x[0]
      rec.STR = x[1]
      rec.VAL = x[2]
      
      return rec

    inp_collection = collection_type.newobject([
      recBuilder(i) for i in self.inp
    ])
    
    start_exec = dt.datetime.now()
    self._cur.callproc("pkg_test.write_data", [inp_collection])
    dur_exec = dt.datetime.now() - start_exec
    
    return (start_exec - start_prepare, dur_exec)


#PL/SQL with FORALL and plain collections
  def forall_columnar(self):
    start_prepare = dt.datetime.now()
    ids, strs, vals = map(list, zip(*self.inp))
    start_exec = dt.datetime.now()
    self._cur.callproc("pkg_test.write_data_columnar", [ids, strs, vals])
    dur_exec = dt.datetime.now() - start_exec
    
    return (start_exec - start_prepare, dur_exec)

  
#Run test
  def run(self, method, iterations, *args):
    #Cleanup schema
    self.setup()

    start = dt.datetime.now()
    runtime = []
    for i in range(iterations):
      single_run = getattr(self, method)(*args)
      runtime.append(single_run)
    
    dur = dt.datetime.now() - start
    dur_prep_total = sum([i.total_seconds() for i, _ in runtime])
    dur_exec_total = sum([i.total_seconds() for _, i in runtime])
    
    print("""Method: {meth}.
    Duration, avg: {run_dur} s
    Preparation time, avg: {prep} s
    Execution time, avg: {ex} s""".format(
      inp_s=len(self.inp),
      meth=method,
      run_dur=dur.total_seconds() / iterations,
      prep=dur_prep_total / iterations,
      ex=dur_exec_total / iterations
    ))
