使用RxJS进行批处理? [英] Batching using RxJS?

查看:100
本文介绍了使用RxJS进行批处理?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我猜这应该有点容易实现,但我遇到了麻烦(概念上,我猜)找出解决方法。



我是什么has是一个返回JSON对象数组的API。我需要逐步浏览这些对象,并为每个对象进行另一个AJAX调用。问题是处理每个AJAX调用的系统一次只能处理两个活动调用(因为它是一个CPU密集型任务,可以挂钩到桌面应用程序中)。



<我想知道如何使用RxJS(使用版本5或4)来实现这一目标?



编辑:此外,是否可以有一系列步骤同时运行。 ie




下载文件:1
处理文件:1
转换文件:1
上传文件:1
下载文件:2
处理文件:2
转换文件:2
上传文件:2
下载文件:3
处理文件:3
转换文件:3
上传文件:3



我尝试过做某事喜欢:

  Rx.Observable.fromPromise(start())
.concatMap(arr => Rx.Observable .from(arr))
.concatMap(x => downloadFile(x))
.concatMap((entry)=> processFile(entry))
.concatMap((entry) => convertFile(entry))
.concatMap((entry)=> UploadFile(entry))
.subscribe(
data => console.log('data',new Date()。getTime(),data),
error => logger.warn('err',error),
complete => logger.info('complete')
);

然而,这似乎不起作用。例如,downloadFile不等待processFile,convertFile和uploadFile全部完成,而是下一个将在前一个完成后再次运行。

解决方案

如果你想要的请求序列完全像这样,有两种方法

 正在下载文件:1 
处理文件:1
转换文件:1
上传文件:1
下载文件:2
处理文件:2
...

你需要解决单个concatMap方法中的所有承诺,比如这个

  Rx.Observable.fromPromise(getJSONOfAjaxRequests())
.flatMap(function(x){return x;})
.concatMap( function(item){
return downloadFile(item)
.then(processFile)
.then(convertFile);
})
.subscribe(function(data) {
console.log(data);
});

在此处查看工作插件: https://plnkr.co/edit/iugdlC2PpW3NeNF2yLzS?p=preview
这样,新的ajax调用只有在前一个调用时才会发送已完成。



另一种方法是允许文件并行发送请求,但操作的下载,处理,转换,上传将按顺序进行。为此你可以通过它来实现它

  Rx.Observable.fromPromise(getJSONOfAjaxRequests())
.flatMap(function (x){return x;})
.merge(2)//万一需要最大并发数为2
.concatMap(function(item){
return downloadFile(item);
})
.concatMap(function(item){
return processFile(item);
})
.concatMap(function(item){
return convertFile(item)
})
.subscribe(function(data){
//console.log(data);
});

请参阅此处的plunkr: https://plnkr.co/edit/mkDj6Q7lt72jZKQk8r0p?p=preview


I'm guessing this should be somewhat easy to achieve but I've having trouble (conceptually, I guess) figuring out how to tackle it.

What I have is an API that returns an array of JSON objects. I need to step through these objects, and, for each object, make another AJAX call. The issue is the system that handles each AJAX call can only handle two active calls at a time (as it's quite a CPU-intensive task that hooks out into a desktop application).

I was wondering how I could achieve this using RxJS (either using version 5 or 4)?

EDIT: In addition, is it possible to have a chain of steps running concurrently. i.e.

Downloading File: 1 Processing File: 1 Converting File: 1 Uploading File: 1 Downloading File: 2 Processing File: 2 Converting File: 2 Uploading File: 2 Downloading File: 3 Processing File: 3 Converting File: 3 Uploading File: 3

I've tried doing something like:

Rx.Observable.fromPromise(start())
    .concatMap(arr => Rx.Observable.from(arr))
    .concatMap(x => downloadFile(x))
    .concatMap((entry) => processFile(entry))
    .concatMap((entry) => convertFile(entry))
    .concatMap((entry) => UploadFile(entry))
    .subscribe(
        data => console.log('data', new Date().getTime(), data),
        error => logger.warn('err', error),
        complete => logger.info('complete')
    );

However that doesn't seem to work. The downloadFile, for example doesn't wait for processFile, convertFile and uploadFile to all complete, rather, the next one will run again as soon as the previous one completes.

解决方案

Here are 2 approaches, if you want the sequence of requests exactly like this

Downloading File: 1
Processing File: 1
Converting File: 1
Uploading File: 1
Downloading File: 2
Processing File: 2
...

You need to resolve all promises inside single concatMap method, like this

Rx.Observable.fromPromise(getJSONOfAjaxRequests())
  .flatMap(function(x) { return x;})
  .concatMap(function(item) {
    return downloadFile(item)
      .then(processFile)
      .then(convertFile);
  })
  .subscribe(function(data) {
    console.log(data);
  });

see the working plunkr here: https://plnkr.co/edit/iugdlC2PpW3NeNF2yLzS?p=preview This way, the new ajax call will be sent only when the previous is finished.

Another approach is that allow the files to send requests in parallel but the operations 'downloading,processing,converting,uploading' will be in sequence. For this you can get it working by

Rx.Observable.fromPromise(getJSONOfAjaxRequests())
  .flatMap(function(x) { return x;})
  .merge(2)  // in case maximum concurrency required is 2
  .concatMap(function(item) {
    return downloadFile(item);
  })
  .concatMap(function(item) {
    return processFile(item);
  })
  .concatMap(function(item) {
    return convertFile(item)
  })
  .subscribe(function(data) {
    //console.log(data);
  });

see plunkr here: https://plnkr.co/edit/mkDj6Q7lt72jZKQk8r0p?p=preview

这篇关于使用RxJS进行批处理?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆