使用存储过程的Azure documentdb批量插入 [英] Azure documentdb bulk insert using stored procedure

查看:620
本文介绍了使用存储过程的Azure documentdb批量插入的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

您好我现在用16集每object.I正在使用存储过程来插入这些documents.I插入大约3-4亿JSON对象从5-10K有22容量机组。

Hi I am using 16 collections to insert around 3-4 million json objects ranging from 5-10k per object.I am using stored procedure to insert these documents.I have 22 Capacity Unit.

function bulkImport(docs) {
    var collection = getContext().getCollection();
    var collectionLink = collection.getSelfLink();

    // The count of imported docs, also used as current doc index.
    var count = 0;

    // Validate input.
    if (!docs) throw new Error("The array is undefined or null.");

    var docsLength = docs.length;
    if (docsLength == 0) {
        getContext().getResponse().setBody(0);
    }

    // Call the CRUD API to create a document.
    tryCreateOrUpdate(docs[count], callback);

    // Note that there are 2 exit conditions:
    // 1) The createDocument request was not accepted. 
    //    In this case the callback will not be called, we just call setBody and we are done.
    // 2) The callback was called docs.length times.
    //    In this case all documents were created and we don't need to call tryCreate anymore. Just call setBody and we are done.
    function tryCreateOrUpdate(doc, callback) {
        var isAccepted = true;
        var isFound = collection.queryDocuments(collectionLink, 'SELECT * FROM root r WHERE r.id = "' + doc.id + '"', function (err, feed, options) {
            if (err) throw err;
            if (!feed || !feed.length) {
                isAccepted = collection.createDocument(collectionLink, doc, callback);
            }
            else {
                // The metadata document.
                var existingDoc = feed[0];
                isAccepted = collection.replaceDocument(existingDoc._self, doc, callback);
            }
        });

        // If the request was accepted, callback will be called.
        // Otherwise report current count back to the client, 
        // which will call the script again with remaining set of docs.
        // This condition will happen when this stored procedure has been running too long
        // and is about to get cancelled by the server. This will allow the calling client
        // to resume this batch from the point we got to before isAccepted was set to false
        if (!isFound && !isAccepted) getContext().getResponse().setBody(count);
    }

    // This is called when collection.createDocument is done and the document has been persisted.
    function callback(err, doc, options) {
        if (err) throw err;

        // One more document has been inserted, increment the count.
        count++;

        if (count >= docsLength) {
            // If we have created all documents, we are done. Just set the response.
            getContext().getResponse().setBody(count);
        } else {
            // Create next document.
            tryCreateOrUpdate(docs[count], callback);
        }
    }

我的C#codeS看起来像这样

my C# codes looks like this

    public async Task<int> Add(List<JobDTO> entities)
            {

                    int currentCount = 0;
                    int documentCount = entities.Count;

                    while(currentCount < documentCount)
                    {
                        string argsJson = JsonConvert.SerializeObject(entities.Skip(currentCount).ToArray());
                        var args = new dynamic[] { JsonConvert.DeserializeObject<dynamic[]>(argsJson) };

                        // 6. execute the batch.
                        StoredProcedureResponse<int> scriptResult = await DocumentDBRepository.Client.ExecuteStoredProcedureAsync<int>(sproc.SelfLink, args);

                        // 7. Prepare for next batch.
                        int currentlyInserted = scriptResult.Response;

                        currentCount += currentlyInserted;

                    }

                    return currentCount;
            }

我现在面临的问题是出400K的文件,我试图在次文件中插入得到错过了出来给予任何的错误。

The problem I am facing is out of 400k documents that I try to insert at times documents get missed with out giving any error.

在应用程序部署云工作者的作用。
如果我增加documentDB插入线程或实例的数目的错过文档的数量也高得多。

The application is worker role deployed on cloud. If I increase the number of threads or instances inserting in documentDB the number of documents missed are much higher.

如何弄清楚什么是提前problem.Thanks。

how to figure out what is the problem.Thanks in Advance.

推荐答案

我发现,尝试这种code时,我会得到在docs.length一个错误,其中指出,长度是不确定的。

I found that when trying this code I would get an error at docs.length which stated that length was undefined.

function bulkImport(docs) {
    var collection = getContext().getCollection();
    var collectionLink = collection.getSelfLink();

    // The count of imported docs, also used as current doc index.
    var count = 0;

    // Validate input.
    if (!docs) throw new Error("The array is undefined or null.");

    var docsLength = docs.length; // length is undefined
}

许多测试(找不到Azure的文档中的任何东西),我意识到,当有人建议我不能错过的数组。参数必须是一个对象。我不得不修改批次code这样才能让它运行。

After many tests (could not find anything in Azure documentation) I realized that I could not pass an array as was suggested. The parameter had to be an object. I had to modify the batch code like this in order for it to run.

我也发现我不能简单地尝试,要么通过在DocumentDB脚本资源管理器(输入框)文件的数组。即使占位符的帮助文本说就可以了。

I also found I could not simply try and pass an array of documents in the DocumentDB script explorer (Input box) either. Even though the placeholder help text says you can.

这code为我工作:

// psuedo object for reference only
docObject = {
  "items": [{doc}, {doc}, {doc}]
}

function bulkImport(docObject) {
    var context = getContext();
    var collection = context.getCollection();
    var collectionLink = collection.getSelfLink();
    var count = 0;

    // Check input
    if (!docObject.items || !docObject.items.length) throw new Error("invalid document input parameter or undefined.");
    var docs = docObject.items;
    var docsLength = docs.length;
    if (docsLength == 0) {
        context.getResponse().setBody(0);
    }

    // Call the funct to create a document.
    tryCreateOrUpdate(docs[count], callback);

    // Obviously I have truncated this function. The above code should help you understand what has to change.
}

希望Azure的文件将赶上或变得更容易找到,如果我错过了。

Hopefully Azure documentation will catch up or become easier to find if I missed it.

我也可以放置在Script Explorer中的bug报告,希望该Azurites将更新。

I'll also be placing a bug report for the Script Explorer in hopes that the Azurites will update.

这篇关于使用存储过程的Azure documentdb批量插入的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆