Dapper Bulk Insert Returning Serial IDs


Question

I am attempting to perform a bulk-insert using Dapper over Npgsql that returns the ids of the newly inserted rows. The following insert statement is used in both of my examples:

var query = "INSERT INTO \"MyTable\" (\"Value\") VALUES (@Value) RETURNING \"ID\"";

First, I attempted to add an array of objects with a "Value" property:

var values = new[] {
    new { Value = 0.0 },
    new { Value = 0.5 }
};
var ids = connection.Query<int>(query, values);

However, that fails with the NpgsqlException: "ERROR: 42703: column "value" does not exist". After reading this question, I thought that perhaps I have to pass a DataTable object instead of an object array:

var dataTable = new DataTable();
dataTable.Columns.Add("Value", typeof(double));
dataTable.Rows.Add(0.0);
dataTable.Rows.Add(0.5);
var ids = connection.Query<int>(query, dataTable);

However, this fails with the exact same exception. How can I perform a bulk-insert and get the resulting serial ids out of Dapper over Npgsql?

I did note that the casing of the exception does not match the column name, but I am certain that I have quotes around the table and column names, so I'm not certain why it says "value" instead of "Value" in the exception. Just thought I would mention it in case it is related to the error somehow, as it is easy to overlook casing.
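
As background for the casing: PostgreSQL folds unquoted identifiers to lowercase, so a column created as "Value" is only reachable when the quotes survive all the way to the server. A minimal illustration (assuming the "MyTable" definition used in this question):

```sql
-- PostgreSQL folds unquoted identifiers to lowercase, so these two
-- statements reference *different* columns of "MyTable":
SELECT Value   FROM "MyTable";  -- resolves to column "value" (does not exist)
SELECT "Value" FROM "MyTable";  -- resolves to column "Value" (exists)
```

A lowercase "value" in the error message is therefore often a hint that some identifier reached the server unquoted, even when the SQL text itself looks correctly quoted.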

- EDIT -

To clarify, this is the SQL to create the table

CREATE TABLE "MyTable" (
    "ID" SERIAL PRIMARY KEY,
    "Value" DOUBLE PRECISION NOT NULL
);

And using the variables "query" and "values" defined above, this is the code that is working on a per-row basis:

var ids = new List<int>();
foreach (var valueObj in values) {
    var queryParams = new DynamicParameters();
    queryParams.Add("Value", valueObj.Value);
    ids.AddRange(connection.Query<int>(query, queryParams));
}

The issue is that I need to be able to insert hundreds (perhaps thousands in the near future) of rows per second into "MyTable", so waiting for this loop to iteratively send each value to the database is cumbersome and (I assume, but have yet to benchmark) time consuming. Further, I perform additional computation on the values that may or may not result in additional inserts where I need a foreign key reference to the "MyTable" entry.

Because of these issues, I am looking for an alternative that sends all values in a single statement to the database, in order to reduce network traffic and processing latency. Again, I have NOT benchmarked the iterative approach yet... what I am looking for is an alternative that does a bulk insert so I can benchmark the two approaches against each other.

Answer

Ultimately, I came up with four different approaches to this problem. I generated 500 random values to insert into MyTable, and timed each of the four approaches (including starting and rolling back the transaction in which it was run). In my test, the database is located on localhost. However, the solution with the best performance also requires only one round trip to the database server, so the best solution I found should still beat the alternatives when deployed to a different server than the database.

Note that the variables connection and transaction are used in the following code, and are assumed to be valid Npgsql data objects. Also note that the notation Nx slower indicates an operation took an amount of time equal to the optimal solution multiplied by N.

Approach #1 (1,494ms = 18.7x slower): Unroll the array into individual parameters

public List<MyTable> InsertEntries(double[] entries)
{
    // Create a variable used to dynamically build the query
    var query = new StringBuilder(
        "INSERT INTO \"MyTable\" (\"Value\") VALUES ");

    // Create the dictionary used to store the query parameters
    var queryParams = new DynamicParameters();

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Add a unique parameter for each id
    var paramIdx = 0;
    foreach (var entry in result)
    {
        var paramName = string.Format("value{0:D6}", paramIdx);
        if (0 < paramIdx++) query.Append(',');
        query.AppendFormat("(:{0})", paramName);
        queryParams.Add(paramName, entry.Value);
    }
    query.Append(" RETURNING \"ID\"");

    // Execute the query, and store the ids
    // (ForEach((item, index) => ...) is assumed to be a custom index-aware
    // extension method, used throughout these examples)
    var ids = connection.Query<int>(query.ToString(), queryParams, transaction);
    ids.ForEach((id, i) => result[i].ID = id);

    // Return the result
    return result;
}

I'm really not sure why this came out to be the slowest, since it only requires a single round trip to the database, but it was.

Approach #2 (267ms = 3.3x slower): One insert per row

public List<MyTable> InsertEntries(double[] entries)
{
    const string query =
        "INSERT INTO \"MyTable\" (\"Value\") VALUES (:val) RETURNING \"ID\"";

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Add each entry to the database
    foreach (var entry in result)
    {
        var queryParams = new DynamicParameters();
        queryParams.Add("val", entry.Value);
        entry.ID = connection.Query<int>(
            query, queryParams, transaction).First();
    }

    // Return the result
    return result;
}

I was shocked that this was only 3.3x slower than the optimal solution, but I would expect that to get significantly worse in the real environment, since this solution requires sending 500 messages to the server serially. However, this is also the simplest solution.

Approach #3 (223ms = 2.8x slower): Asynchronous loop iteration

public List<MyTable> InsertEntries(double[] entries)
{
    const string query =
        "INSERT INTO \"MyTable\" (\"Value\") VALUES (:val) RETURNING \"ID\"";

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Add each entry to the database asynchronously
    var taskList = new List<Task<IEnumerable<int>>>();
    foreach (var entry in result)
    {
        var queryParams = new DynamicParameters();
        queryParams.Add("val", entry.Value);
        taskList.Add(connection.QueryAsync<int>(
            query, queryParams, transaction));
    }

    // Now that all queries have been sent, start reading the results
    for (var i = 0; i < result.Count; ++i)
    {
        result[i].ID = taskList[i].Result.First();
    }

    // Return the result
    return result;
}

This is getting better, but is still less than optimal because we can only queue as many inserts as there are available threads in the thread pool. However, this is almost as simple as the non-threaded approach, so it is a good compromise between speed and readability.
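
A side note on the blocking reads: in an async caller, the per-task `.Result` calls above can be replaced with a single `Task.WhenAll`, which avoids blocking a thread while results stream back. A sketch under the same assumptions as the surrounding examples (`connection`, `transaction`, `MyTable`, and `DynamicParameters` are taken from the answer, not verified here):

```csharp
public async Task<List<MyTable>> InsertEntriesAsync(double[] entries)
{
    const string query =
        "INSERT INTO \"MyTable\" (\"Value\") VALUES (:val) RETURNING \"ID\"";

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Start every insert, then await them all together instead of
    // blocking on each Task.Result in turn
    var taskList = result.Select(entry =>
    {
        var queryParams = new DynamicParameters();
        queryParams.Add("val", entry.Value);
        return connection.QueryAsync<int>(query, queryParams, transaction);
    }).ToList();

    var idSets = await Task.WhenAll(taskList);
    for (var i = 0; i < result.Count; ++i)
    {
        result[i].ID = idSets[i].First();
    }

    return result;
}
```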

Approach #4 (134ms = 1.7x slower): Bulk insert

This approach requires the following Postgres SQL be defined prior to running the code segment below it:

CREATE TYPE "MyTableType" AS (
    "Value" DOUBLE PRECISION
);

CREATE FUNCTION "InsertIntoMyTable"(entries "MyTableType"[])
    RETURNS SETOF INT AS $$

    DECLARE
        insertCmd TEXT := 'INSERT INTO "MyTable" ("Value") '
            'VALUES ($1) RETURNING "ID"';
        entry "MyTableType";
    BEGIN
        FOREACH entry IN ARRAY entries LOOP
            RETURN QUERY EXECUTE insertCmd USING entry."Value";
        END LOOP;
    END;
$$ LANGUAGE PLPGSQL;
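
Once the type and function exist, the function can also be exercised directly from SQL with an array of row literals, which is a convenient way to smoke-test it outside of Dapper (a hypothetical check, not part of the original answer; it assumes the record-to-composite array cast is accepted by your Postgres version):

```sql
-- Build a "MyTableType"[] from row literals and insert two entries,
-- returning the generated serial ids
SELECT * FROM "InsertIntoMyTable"(
    ARRAY[ROW(0.0), ROW(0.5)]::"MyTableType"[]);
```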

And the associated code:

public List<MyTable> InsertEntries(double[] entries)
{
    const string query =
        "SELECT * FROM \"InsertIntoMyTable\"(:entries::\"MyTableType\")";

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Convert each entry into a Postgres string
    var entryStrings = result.Select(
        e => string.Format("({0:E16})", e.Value)).ToArray();

    // Create a parameter for the array of MyTable entries
    var queryParam = new {entries = entryStrings};

    // Perform the insert
    var ids = connection.Query<int>(query, queryParam, transaction);

    // Assign each id to the result
    ids.ForEach((id, i) => result[i].ID = id);

    // Return the result
    return result;
}

There are two issues that I have with this approach. The first is that I have to hard-code the ordering of the members of MyTableType. If that ordering ever changes, I have to modify this code to match. The second is that I have to convert all input values to a string prior to sending them to postgres (in the real code, I have more than one column, so I can't just change the signature of the database function to take a double precision[], unless I pass in N arrays, where N is the number of fields on MyTableType).

Despite these pitfalls, this was the fastest of my original four approaches.

- BEGIN EDIT -

Since the original post, I came up with four additional approaches that are all faster than those listed above. I have modified the Nx slower numbers to reflect the new fastest method, below.

Approach #5 (105ms = 1.3x slower): Same as #4, without a dynamic query

The only difference between this approach and Approach #4 is the following change to the "InsertIntoMyTable" function:

CREATE FUNCTION "InsertIntoMyTable"(entries "MyTableType"[])
    RETURNS SETOF INT AS $$

    DECLARE
        entry "MyTableType";
    BEGIN
        FOREACH entry IN ARRAY entries LOOP
            RETURN QUERY INSERT INTO "MyTable" ("Value")
                VALUES (entry."Value") RETURNING "ID";
        END LOOP;
    END;
$$ LANGUAGE PLPGSQL;

In addition to the issues with Approach #4, the downside to this is that, in the production environment, "MyTable" is partitioned. Using this approach, I need one method per target partition.

Approach #6 (89ms = 1.1x slower): Insert statement with array argument

public List<MyTable> InsertEntries(double[] entries)
{
    const string query =
        "INSERT INTO \"MyTable\" (\"Value\") SELECT a.* FROM " +
            "UNNEST(:entries::\"MyTableType\") a RETURNING \"ID\"";

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Convert each entry into a Postgres string
    var entryStrings = result.Select(
        e => string.Format("({0:E16})", e.Value)).ToArray();

    // Create a parameter for the array of MyTable entries
    var queryParam = new {entries = entryStrings};

    // Perform the insert
    var ids = connection.Query<int>(query, queryParam, transaction);

    // Assign each id to the result
    ids.ForEach((id, i) => result[i].ID = id);

    // Return the result
    return result;
}

The only downside to this is the same as the first issue with Approach #4. Namely, that it couples the implementation to the ordering of "MyTableType". Still, I found this to be my second favorite approach since it is very fast, and does not require any database functions to work correctly.

Approach #7 (80ms = VERY slightly slower): Same as #1, but without parameters

public List<MyTable> InsertEntries(double[] entries)
{
    // Create a variable used to dynamically build the query
    var query = new StringBuilder(
        "INSERT INTO \"MyTable\" (\"Value\") VALUES");

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Add each row directly into the insert statement
    for (var i = 0; i < result.Count; ++i)
    {
        var entry = result[i];
        query.Append(i == 0 ? ' ' : ',');
        query.AppendFormat("({0:E16})", entry.Value);
    }
    query.Append(" RETURNING \"ID\"");

    // Execute the query, and store the ids
    var ids = connection.Query<int>(query.ToString(), null, transaction);
    ids.ForEach((id, i) => result[i].ID = id);

    // Return the result
    return result;
}

This is my favorite approach. It is only marginally slower than the fastest (even with 4000 records, it still runs under 1 second), but requires no special database functions or types. The only thing I don't like about it is that I have to stringify the double precision values, only to be parsed out again by Postgres. It would be preferable to send the values in binary so they took up 8 bytes instead of the massive 20 or so bytes I have allocated for them.
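
For a sense of that overhead, the `{0:E16}` format used in the code above renders each 8-byte double as a 25-character composite literal. A standalone illustration (the exact text assumes the invariant culture; the answer's own code uses the current culture):

```csharp
using System;
using System.Globalization;

class FormatSizeDemo
{
    static void Main()
    {
        // The bulk-insert code sends each double as text in E16 format;
        // with the invariant culture, 0.5 becomes a 25-character literal,
        // versus the 8 bytes a binary representation would need.
        var literal = string.Format(
            CultureInfo.InvariantCulture, "({0:E16})", 0.5);
        Console.WriteLine(literal);        // (5.0000000000000000E-001)
        Console.WriteLine(literal.Length); // 25
    }
}
```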

Approach #8 (80ms): Same as #5, but in pure sql

The only difference between this approach and Approach #5 is the following change to the "InsertIntoMyTable" function:

CREATE FUNCTION "InsertIntoMyTable"(
    entries "MyTableType"[]) RETURNS SETOF INT AS $$

    INSERT INTO "MyTable" ("Value")
        SELECT a.* FROM UNNEST(entries) a RETURNING "ID";
$$ LANGUAGE SQL;

This approach, like #5, requires one function per "MyTable" partition. This is the fastest because the query plan can be generated once for each function, then reused. In the other approaches, the query must be parsed, then planned, then executed. Despite this being the fastest, I didn't choose it due to the additional requirements on the database side over Approach #7, with very little speed benefit.
