尝试轮询服务器,但是由于算法中断,rxjs 方法可能不正确 [英] attempting to poll a server however rxjs methods may be incorrect as algorithm breaks

查看:78
本文介绍了尝试轮询服务器,但是由于算法中断,rxjs 方法可能不正确的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

所以我试图轮询我的服务器.我试图每 5 秒轮询一次我的服务器.投票一分钟后超时.我在调试方法中有一些控制台日志,但即使在等待 5 秒后触发的唯一 console.log 是 'validate stream start'

我关注了this教程而无需创建单独的服务,因为我的应用程序中的这一页面只需要它.

我敢打赌这是一个简单的错误或对这些 rxjs 运算符如何工作的误解.

我做错了什么?

startastream(){this.startastreamboolan = true;让计数 = 12;this.http.get(environment.shochat_content_creator_set_valid_stream_start).subscribe((req: any)=>{console.log('验证流开始');计时器(5000).管道(switchMap(() =>{console.log('定时器启动');如果(计数> 0){返回 this.http.get(environment.check_if_stream_is_active_on_mux).订阅((要求:任何)=>{this.streamready = true;返回0;},错误 =>{计数 = 计数 -1;控制台日志(计数);});}}));});}

解决方案

timer

创建一个 Observable,它在 dueTime 后开始发射,并在此后的每个 period 时间后发射不断增加的数字.

timer(dueTime: number | Date = 0, periodOrScheduler?: number | SchedulerLike, scheduler?: SchedulerLike): Observable

<块引用>

它类似于interval,但您可以指定何时开始排放.

在您的代码中,您忘记了 timer 运算符的第二个参数,这意味着它只会等待 5s 然后只发出一次.此外,您没有订阅 timer,这就是为什么它不会更进一步.如果你想每 5s 轮询你的服务器,你需要使用 :

定时器(0, 5000)

在这里,定时器不会等待,而是每隔 5s 直接开始发送值.


接下来,我看到您正在使用 switchMap 来创建内部流,这很棒.但是如果您的服务器需要超过 5s 来处理您的请求,您可能希望使用 mergeMap 代替,因为它不会取消之前正在进行的内部流,这取决于您.

这里的错误是switchMap(或mergeMap)接受一个回调函数,它必须返回一个Observable,然后运营商将自己订阅它.在这里,您返回的是 Subscription.

这是你可以做的:

const start$ = this.http.get(startUrl).pipe(tap(() => console.log('流开始')));const poll$ = this.http.get(pollUrl).pipe(tap(() => (this.streamReady = true)),捕获错误(错误 => {控制台日志(错误);返回空;}));开始$.pipe(switchMap(() => timer(0, 5000).pipe(tap(() => console.log('每 5 秒轮询一次')),mergeMap(() => poll$)))).订阅();

我在这里创建了两个 Observables.start$ 负责启动流,而 poll$ 负责轮询您的服务器.这里的想法是启动流,然后切换到 timer 内部流,它将每 5s 发出一次,然后再次切换到另一个将轮询服务器的内部流.

我的示例中没有包含 count,因此流将使用此代码永远运行.为此,您应该查看 takeUntil 运算符.

希望对您有所帮助,欢迎提问!

so I am attempting to poll my server. I am trying to poll my server every 5 seconds. And the poll times out after one minute. I have a few console logs in the method for debugging but the only console.log that fires even after waiting 5 seconds is 'validate stream start'

I followed this tutorial without creating a separate service as I just need it for this one page in my application.

I'm willing to bet its a simple error or a misunderstanding in how these rxjs operators work.

What am I doing wrong?

startastream(){
    this.startastreamboolan = true;
    let count = 12;
    this.http.get(environment.shochat_content_creator_set_valid_stream_start)
      .subscribe((req: any)=>{
        console.log('validate stream start');
        timer(5000).pipe(
          switchMap(() =>{
            console.log('timer start');
            if(count > 0){
              return this.http.get(environment.check_if_stream_is_active_on_mux)
                .subscribe(
                  (req: any)=>{
                    this.streamready = true;
                    return 0;
                  },
                  error => {
                    count = count -1;
                    console.log(count);

                  });
            }
          }));
      });
  } 

解决方案

timer

Creates an Observable that starts emitting after an dueTime and emits ever increasing numbers after each period of time thereafter.

timer(dueTime: number | Date = 0, periodOrScheduler?: number | SchedulerLike, scheduler?: SchedulerLike): Observable<number>

Its like interval, but you can specify when should the emissions start.

In your code, you are forgetting the second parameter of the timer operator, which means it will just wait for 5s and then emit only once. In addition, you are not subscribing to the timer, that's why it's not going any further. If you want to poll your server every 5s, you need to use :

timer(0, 5000)

Here, the timer won't wait and will directly start emitting values every 5s.


Next, I see that you're using a switchMap to create an inner stream, that's great. But If your server takes more than 5s to process your request, you might want to use mergeMap instead as it doesn't cancel the previous ongoing inner streams, it's up to you.

The error here is that the switchMap (or the mergeMap) takes a callback function which must return an Observable, the operator will then subscribe to it by itself. Here, you are returning a Subscription instead.

Here is what you could do :

const start$ = this.http.get(startUrl).pipe(
  tap(() => console.log('Stream start'))
);
const poll$ = this.http.get(pollUrl).pipe(
  tap(() => (this.streamReady = true)),
  catchError(error => {
    console.log(error);
    return EMPTY;
  })
);

start$.pipe(
  switchMap(() => timer(0, 5000).pipe(
    tap(() => console.log('Polling every 5s')),
    mergeMap(() => poll$)
  ))
).subscribe();

I've created two Observables here. start$, which is responsible for starting the stream, and poll$, which is responsible for polling your server. The idea here is to start the stream, then switch to the timer inner stream which will emit every 5s, and then switch again to another inner stream that will poll the server.

I didn't include the count in my example, so the stream will run forever with this code. You should take a look at the takeUntil operator for that purpose.

Hope it helped, feel free to ask questions !

这篇关于尝试轮询服务器,但是由于算法中断,rxjs 方法可能不正确的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆