Synchronous blocking subscribe call to a custom ReplaySubject based observable


Question

The view contains the following element:

<div *ngIf="showMe">Hello</div>

When calling the component method:

downloadDemo(): void {
  this.download$ = this.downloadService.downloadUrlAsBlobWithProgressAndSaveInFile('assets/skypeforlinux-64.deb', 'demo')
  this.download$.subscribe((download: Download) => {
    this.showMe = true;
    console.log('Progress: ' + download.progress);
  })
}

the element shows in the view before any of the Progress log lines, which is the expected behavior. This HTTP-based download works fine.

However, when calling the component method:

downloadSoundtrack(soundtrack: Soundtrack): void {
  const fileName: string = soundtrack.name + '.' + MIDI_FILE_SUFFIX;
  const progress$: Observable<ProgressTask<Uint8Array>> = this.midiService.progressiveCreateSoundtrackMidi(soundtrack);
  this.download$ = this.downloadService.downloadObservableDataAsBlobWithProgressAndSaveInFile(progress$, fileName);
  this.download$.subscribe((download: Download) => {
    this.showMe = true;
    console.log('Progress: ' + download.progress);
  })
}

the element shows in the view only after all the Progress log lines. That is not the expected behavior: the element should show before the Progress log lines, not after them. This custom ReplaySubject based observable is not working as expected.

I wanted to see whether one of the subscribe calls was blocking.

So I changed the two methods to:

downloadSoundtrack(soundtrack: Soundtrack): void {
  const fileName: string = soundtrack.name + '.' + MIDI_FILE_SUFFIX;
  const progress$: Observable<ProgressTask<Uint8Array>> = this.midiService.progressiveCreateSoundtrackMidi(soundtrack);
  this.download$ = this.downloadService.downloadObservableDataAsBlobWithProgressAndSaveInFile(progress$, fileName);
  this.showMe = true;
  this.download$.subscribe((download: Download) => {
    console.log('Progress: ' + download.progress);
  });
  console.log('Call done');
}

downloadDemo(): void {
  this.download$ = this.downloadService.downloadUrlAsBlobWithProgressAndSaveInFile('assets/skypeforlinux-64.deb', 'demo')
  this.showMe = true;
  this.download$.subscribe((download: Download) => {
    console.log('Progress: ' + download.progress);
  });
  console.log('Call done');
}

Here is the log output when calling the downloadDemo() method:

Progress: 0
Call done
Progress: 0
Progress: 0
Progress: 2
Progress: 3

We can see that the subscribe() call is non-blocking: "Call done" is logged before most of the Progress lines.

Here is the log output when calling the downloadSoundtrack() method:

Progress: 96
Progress: 97
Progress: 100
Call done

We can see that the subscribe() call is blocking: "Call done" is logged only after the last Progress line.
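The two log orders above can be reproduced without Angular or RxJS. The following is a minimal sketch, not the actual services: the subscribe helper, the two producers and the pending task queue are hypothetical stand-ins. It assumes that the soundtrack producer emits all of its values inside the subscribe call, while the HTTP-based one hands its emissions to the event loop, which is simulated here by a plain array of queued tasks.

```typescript
// Minimal stand-in for an observable: subscribing simply runs the producer.
type Listener<T> = (value: T) => void;
type Producer<T> = (emit: Listener<T>) => void;

function subscribe<T>(producer: Producer<T>, listener: Listener<T>): void {
  producer(listener); // the producer runs to completion before subscribe() returns
}

// Hypothetical synchronous producer: emits every value inside the subscribe call,
// like a loop that calls next() once per note.
const synchronousProgress: Producer<number> = (emit) => {
  for (const progress of [96, 97, 100]) {
    emit(progress);
  }
};

// Hypothetical deferred producer: only queues its emissions. The queue stands in
// for the event loop, which delivers HTTP progress events at a later time.
const pending: Array<() => void> = [];
const deferredProgress: Producer<number> = (emit) => {
  for (const progress of [0, 2, 3]) {
    pending.push(() => emit(progress));
  }
};

const blockingLog: string[] = [];
subscribe(synchronousProgress, (progress) => blockingLog.push('Progress: ' + progress));
blockingLog.push('Call done');

const nonBlockingLog: string[] = [];
subscribe(deferredProgress, (progress) => nonBlockingLog.push('Progress: ' + progress));
nonBlockingLog.push('Call done');
pending.forEach((task) => task()); // the simulated event loop runs the queued emissions

console.log(blockingLog.join(', '));
console.log(nonBlockingLog.join(', '));
```

In the first log "Call done" comes last, as with downloadSoundtrack(); in the second it comes first, which is close to the downloadDemo() output. While a synchronous producer is running, the browser cannot render, which would be consistent with the element appearing only after all the Progress lines.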

Adding an explicit this.detectChanges(); call made no difference:

downloadSoundtrack(soundtrack: Soundtrack): void {
  const fileName: string = soundtrack.name + '.' + MIDI_FILE_SUFFIX;
  const progress$: Observable<ProgressTask<Uint8Array>> = this.midiService.progressiveCreateSoundtrackMidi(soundtrack);
  this.download$ = this.downloadService.downloadObservableDataAsBlobWithProgressAndSaveInFile(progress$, fileName);
  this.download$.subscribe((download: Download) => {
    this.showMe = true;
    this.detectChanges();
    console.log('Progress: ' + download.progress);
  })
}

The element still showed only after all the Progress log lines.

I have also tried an explicit subscription in place of the *ngIf="download$ | async as download" in the template, but it did not help:

downloadInProgress(soundtrack: Soundtrack): boolean {
  let inProgress: boolean = false;
  if (soundtrack.download) {
    if (soundtrack.download.progress > 0 && soundtrack.download.progress < 100) {
      inProgress = true;
    } else if (soundtrack.download.progress == 100) {
      console.log('complete');
      soundtrack.download = undefined;
    }
  }
  console.log('inProgress ' + inProgress);
  return inProgress;
}

The long-running service:

public progressiveCreateSoundtrackMidi(soundtrack: Soundtrack): Observable<ProgressTask<Uint8Array>> {
  return Observable.create((progressTaskBis$: ReplaySubject<ProgressTask<Uint8Array>>) => {
    this.createSoundtrackMidi(soundtrack, progressTaskBis$);
    progressTaskBis$.complete();
    return { unsubscribe() { } };
  });
}

public createSoundtrackMidi(soundtrack: Soundtrack, progressTask$?: ReplaySubject<ProgressTask<Uint8Array>>): Uint8Array {
  const midi: Midi = new Midi();
  midi.name = soundtrack.name;
  midi.header.name = soundtrack.name;
  let noteIndex: number = 0;
  if (soundtrack.hasTracks()) {
    soundtrack.tracks.forEach((track: Track) => {
      const midiTrack: any = midi.addTrack();
      midiTrack.name = track.name;
      midiTrack.channel = track.channel;
      if (track.hasMeasures()) {
        let totalDurationInSeconds: number = 0;
        for (const measure of track.getSortedMeasures()) {
          if (measure.placedChords) {
            if (!this.notationService.isOnlyEndOfTrackChords(measure.placedChords)) {
              for (const placedChord of measure.placedChords) {
                if (!this.notationService.isEndOfTrackPlacedChord(placedChord)) {
                  const duration: string = placedChord.renderDuration();
                  const durationInSeconds: number = Tone.Time(duration).toSeconds();
                  const velocity: number = placedChord.velocity;
                  // const tempoInMicroSecondsPerBeat: number = this.beatsToMicroSeconds(1, measure.getTempo());
                  // const ticks: number = this.beatsToTicks(durationInBeats, DEFAULT_MIDI_PPQ, tempoInMicroSecondsPerBeat);
                  for (const note of placedChord.notes) {
                    if (!this.notationService.isEndOfTrackNote(note)) {
                      if (progressTask$) {
                        this.commonService.sleep(50);
                        progressTask$.next(this.downloadService.createProgressTask<Uint8Array>(soundtrack.getNbNotes(), noteIndex));
                      }
                      noteIndex++;
                      midiTrack.addNote({
                        midi: this.synthService.textToMidiNote(note.renderAbc()),
                        time: totalDurationInSeconds,
                        // ticks: ticks,
                        name: note.renderAbc(),
                        pitch: note.renderChroma(),
                        octave: note.renderOctave(),
                        velocity: velocity,
                        duration: durationInSeconds
                      });
                    }
                  }
                  totalDurationInSeconds += durationInSeconds;
                }
              }
            }
          }
        }
      }
    });
  }
  if (progressTask$) {
    progressTask$.next(this.downloadService.createProgressTask<Uint8Array>(soundtrack.getNbNotes(), soundtrack.getNbNotes(), midi.toArray()));
  }
  return midi.toArray();
}

There is a 50 ms sleep call that slows down the file creation, so as to leave ample time to observe the progress.

The download service implementation is based on this article.

I'm on Angular 9.1.0.

Answer

To keep things simple, I would suggest moving to a more reactive, state-driven approach, along the following lines:

Change soundtracks from an array to public readonly soundtracks: Observable<Soundtrack[]> = this.soundtracksSubject.asObservable(); so that your UI can register for changes. this.soundtracksSubject is then declared, before soundtracks, as private readonly soundtracksSubject: BehaviorSubject<Soundtrack[]> = new BehaviorSubject<Soundtrack[]>([]); and can be used to trigger observers of soundtracks to refresh. When you receive the HTTP response, you no longer set this.soundtracks = soundtracks; but call this.soundtracksSubject.next(soundtracks); instead.

Also, when doing the actual download (soundtrack.download = download;), changing your model is not enough: you have to call the subject again to propagate the change in your model to the listeners:

// Copy the current array so that listeners receive a new array reference.
const updatedSoundtracks: Soundtrack[] = this.soundtracksSubject.value.map(existingSoundtrack => existingSoundtrack);
// soundtrack is the one being downloaded; match on the name or whatever identifies your soundtrack.
const matchingSoundtrack: Soundtrack | undefined = updatedSoundtracks.find(existingSoundtrack => soundtrack.name === existingSoundtrack.name);

if (matchingSoundtrack) {
    matchingSoundtrack.download = download;
    this.soundtracksSubject.next(updatedSoundtracks);
}
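The update-then-notify pattern above can be tried in isolation. The following is a minimal sketch under stated assumptions: MiniBehaviorSubject is a hypothetical, dependency-free stand-in for the RxJS BehaviorSubject (it only keeps a current value, replays it to new subscribers and pushes each new value), the Soundtrack and Download interfaces are reduced to the fields used here, and setDownload is an illustrative helper name that does not come from the question.

```typescript
// Hypothetical stand-in for RxJS's BehaviorSubject, reduced to what the sketch needs.
class MiniBehaviorSubject<T> {
  private listeners: Array<(value: T) => void> = [];
  private current: T;

  constructor(initial: T) {
    this.current = initial;
  }

  getValue(): T {
    return this.current;
  }

  subscribe(listener: (value: T) => void): void {
    this.listeners.push(listener);
    listener(this.current); // a new subscriber immediately receives the current value
  }

  next(value: T): void {
    this.current = value;
    this.listeners.forEach((listener) => listener(value));
  }
}

// Reduced versions of the model types, for illustration only.
interface Download { progress: number; }
interface Soundtrack { name: string; download?: Download; }

const soundtracksSubject = new MiniBehaviorSubject<Soundtrack[]>([
  { name: 'demo' },
  { name: 'other' }
]);

// Stands in for the template subscription: records the progress seen for 'demo'.
const seenProgress: Array<number | undefined> = [];
soundtracksSubject.subscribe((soundtracks) => {
  const demo = soundtracks.filter((soundtrack) => soundtrack.name === 'demo')[0];
  seenProgress.push(demo && demo.download ? demo.download.progress : undefined);
});

// Illustrative helper: build a new array with the updated soundtrack, then notify.
function setDownload(name: string, download: Download): void {
  const updated = soundtracksSubject.getValue().map((soundtrack) =>
    soundtrack.name === name ? { ...soundtrack, download } : soundtrack
  );
  soundtracksSubject.next(updated);
}

setDownload('demo', { progress: 50 });
setDownload('demo', { progress: 100 });

console.log(seenProgress);
```

Unlike the snippet above, this variant replaces the matching soundtrack with a copy instead of mutating it, so every emission carries both a new array and a new object for the changed item. That is a design choice of the sketch; either variant notifies the listeners.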

Change your UI from <tr *ngFor="let soundtrack of soundtracks"> to <tr *ngFor="let soundtrack of soundtracks | async"> so that the template resolves the Observable. This also means that your component subscribes to changes on soundtracks and is notified each time somebody calls next() on the subject.

Observable and BehaviorSubject are both RxJS concepts (import { BehaviorSubject, Observable } from 'rxjs';) and are worth researching because they make this kind of state handling much easier.
