数据流以极快的工作,小作业,然后再组 [英] Dataflow with splitting work to small jobs and then group again

查看:180
本文介绍了数据流以极快的工作,小作业,然后再组的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要做这样的工作:


  1. 从数据库中获取Page对象

  2. 对于每个页面获取所有图像和处理它们(IO约束,例如,上传到CDN)

  3. 如果所有图像进展顺利将其标记页在数据库处理

因为我需要控制我并行多少网页的处理,我决定去与TPL数据流:

Since I need to control how much Pages I process in parallel I've decided to go with TPL Dataflows:

 ____________________________
|         Data pipe          |
|   BufferBlock<Page>        |
|   BoundedCapacity = 1      |
|____________________________|
              |
 ____________________________
|       Process images       |
| TransformBlock<Page, Page> |
| BoundedCapacity = 1        |
| MaxDegreeOfParallelism = 8 |
|____________________________|
              |
 ____________________________
|        Save page           |
| ActionBlock<Page>          |
| BoundedCapacity = 1        |
| MaxDegreeOfParallelism = 5 |
|____________________________|

现在我需要的处理图像来并行处理图像,但我想限制多少图片我在各地的工作当前所有并行处理页

Now I need the "Process images" to process images in parallel but I want to limit how much images I've processing across all parallel pages in work currently.

我可以使用TrasnformManyBlock为处理图像但我怎么收集他们早在保存页面块?

I can use TrasnformManyBlock for "Process images" but how do I gather them back in "Save page" block?

         ____________________________
        |         Data pipe          |
        |   BufferBlock<Page>        |
        |   BoundedCapacity = 1      |
        |____________________________|
                      |
     ___________________________________
    |           Load images             |
    | TransformManyBlock<Page, Image[]> |
    | BoundedCapacity = 1               |
    | MaxDegreeOfParallelism = 8        |
    |___________________________________|
      /              |              \
   ______________________________________________
 _|____________________________________________  |
|              Process image                   | |
| TransformBlock<ImageWithPage, ImageWithPage> | |
| BoundedCapacity = 1                          | |
| MaxDegreeOfParallelism = 8                   |_|
|______________________________________________|
      \              |               /
         How to group images by page ?
                     |
        ____________________________
       |        Save page           |
       | ActionBlock<Page>          |
       | BoundedCapacity = 1        |
       | MaxDegreeOfParallelism = 5 |
       |____________________________|

在最重要的是潜在的图像之一可能无法将继续,我不想保存网页失败的图像。

On top of that potentially one of the images could fail to be proceed and I don't want to save page with failed images.

推荐答案

您可以把这些图像一起被记录,每当某个网页的图像到达然后再送就当所有的图像到达的页面。为了弄清这一点,网页需要知道多少图像包含的,但我想你知道,

You can group the images together by recording whenever an image for a given page arrives and then sending the page on when all images arrived. To figure that out, page needs to know how many images it contains, but I assume you know that.

在代码中,它可能是这个样子:

In code, it could look something like this:

public static IPropagatorBlock<TSplit, TMerged>
    CreaterMergerBlock<TSplit, TMerged>(
    Func<TSplit, TMerged> getMergedFunc, Func<TMerged, int> getSplitCount)
{
    var dictionary = new Dictionary<TMerged, int>();

    return new TransformManyBlock<TSplit, TMerged>(
        split =>
        {
            var merged = getMergedFunc(split);
            int count;
            dictionary.TryGetValue(merged, out count);
            count++;
            if (getSplitCount(merged) == count)
            {
                dictionary.Remove(merged);
                return new[] { merged };
            }

            dictionary[merged] = count;
            return new TMerged[0];
        });
}



用法:

Usage:

var dataPipe = new BufferBlock<Page>();

var splitter = new TransformManyBlock<Page, ImageWithPage>(
    page => page.LoadImages(),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

var processImage = new TransformBlock<ImageWithPage, ImageWithPage>(
    image =>
    {
        // process the image here
        return image;
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

var merger = CreaterMergerBlock(
    (ImageWithPage image) => image.Page, page => page.ImageCount);

var savePage = new ActionBlock<Page>(
    page => /* save the page here */,
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

dataPipe.LinkTo(splitter);
splitter.LinkTo(processImage);
processImage.LinkTo(merger);
merger.LinkTo(savePage);

这篇关于数据流以极快的工作,小作业,然后再组的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆