Dataflow with splitting work to small jobs and then group again

Problem description
I need to do work like this:

- Get Page objects from the database
- For each page, load all of its images and process them (IO bound, e.g. upload to a CDN)
- If all images were processed successfully, mark the page as processed in the database

Since I need to control how many Pages I process in parallel, I've decided to go with TPL Dataflow:
 ____________________________
|         Data pipe          |
|     BufferBlock<Page>      |
|    BoundedCapacity = 1     |
|____________________________|
              |
 ____________________________
|       Process images       |
| TransformBlock<Page, Page> |
|    BoundedCapacity = 1     |
| MaxDegreeOfParallelism = 8 |
|____________________________|
              |
 ____________________________
|         Save page          |
|     ActionBlock<Page>      |
|    BoundedCapacity = 1     |
| MaxDegreeOfParallelism = 5 |
|____________________________|
Now I need the "Process images" step to process images in parallel, but I want to limit how many images are being processed across all of the pages currently in flight.
I can use TransformManyBlock for "Process images", but how do I gather the images back together in the "Save page" block?
 ____________________________
|         Data pipe          |
|     BufferBlock<Page>      |
|    BoundedCapacity = 1     |
|____________________________|
              |
 ___________________________________
|            Load images            |
| TransformManyBlock<Page, Image[]> |
|        BoundedCapacity = 1        |
|    MaxDegreeOfParallelism = 8     |
|___________________________________|
            /         |         \
  ______________________________________________
 |                Process image                 | |
 | TransformBlock<ImageWithPage, ImageWithPage> | |
 |             BoundedCapacity = 1              | |
 |          MaxDegreeOfParallelism = 8          |_|
 |______________________________________________|
            \         |         /
         How to group images by page?
                      |
          ____________________________
         |         Save page          |
         |     ActionBlock<Page>      |
         |    BoundedCapacity = 1     |
         | MaxDegreeOfParallelism = 5 |
         |____________________________|
On top of that, one of the images could potentially fail to be processed, and I don't want to save a page whose images failed.
Answer
You can group the images together by recording whenever an image for a given page arrives, and then sending the page on once all of its images have arrived. To figure that out, the page needs to know how many images it contains, but I assume you know that.
In code, it could look something like this:
public static IPropagatorBlock<TSplit, TMerged>
    CreaterMergerBlock<TSplit, TMerged>(
    Func<TSplit, TMerged> getMergedFunc, Func<TMerged, int> getSplitCount)
{
    // Counts how many split items have arrived so far for each merged item.
    // No locking is needed, because the block below uses the default
    // MaxDegreeOfParallelism of 1, so the delegate never runs concurrently.
    var dictionary = new Dictionary<TMerged, int>();
    return new TransformManyBlock<TSplit, TMerged>(
        split =>
        {
            var merged = getMergedFunc(split);
            int count;
            dictionary.TryGetValue(merged, out count);
            count++;
            if (getSplitCount(merged) == count)
            {
                // All items for this merged object have arrived; emit it.
                dictionary.Remove(merged);
                return new[] { merged };
            }
            dictionary[merged] = count;
            // Not complete yet; emit nothing.
            return new TMerged[0];
        });
}
Usage:
var dataPipe = new BufferBlock<Page>();

var splitter = new TransformManyBlock<Page, ImageWithPage>(
    page => page.LoadImages(),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

var processImage = new TransformBlock<ImageWithPage, ImageWithPage>(
    image =>
    {
        // process the image here
        return image;
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

var merger = CreaterMergerBlock(
    (ImageWithPage image) => image.Page, page => page.ImageCount);

var savePage = new ActionBlock<Page>(
    page => { /* save the page here */ },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

// Propagate completion so that completing dataPipe eventually
// completes the whole pipeline.
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
dataPipe.LinkTo(splitter, linkOptions);
splitter.LinkTo(processImage, linkOptions);
processImage.LinkTo(merger, linkOptions);
merger.LinkTo(savePage, linkOptions);
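To actually drive the pipeline, you feed pages into the head block and wait on the tail block's completion. A minimal sketch, assuming `pagesFromDatabase` is a hypothetical enumerable returned by your database query, and that the `LinkTo` calls propagate completion:

```csharp
// Feed pages in. SendAsync honors BoundedCapacity on the head block,
// so this loop is throttled by the pipeline (backpressure) instead of
// buffering every page up front, unlike Post, which would just refuse
// the item when the block is full.
foreach (var page in pagesFromDatabase)
    await dataPipe.SendAsync(page);

// Signal that no more pages are coming, then wait for the last block.
// savePage.Completion only finishes if the links were created with
// DataflowLinkOptions { PropagateCompletion = true }.
dataPipe.Complete();
await savePage.Completion;
```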