了解rxjs中的反压 - 只缓存5个图像等待上传 [英] Understanding back-pressure in rxjs - only cache 5 images waiting for upload

查看:94
本文介绍了了解rxjs中的反压 - 只缓存5个图像等待上传的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在开发一个节点项目,需要提交数千张图片进行处理。在将这些图像上传到处理服务器之前,需要调整它们的大小,以便我有类似的内容:

I am working on a node project that needs to submit thousands of images for processing. Before these images are uploaded to the processing server they need to be resized so I have something along the lines of this:

imageList
    .map(image => loadAndResizeImage)
    .merge(3)
    .map(image => uploadImage)
    .merge(3)
    .subscribe();

图像大小调整通常需要十分之几秒,上传和处理大约需要4秒。

Image resizing typically takes a few tenths of a second, uploading and processing takes around 4 seconds.

当我等待上传队列清除时,如何防止在内存中累积数千个已调整大小的图像?我可能希望调整5张图像大小并等待,这样一旦图像上传完成,下一个已调整大小的图像就会从队列中拉出并上传,新图像会被调整大小并添加到缓冲区。

How can I prevent thousands of resized images building up in memory as I wait for the upload queue to clear? I probably want to have 5 images resized and waiting to go so that as soon as an image upload finishes the next resized image is pulled from the queue and uploaded and a new image is resized and added to the 'buffer'.

可以在此处找到问题的说明:

An illustration of the issue can be found here:

https://jsbin.com/webaleduka/4/edit?js,console

这里有一个加载步骤(耗时200ms)和一个处理步骤(耗时4秒)。每个进程限制为2的并发性。
我们可以看到,在25个初始项目中,我们在内存中获得了20个图像。

Here there is a load step (taking 200ms) and a process step (taking 4 seconds). Each process is limited to a concurrency of 2. We can see that with 25 initial items we get to 20 images in memory.

我确实看过了缓冲区选项,但似乎都没有做我想做的事情。

I did look at the buffer options but neither seemed to do what I wanted to do.

目前我刚刚合并了负载,调整大小并上传到一个延迟的observable,我合并了一个最大并发。我想让图像等待上传,我相信它一定是可能的。

At the moment I have just combined the load, resize and upload into one deferred observable that I merge with a max concurrency. I would like to have the images waiting for upload though and I am sure that it must be possible.

我使用的是RXjs 4但是我想主要是相同的5。

I am using RXjs 4 but I imagine the principals will be the same for 5.

非常感谢。

推荐答案

我认为我已设法通过使用 controlled() rxjs运算符解决此问题:

I think that I have managed to solve this by using the controlled() rxjs operator:

var queuedImages = 0;

var imageSource = Rx.Observable.range(1, 25)
  .map(index => "image_" + index)
  .controlled();

imageSource
  .map(image => loadImage(image))
  .merge(2)
  .do((image) => {
    queuedImages++;
    console.log(`Images waiting for processing: ${queuedImages}`);
  })
  .map(image => processImage(image))
  .merge(2)
  .do( () => {
    queuedImages--;
    console.log(`Images waiting for processing: ${queuedImages}`);

    if(queuedImages < 4){
      console.log(`requesting more image loads`);
      imageSource.request(4-queuedImages);
    }
  })
  .subscribe( 
    (item) => {}, null, 
    () => console.log(`All Complete`) );

imageSource.request(4);

最初请求4张图片。这些是从光盘加载然后处理。在加载每个图像然后处理时,使用 queuedImages 变量跟踪内存中的图像数量。当这个数字低于4时,会要求更多图像。

Initially 4 images are requested. These are loaded from disc and then processed. As each image is loaded and then processed the number of images in memory are kept track of using the queuedImages variable. When this number drops below 4 more images are requested.

这里可以看到一个jsbin:

A jsbin of this can be seen here:

https://jsbin.com/webaleduka/11/edit?js,console

此方法意味着缓存中的图像数量不会超过6个,并确保缓存中始终有足够的图像等待上传。

This method means that there are never more than 6 or so images in the cache and ensures that there are always enough images in the cache waiting to be uploaded.

这篇关于了解rxjs中的反压 - 只缓存5个图像等待上传的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆