TPL Dataflow - how to call action item multiple items

Question

I am new to TPL Dataflow. I have a list of project numbers that I need to process. A project can have about 8000 items; for each item in the project I need to get its data and then push that data to 5 separate servers.

Here is what I have coded thus far. I'm stuck at the step of how to load this data into the 5 servers. I am not sure if this is coded correctly. Any advice is much appreciated.

public static bool PushData(string projectId)
{
    var linkCompletion = new DataflowLinkOptions
    {
        PropagateCompletion = true
    };

    var projectItems = new TransformBlock<ProjectDTO, ProjectDTO>(
        dto => dto.GetItemData(dto), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

    var itemData = new ActionBlock<ProjectDTO>(
        dto => PostEachServerAsync(dto, "server1", "setmemcache"), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });


    projectItems.LinkTo(projectRules, linkCompletion);

    IList<ProjectDTO> dtoList = new List<ProjectDTO>();
    dtoList = MemcachedDTO.GetDataByProject(projectId);

    foreach (ProjectDTO d in dtoList)
    {
        projectItems.Post(d);
    }

    projectItems.Complete();
    projectItems.Completion.Wait();
    return false;
}

Here is my code now, but it does not complete properly. Could anyone please tell me what I am doing wrong?

    [HttpGet]
    public HttpResponseMessage ReloadItem(string projectQuery)
    {
        try
        {

            var linkCompletion = new DataflowLinkOptions
            {
                PropagateCompletion = true
            };

            IList<string> projectIds = projectQuery.Split(',').ToList();
            IEnumerable<string> serverList = ConfigurationManager.AppSettings["ServerList"].Split(',').Cast<string>();

            var iR = new TransformBlock<MemcachedDTO, MemcachedDTO>(
                dto => dto.GetIR(dto), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 });

            var pR = serverList.Select(
                    i => new { Id = i, Action = new ActionBlock<MemcachedDTO>(dto => PostEachServerAsync(dto, i, "set"), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 }) });

            List<MemcachedDTO> dtoList = new List<MemcachedDTO>();

            foreach (string pid in projectIds)
            {
                IList<MemcachedDTO> dtoTemp = new List<MemcachedDTO>();
                dtoTemp = MemcachedDTO.GetItemIdsByProject(pid);
                dtoList.AddRange(dtoTemp);
            }


            foreach (var action in pR)
            {
                iR.LinkTo(action.Action, linkCompletion);
            }

            foreach (MemcachedDTO d in dtoList)
            {
                iR.Post(d);
            }
            iR.Complete();
            foreach (var action in pR)
            {
                action.Action.Completion.Wait();
            }


            return Request.CreateResponse(HttpStatusCode.OK, new { message = projectIds.ToString() + " reload success" });
        }
        catch (Exception ex)
        {
            return Request.CreateResponse(HttpStatusCode.InternalServerError, new { message = ex.Message.ToString() });
        }
    }

Answer

Your code doesn't compile at all; how are you running it?

First of all, do not block your thread with .Wait(); use the async/await pattern here. Second, you need a BroadcastBlock to deliver your data to more than one block: a source linked directly to several targets hands each item to only one of them. Third, you need 5 separate ActionBlocks, one per server, not a single block with a degree of parallelism of 5. Fourth, you're awaiting the wrong Completion task: wait for the last blocks in the pipeline to complete, not the first one, so in your case you need to wait for all 5 action blocks with Task.WhenAll.
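A small self-contained sketch of points two to four, assuming int items, a stand-in `x * 10` transform in place of GetItemData, and a per-block counter in place of PostEachServerAsync (FanOutDemo and RunAsync are illustrative names, not part of the question's code). It links one TransformBlock to several ActionBlocks either directly or through a BroadcastBlock, then awaits all of the ActionBlocks' Completion tasks with Task.WhenAll.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class FanOutDemo
{
    // Sends `items` through a TransformBlock that feeds `targetCount` ActionBlocks,
    // either linked directly or through a BroadcastBlock, and returns how many
    // items each ActionBlock (a stand-in for one server) received.
    public static async Task<int[]> RunAsync(IEnumerable<int> items, int targetCount, bool useBroadcast)
    {
        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
        var received = new int[targetCount];

        // Stand-in for GetItemData: any per-item transformation would do.
        var transform = new TransformBlock<int, int>(x => x * 10);

        ISourceBlock<int> source = transform;
        if (useBroadcast)
        {
            // null cloning function: every target receives the same value.
            var broadcast = new BroadcastBlock<int>(null);
            transform.LinkTo(broadcast, linkOptions);
            source = broadcast;
        }

        // One ActionBlock per "server"; ToList() so the same blocks are linked and awaited.
        var targets = Enumerable.Range(0, targetCount)
            .Select(i => new ActionBlock<int>(_ => Interlocked.Increment(ref received[i])))
            .ToList();

        foreach (var target in targets)
        {
            source.LinkTo(target, linkOptions);
        }

        foreach (var item in items)
        {
            transform.Post(item);
        }
        transform.Complete();

        // Await the last blocks in the pipeline, not the first one.
        await Task.WhenAll(targets.Select(t => t.Completion));
        return received;
    }
}
```

With `useBroadcast: false` the 100 items below are split across the three blocks (typically they all land in the first linked block, which accepts everything), so the counts add up to 100. With `useBroadcast: true` every block receives all 100 items.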

So your code could look like this (I assume that projectRules and itemData are the same block):

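using System.Configuration;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static async Task<bool> PushData(string projectId)
{
    var linkCompletion = new DataflowLinkOptions
    {
        PropagateCompletion = true
    };

    // Same server list as in the question's second snippet.
    var serverList = ConfigurationManager.AppSettings["ServerList"].Split(',');

    var projectItems = new TransformBlock<ProjectDTO, ProjectDTO>(
        dto => dto.GetItemData(dto), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

    // BroadcastBlock offers every item to all linked targets. It has no parameterless
    // constructor; a null cloning function sends the same ProjectDTO to every server block.
    var broadcast = new BroadcastBlock<ProjectDTO>(null);
    projectItems.LinkTo(broadcast, linkCompletion);

    // One ActionBlock per server. ToList() matters: Select is lazy, and enumerating it
    // a second time would create new blocks that were never linked. The blocks are
    // unbounded (no BoundedCapacity), so none of them misses a broadcast item.
    var pR = serverList.Select(
            i => new { Id = i, Action = new ActionBlock<ProjectDTO>(dto => PostEachServerAsync(dto, i, "set"), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 }) })
        .ToList();

    foreach (var action in pR)
    {
        broadcast.LinkTo(action.Action, linkCompletion);
    }

    var dtoList = MemcachedDTO.GetDataByProject(projectId);

    foreach (var d in dtoList)
    {
        projectItems.Post(d);
    }
    projectItems.Complete();

    // Wait for all the action blocks (the end of the pipeline) to finish.
    await Task.WhenAll(pR.Select(action => action.Action.Completion));
    return false;
}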
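There is one more problem in the question's second snippet, and it explains why that version never completes: `serverList.Select(...)` is a lazy LINQ query, so each `foreach` over `pR` runs the selector again and creates brand-new ActionBlocks. The blocks whose Completion is waited on in the second loop are not the ones that were linked, so nothing ever completes them. The sketch below reproduces this with dummy blocks; DeferredSelectDemo, the int payload and the 500 ms timeout are illustrative assumptions.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DeferredSelectDemo
{
    // Links a BroadcastBlock to the blocks yielded by `actions`, pushes one item,
    // and reports whether the blocks yielded by a second enumeration complete.
    static async Task<bool> CompletesAsync(IEnumerable<ActionBlock<int>> actions)
    {
        var link = new DataflowLinkOptions { PropagateCompletion = true };
        var broadcast = new BroadcastBlock<int>(null);

        // First enumeration: link the blocks.
        foreach (var action in actions)
        {
            broadcast.LinkTo(action, link);
        }

        broadcast.Post(1);
        broadcast.Complete();

        // Second enumeration: with a lazy Select these are new blocks that were
        // never linked or completed, so WhenAll never finishes.
        var all = Task.WhenAll(actions.Select(a => a.Completion));
        var finished = await Task.WhenAny(all, Task.Delay(TimeSpan.FromMilliseconds(500)));
        return finished == all;
    }

    public static async Task<(bool LazyCompleted, bool ListCompleted)> RunAsync()
    {
        var servers = new[] { "server1", "server2", "server3" };

        // Deferred: every enumeration runs the selector again.
        IEnumerable<ActionBlock<int>> lazy = servers.Select(s => new ActionBlock<int>(_ => { }));
        // Materialized: the same three blocks on every enumeration.
        List<ActionBlock<int>> materialized = servers.Select(s => new ActionBlock<int>(_ => { })).ToList();

        return (await CompletesAsync(lazy), await CompletesAsync(materialized));
    }
}
```

The lazy version times out while the materialized one completes. The fix is to materialize the query once, e.g. `serverList.Select(...).ToList()`, and then link and await that same list.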
