Bulk insert strategy from C# to SQL Server


Problem description

In our current project, customers will send collections of complex/nested messages to our system. The frequency of these messages is approximately 1000-2000 messages per second.

These complex objects contain the transaction data (to be added) as well as master data (which will be added if not found). But instead of passing the IDs of the master data, the customer passes the 'name' column.

The system checks whether master data exists for these names. If found, it uses the IDs from the database; otherwise it creates this master data first and then uses those IDs.

Once the master data IDs are resolved, the system inserts the transactional data into a SQL Server database (using the master data IDs). The number of master entities per message is around 15-20.

Following are some strategies we can adopt:


  1. We can resolve the master IDs first from our C# code (and insert master data if not found) and store these IDs in a C# cache. Once all IDs are resolved, we can bulk insert the transactional data using the SqlBulkCopy class. We can hit the database 15 times to fetch the IDs for different entities and then hit the database one more time to insert the final data. We can use the same connection and close it after doing all this processing. A rough sketch of the final bulk-insert step of this option is shown below.
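
     For illustration, here is a minimal sketch of that bulk-insert step. The table name dbo.Prices, its columns, and the already-resolved ID tuples are placeholder assumptions, not our actual schema:

      // Sketch only: bulk-insert the transactional rows once the master-data IDs
      // have been resolved and cached in C#. "dbo.Prices" and its columns are
      // hypothetical names used purely for illustration.
      using System.Collections.Generic;
      using System.Data;
      using System.Data.SqlClient;

      internal static class PriceBulkLoader
      {
          public static void BulkInsertPrices(
              string connectionString,
              IEnumerable<(int ProductId, int VendorId, decimal ListPrice)> rows)
          {
              // Stage the rows in a DataTable whose columns match the target table.
              var table = new DataTable();
              table.Columns.Add("ProductId", typeof(int));
              table.Columns.Add("VendorId", typeof(int));
              table.Columns.Add("ListPrice", typeof(decimal));

              foreach (var row in rows)
                  table.Rows.Add(row.ProductId, row.VendorId, row.ListPrice);

              using (var connection = new SqlConnection(connectionString))
              {
                  connection.Open();
                  using (var bulk = new SqlBulkCopy(connection))
                  {
                      bulk.DestinationTableName = "dbo.Prices";
                      bulk.WriteToServer(table); // one round trip for the whole batch
                  }
              }
          }
      }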

  2. We can send all these messages containing master data and transactional data to the database in a single hit (in the form of multiple TVPs) and then, inside a stored procedure, create the master data first for the missing ones and then insert the transactional data.

Could anyone suggest the best approach in this use case?

Due to some privacy concerns I cannot share the actual object structure, but the hypothetical object structure here is very close to our business objects.

One such message will contain information about one product (its master data) and its price details (transaction data) from different vendors:

Master data (to be added if not found)

Product name: ABC, ProductCategory: XYZ, Manufacturer: XXX and some other details (the number of properties is in the range of 15-20).

Transaction data (will always be added)

Vendor Name: A, ListPrice: XXX, Discount: XXX

Vendor Name: B, ListPrice: XXX, Discount: XXX

Vendor Name: C, ListPrice: XXX, Discount: XXX

Vendor Name: D, ListPrice: XXX, Discount: XXX

Most of the information about the master data will remain the same for a message belonging to one product (and will change less frequently), but the transaction data will always fluctuate. So the system will check whether the product 'XXX' exists in the system or not. If not, it checks whether the 'Category' mentioned with this product exists or not. If not, it will insert a new record for the category and then for the product. The same will be done for Manufacturer and the other master data.

Multiple vendors will be sending data about multiple products (2000-5000) at the same time.

So, assume that we have 1000 suppliers, and each vendor is sending data about 10-15 different products. Every 2-3 seconds, each vendor sends us price updates for these products. A vendor may start sending data about new products, but that will not be very frequent.

Recommended answer

You would likely be best off with your #2 idea (i.e. sending all of the 15 - 20 entities to the DB in one shot using multiple TVPs and processing as a whole set of up to 2000 messages).

Caching master data lookups at the app layer and translating prior to sending to the DB sounds great, but misses something:


  1. You are going to have to hit the DB to get the initial list anyway
  2. You are going to have to hit the DB to insert new entries anyway
  3. Looking up values in a dictionary to replace with IDs is exactly what a database does (assume a Non-Clustered Index on each of these name-to-ID lookups)
  4. Frequently queried values will have their datapages cached in the buffer pool (which is a memory cache)

Why duplicate at the app layer what is already provided and happening right now at the DB layer, especially given:


  • The 15 - 20 entities can have up to 20k records (which is a relatively small number, especially when considering that the Non-Clustered Index only needs to be two fields: Name and ID which can pack many rows into a single data page when using a 100% Fill Factor).
  • Not all 20k entries are "active" or "current", so you don't need to worry about caching all of them. So whatever values are current will be easily identified as the ones being queried, and those data pages (which may include some inactive entries, but no big deal there) will be the ones to get cached in the Buffer Pool.

Hence, you don't need to worry about aging out old entries OR forcing any key expirations or reloads due to possibly changing values (i.e. updated Name for a particular ID) as that is handled naturally.

Yes, in-memory caching is wonderful technology and greatly speeds up websites, but those scenarios / use-cases are for when non-database processes are requesting the same data over and over for purely read-only purposes. This particular scenario, however, is one in which data is being merged and the list of lookup values can change frequently (more so due to new entries than due to updated entries).

That all being said, Option #2 is the way to go. I have done this technique several times with much success, though not with 15 TVPs. It might be that some optimizations / adjustments need to be made to the method to tune this particular situation, but what I have found to work well is:


    • Accept the data via TVP. I prefer this over SqlBulkCopy because:
      • it makes for an easily self-contained Stored Procedure
      • it fits very nicely into the app code to fully stream the collection(s) to the DB without needing to copy the collection(s) to a DataTable first, which duplicates the collection and wastes CPU and memory. This requires that you create a method per collection that returns IEnumerable<SqlDataRecord>, accepts the collection as input, and uses yield return; to send each record in the for or foreach loop (a minimal sketch follows below).
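
      As a rough sketch of that streaming pattern (the single Name NVARCHAR(100) column and the class/method names are assumptions chosen for illustration, not a prescribed TVP shape):

      using System.Collections.Generic;
      using System.Data;
      using Microsoft.SqlServer.Server; // SqlDataRecord, SqlMetaData

      internal static class TvpFeeders
      {
          // A single SqlDataRecord is reused; each row is yielded as ADO.NET reads it,
          // so the collection is never copied into a DataTable first.
          public static IEnumerable<SqlDataRecord> ToNameRecords(IEnumerable<string> names)
          {
              var record = new SqlDataRecord(
                  new SqlMetaData("Name", SqlDbType.NVarChar, 100));

              foreach (string name in names)
              {
                  record.SetString(0, name);
                  yield return record; // streams row-by-row into the TVP parameter
              }
          }
      }

      The resulting enumerable can be assigned directly to the Value of a SqlParameter whose SqlDbType is Structured.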

      Step 1: Insert missing Names for each entity. Remember that there should be a NonClustered Index on the [Name] field for each entity, and assuming that the ID is the Clustered Index, that value will naturally be a part of the index, hence [Name] only will provide a covering index in addition to helping the following operation. And also remember that any prior executions for this client (i.e. roughly the same entity values) will cause the data pages for these indexes to remain cached in the Buffer Pool (i.e. memory).

      ;WITH cte AS
      (
        SELECT DISTINCT tmp.[Name]
        FROM   @EntityNumeroUno tmp
      )
      INSERT INTO EntityNumeroUno ([Name])
        SELECT cte.[Name]
        FROM   cte
        WHERE  NOT EXISTS(
                       SELECT *
                       FROM   EntityNumeroUno tab
                       WHERE  tab.[Name] = cte.[Name]
                         )
      


    • Step 2: INSERT all of the "messages" in simple INSERT...SELECT where the data pages for the lookup tables (i.e. the "entities") are already cached in the Buffer Pool due to Step 1

      Finally, keep in mind that conjecture / assumptions / educated guesses are no substitute for testing. You need to try a few methods to see what works best for your particular situation since there might be additional details that have not been shared that could influence what is considered "ideal" here.

      I will say that if the Messages are insert-only, then Vlad's idea might be faster. The method I am describing here I have used in situations that were more complex and required full syncing (updates and deletes) and did additional validations and creation of related operational data (not lookup values). Using SqlBulkCopy might be faster on straight inserts (though for only 2000 records I doubt there is much difference if any at all), but this assumes you are loading directly to the destination tables (messages and lookups) and not into intermediary / staging tables (and I believe Vlad's idea is to SqlBulkCopy directly to the destination tables). However, as stated above, using an external cache (i.e. not the Buffer Pool) is also more error prone due to the issue of updating lookup values. It could take more code than it's worth to account for invalidating an external cache, especially if using an external cache is only marginally faster. That additional risk / maintenance needs to be factored into which method is overall better for your needs.

      UPDATE

      Based on info provided in comments, we now know:


      • There are multiple Vendors
      • There are multiple Products offered by each Vendor
      • Products are not unique to a Vendor; Products are sold by 1 or more Vendors
      • Product properties are singular
      • Pricing info has properties that can have multiple records
      • Pricing info is INSERT-only (i.e. point-in-time history)
      • Unique Product is determined by SKU (or similar field)
      • Once created, a Product coming through with an existing SKU but different properties otherwise (e.g. category, manufacturer, etc) will be considered the same Product; the differences will be ignored

      With all of this in mind, I will still recommend TVPs, but re-think the approach and make it Vendor-centric, not Product-centric. The assumption here is that Vendors send files whenever. So when you get a file, import it. The only lookup you would be doing ahead of time is the Vendor. Here is the basic layout:


      1. Seems reasonable to assume that you already have a VendorID at this point because why would the system be importing a file from an unknown source?
      2. You can import in batches
      3. Create a SendRows method that:
        • accepts a FileStream or something that allows for advancing through a file
        • accepts something like int BatchSize
        • returns IEnumerable<SqlDataRecord>
        • creates a SqlDataRecord to match the TVP structure
        • for-loops through the FileStream until either BatchSize has been met or there are no more records in the file
        • perform any necessary validations on the data
        • map the data to the SqlDataRecord
        • call yield return;

      • call the stored proc
      • pass in VendorID
      • pass in SendRows(FileStream, BatchSize) for the TVP

      • Either open the SqlConnection before the loop around the FileStream and close it once the loops are done, or
      • open the SqlConnection, execute the stored procedure, and close the SqlConnection inside of the FileStream loop
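
      As a hedged sketch of that calling pattern (the proc name dbo.ImportVendorFile, the table type dbo.ProductPriceRows, and the class/method names are assumptions; SendRows is the method described above, passed in here as a delegate):

      using System;
      using System.Collections.Generic;
      using System.Data;
      using System.Data.SqlClient;
      using System.IO;
      using Microsoft.SqlServer.Server; // SqlDataRecord

      internal static class VendorFileImporter
      {
          // Sketch only: open the connection once, then execute the proc once per batch,
          // streaming each batch's rows through the TVP parameter.
          public static void Import(string connectionString, int vendorId,
              FileStream file, int batchSize,
              Func<FileStream, int, IEnumerable<SqlDataRecord>> sendRows)
          {
              using (var connection = new SqlConnection(connectionString))
              {
                  connection.Open(); // opened once, before the batching loop

                  while (file.Position < file.Length) // more records remain in the file
                  {
                      using (var command = new SqlCommand("dbo.ImportVendorFile", connection))
                      {
                          command.CommandType = CommandType.StoredProcedure;
                          command.Parameters.AddWithValue("@VendorID", vendorId);

                          SqlParameter tvp = command.Parameters.Add("@Rows", SqlDbType.Structured);
                          tvp.TypeName = "dbo.ProductPriceRows"; // the TVP's table type
                          tvp.Value = sendRows(file, batchSize); // IEnumerable<SqlDataRecord>

                          command.ExecuteNonQuery(); // one batch per execution
                      }
                  }
              }
          }
      }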

      Using this type of structure you will be sending in Product properties that are not used (i.e. only the SKU is used for the look up of existing Products). BUT, it scales very well as there is no upper-bound regarding file size. If the Vendor sends 50 Products, fine. If they send 50k Products, fine. If they send 4 million Products (which is the system I worked on and it did handle updating Product info that was different for any of its properties!), then fine. No increase in memory at the app layer or DB layer to handle even 10 million Products. The time the import takes should increase in step with the amount of Products sent.

      UPDATE 2
      New details related to Source data:


      • comes from Azure EventHub
      • comes in the form of C# objects (no files)
      • Product details come in through O.P.'s system's APIs
      • is collected in a single queue (just pull the data out and insert it into the database)

      If the data source is C# objects then I would most definitely use TVPs as you can send them over as is via the method I described in my first update (i.e. a method that returns IEnumerable<SqlDataRecord>). Send one or more TVPs for the Price/Offer per Vendor details but regular input params for the singular Property attributes. For example:

      CREATE PROCEDURE dbo.ImportProduct
      (
        @SKU             VARCHAR(50),
        @ProductName     NVARCHAR(100),
        @Manufacturer    NVARCHAR(100),
        @Category        NVARCHAR(300),
        @VendorPrices    dbo.VendorPrices READONLY,
        @DiscountCoupons dbo.DiscountCoupons READONLY
      )
      AS
      SET NOCOUNT ON;
      
      -- Insert Product if it doesn't already exist
      IF (NOT EXISTS(
               SELECT  *
               FROM    dbo.Products pr
               WHERE   pr.SKU = @SKU
                    )
         )
      BEGIN
        INSERT INTO dbo.Products (SKU, ProductName, Manufacturer, Category, ...)
        VALUES (@SKU, @ProductName, @Manufacturer, @Category, ...);
      END;
      
      -- ... INSERT data from TVPs
      -- might need OPTION (RECOMPILE) per each TVP query to ensure proper estimated rows
      
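      On the C# side, a call to this proc could pass the singular properties as regular parameters and feed the collections through Structured (TVP) parameters. A minimal, hedged sketch (the class/method names are assumptions; the two IEnumerable<SqlDataRecord> arguments would come from streaming feeder methods like the one sketched earlier):

      using System.Collections.Generic;
      using System.Data;
      using System.Data.SqlClient;
      using Microsoft.SqlServer.Server; // SqlDataRecord

      internal static class ProductImporter
      {
          // Scalar product properties go in as regular parameters; the price/coupon
          // collections go in as Structured (TVP) parameters.
          public static void ImportProduct(string connectionString,
              string sku, string productName, string manufacturer, string category,
              IEnumerable<SqlDataRecord> vendorPrices,
              IEnumerable<SqlDataRecord> discountCoupons)
          {
              using (var connection = new SqlConnection(connectionString))
              using (var command = new SqlCommand("dbo.ImportProduct", connection))
              {
                  command.CommandType = CommandType.StoredProcedure;
                  command.Parameters.AddWithValue("@SKU", sku);
                  command.Parameters.AddWithValue("@ProductName", productName);
                  command.Parameters.AddWithValue("@Manufacturer", manufacturer);
                  command.Parameters.AddWithValue("@Category", category);

                  SqlParameter prices = command.Parameters.Add("@VendorPrices", SqlDbType.Structured);
                  prices.TypeName = "dbo.VendorPrices";
                  prices.Value = vendorPrices;

                  SqlParameter coupons = command.Parameters.Add("@DiscountCoupons", SqlDbType.Structured);
                  coupons.TypeName = "dbo.DiscountCoupons";
                  coupons.Value = discountCoupons;

                  connection.Open();
                  command.ExecuteNonQuery();
              }
          }
      }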

