同步阻止对基于可观察的自定义ReplaySubject的订阅调用 [英] Synchronous blocking subscribe call to a custom ReplaySubject based observable
问题描述
该视图包含以下元素:
<div *ngIf="showMe">Hello</div>
调用组件方法时:
downloadDemo(): void {
this.download$ = this.downloadService.downloadUrlAsBlobWithProgressAndSaveInFile('assets/skypeforlinux-64.deb', 'demo')
this.download$.subscribe((download: Download) => {
this.showMe = true;
console.log('Progress: ' + download.progress);
})
}
该元素在视图中显示在所有 Progress
记录器之前.这就是应该的方式.这种基于HTTP的下载效果很好.
the element shows in the view before all the Progress
loggers. And that is how it should be. This HTTP based downloading works just fine.
但是,调用组件方法时:
However when calling the component method:
downloadSoundtrack(soundtrack: Soundtrack): void {
const fileName: string = soundtrack.name + '.' + MIDI_FILE_SUFFIX;
const progress$: Observable<ProgressTask<Uint8Array>> = this.midiService.progressiveCreateSoundtrackMidi(soundtrack);
this.download$ = this.downloadService.downloadObservableDataAsBlobWithProgressAndSaveInFile(progress$, fileName);
this.download$.subscribe((download: Download) => {
this.showMe = true;
console.log('Progress: ' + download.progress);
})
}
该元素显示在所有 Progress
记录器之后的视图中.这不是应该的.此基于自定义 ReplaySubject
的可观察对象无法正常工作.实际上,该元素应该在所有 Progress
记录器之前而不是之后显示.
the element shows in the view last after all the Progress
loggers. It is not how it should be. This custom ReplaySubject
based observable is not working as expected. Indeed, the element should show before and not after all the Progress
loggers.
我想看看是否有一个订阅呼叫被阻止.
I wanted to see if one subscribe call was blocking.
因此,我将两种方法更改为:
So I changed the two methods to:
downloadSoundtrack(soundtrack: Soundtrack): void {
const fileName: string = soundtrack.name + '.' + MIDI_FILE_SUFFIX;
const progress$: Observable<ProgressTask<Uint8Array>> = this.midiService.progressiveCreateSoundtrackMidi(soundtrack);
this.download$ = this.downloadService.downloadObservableDataAsBlobWithProgressAndSaveInFile(progress$, fileName);
this.showMe = true;
this.download$.subscribe((download: Download) => {
console.log('Progress: ' + download.progress);
});
console.log('Call done');
}
downloadDemo(): void {
this.download$ = this.downloadService.downloadUrlAsBlobWithProgressAndSaveInFile('assets/skypeforlinux-64.deb', 'demo')
this.showMe = true;
this.download$.subscribe((download: Download) => {
console.log('Progress: ' + download.progress);
});
console.log('Call done');
}
以下是调用 downloadDemo()
方法的记录器:
Here are the loggers when calling the downloadDemo()
method:
Progress: 0
Call done
Progress: 0
Progress: 0
Progress: 2
Progress: 3
我们可以看到 subscribe()
调用是非阻塞的.
We can see the subscribe()
call is non blocking.
以下是调用 downloadSoundtrack()
方法的记录器:
Here are the loggers when calling the downloadSoundtrack()
method:
Progress: 96
Progress: 97
Progress: 100
Call done
我们可以看到 subscribe()
调用被阻止.
We can see the subscribe()
call is blocking.
添加一个明确的 this.detectChanges();
调用没有区别:
Adding an explicit this.detectChanges();
call made no difference:
downloadSoundtrack(soundtrack: Soundtrack): void {
const fileName: string = soundtrack.name + '.' + MIDI_FILE_SUFFIX;
const progress$: Observable<ProgressTask<Uint8Array>> = this.midiService.progressiveCreateSoundtrackMidi(soundtrack);
this.download$ = this.downloadService.downloadObservableDataAsBlobWithProgressAndSaveInFile(progress$, fileName);
this.download$.subscribe((download: Download) => {
this.showMe = true;
this.detectChanges();
console.log('Progress: ' + download.progress);
})
}
在所有 Progress
记录器之后仍然显示.
It still showed after all the Progress
loggers.
我还尝试了一些显式订阅来代替 * ngIf =" download $ |模板中的下载时异步" >>,但是它没有任何帮助:
I have also tried some explicit subscription in place of the *ngIf="download$ | async as download"
in the template, but it did not help any:
downloadInProgress(soundtrack: Soundtrack): boolean {
let inProgress: boolean = false;
if (soundtrack.download) {
if (soundtrack.download.progress > 0 && soundtrack.download.progress < 100) {
inProgress = true;
} else if (soundtrack.download.progress == 100) {
console.log('complete');
soundtrack.download = undefined;
}
}
console.log('inProgress ' + inProgress);
return inProgress;
}
长期运行的服务:
public progressiveCreateSoundtrackMidi(soundtrack: Soundtrack): Observable<ProgressTask<Uint8Array>> {
return Observable.create((progressTaskBis$: ReplaySubject<ProgressTask<Uint8Array>>) => {
this.createSoundtrackMidi(soundtrack, progressTaskBis$);
progressTaskBis$.complete();
return { unsubscribe() { } };
});
}
public createSoundtrackMidi(soundtrack: Soundtrack, progressTask$?: ReplaySubject<ProgressTask<Uint8Array>>): Uint8Array {
const midi: Midi = new Midi();
midi.name = soundtrack.name;
midi.header.name = soundtrack.name;
let noteIndex: number = 0;
if (soundtrack.hasTracks()) {
soundtrack.tracks.forEach((track: Track) => {
const midiTrack: any = midi.addTrack();
midiTrack.name = track.name;
midiTrack.channel = track.channel;
if (track.hasMeasures()) {
let totalDurationInSeconds: number = 0;
for (const measure of track.getSortedMeasures()) {
if (measure.placedChords) {
if (!this.notationService.isOnlyEndOfTrackChords(measure.placedChords)) {
for (const placedChord of measure.placedChords) {
if (!this.notationService.isEndOfTrackPlacedChord(placedChord)) {
const duration: string = placedChord.renderDuration();
const durationInSeconds: number = Tone.Time(duration).toSeconds();
const velocity: number = placedChord.velocity;
// const tempoInMicroSecondsPerBeat: number = this.beatsToMicroSeconds(1, measure.getTempo());
// const ticks: number = this.beatsToTicks(durationInBeats, DEFAULT_MIDI_PPQ, tempoInMicroSecondsPerBeat);
for (const note of placedChord.notes) {
if (!this.notationService.isEndOfTrackNote(note)) {
if (progressTask$) {
this.commonService.sleep(50);
progressTask$.next(this.downloadService.createProgressTask<Uint8Array>(soundtrack.getNbNotes(), noteIndex));
}
noteIndex++;
midiTrack.addNote({
midi: this.synthService.textToMidiNote(note.renderAbc()),
time: totalDurationInSeconds,
// ticks: ticks,
name: note.renderAbc(),
pitch: note.renderChroma(),
octave: note.renderOctave(),
velocity: velocity,
duration: durationInSeconds
});
}
}
totalDurationInSeconds += durationInSeconds;
}
}
}
}
}
}
});
}
if (progressTask$) {
progressTask$.next(this.downloadService.createProgressTask<Uint8Array>(soundtrack.getNbNotes(), soundtrack.getNbNotes(), midi.toArray()));
}
return midi.toArray();
}
有50ms的睡眠调用,这会减慢文件创建的速度,从而留出足够的时间.
There is a sleep call of 50ms slowing down the file creation, so as to give some ample time.
下载服务的实现基于本文
我使用的是Angular 9.1.0
I'm on Angular 9.1.0
推荐答案
为简单起见,我建议改用更具反应性的状态驱动方法,您可以执行以下操作:
To keep things simple I would suggest to move to a more reactive state-driven approach where you could do something similar to the following:
1.
将原声带
从数组更改为公共只读原声带:Observable< Soundtrack []>= this.soundtracksSubject.asObservable()
使您的UI可以注册更改.然后, this.soundtracksSubject
是私有只读音轨.Subject:BehaviorSubject< Soundtrack []>= new BehaviorSubject([]);
,然后可用于触发音轨
的观察者进行刷新.当您收到HTTP响应时,您无需设置 this.soundtracks = soundtracks;
,而是调用 this.soundtracksSubject.next(soundtracks))
.
Change soundtracks
from an array to a public readonly soundtracks: Observable<Soundtrack[]> = this.soundtracksSubject.asObservable()
to enable your UI to register for changes. this.soundtracksSubject
is then private readonly soundtracksSubject: BehaviorSubject<Soundtrack[]> = new BehaviorSubject([]);
that then can be used to trigger observers of soundtracks
to refresh. When you receive the HTTP response you then do not set this.soundtracks = soundtracks;
but rather call this.soundtracksSubject.next(soundtracks))
.
在进行实际下载( soundtrack.download = download;
)时,不仅要在更改后仅更改模型,还必须再次调用主题以将模型中的更改传播给侦听器:
Also when doing the actual download (soundtrack.download = download;
) instead of only changing your model after the change you have to call the subject again to propagate changes in your model to listeners:
const updatedSoundtracks: Soundtrack[] = this.soundtracksSubject.value.map(existingSoundtrack => existingSoundtrack);
const soundtrack: Soundtrack = updatedSoundtracks.find(existingSoundtrack => soundtrack.name === existingSoundtrack.name); // Or whatever identifies your soundtrack
if (soundtrack) {
soundtrack.download = download;
this.soundtracksSubject.next(updatedSoundtracks);
}
将您的用户界面从< tr * ngFor =让原声带原声" 更改为
< tr * ngFor ="让原声带原声|async>
来解析 Observable
以便在Angular组件中使用它.这也意味着您的组件将注册到 soundtracks
上的更改,并在有人呼叫主题/可观察对象时得到通知.
Change your UI from <tr *ngFor="let soundtrack of soundtracks">
to <tr *ngFor="let soundtrack of soundtracks | async">
to resolve the Observable
to use it in your Angular component. This also means your component will register to changes on soundtracks
and will get notified once somebody calls the subject/the observable.
Observable
和 BehaviorSubject
都是RxJS概念( import {BehaviorSubject,Observable}从"rxjs/index";
导入),并且值得进行研究,因为它们使您的生活变得如此轻松.
Observable
and BehaviorSubject
are both RxJS concepts (import {BehaviorSubject, Observable} from "rxjs/index";
) and are worthwhile to research because they make your life so much easier.
这篇关于同步阻止对基于可观察的自定义ReplaySubject的订阅调用的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!