RxAndroid 下载多个文件,最多 3 个并发线程 [英] RxAndroid download multiple files, max 3 concurrent thread
问题描述
我有 api 可以从服务器下载单个 mp3 文件,使用 RxJava 如下所示.
I have api to download single mp3 file from server,which is consumed using RxJava as bellow.
Observable<ResponseBody> observable = audioService.getFile(fileNameWithExtension);
observable.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(someCallBackClass<ResponseBody>);
这只是下载单个文件,回调将文件保存在磁盘上.我想下载文件列表,将每个文件保存在磁盘上并等待所有下载完成,最多应并行执行 3 个调用.如何使用 RXAndroid 进行操作,我尝试过 flatmap 但我无法完全理解它.
This just downloads single file , callback saves the file on disk. I want to download list of files save each file on disk and wait till all download completes , At max 3 calls should be executing in parallel. How to do it with RXAndroid , I tried flatmap but i am not able to understand it fully.
List<Observable<Response<ResponseBody>>> audioFiles = new ArrayList<>();
for (String fileNameWithExtension : fileNamesWithExtension) {
Observable<Response<ResponseBody>> observable = restFactory.getAudioService().getFile(fileNameWithExtension);
audioFiles.add(observable);
}
Observable.from(audioFiles).flatMap(audioFile -> Observable.fromCallable(() -> {
audioFile.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.toBlocking()
.subscribe(new CallBackWithErrorHandling<>(Downloader.this));
return 0;
}).subscribeOn(Schedulers.io()), MAX_CONCURRENT)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
goToMainActivity();
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "Something went wrong , " + Thread.currentThread().getName());
Log.e(TAG, "Something went wrong , " + e.toString());
showToast(R.string.something_went_wrong);
goToMainActivity();
}
@Override
public void onNext(Integer integer) {
}
});
这工作正常,但当网络出现故障或互联网连接速度缓慢时
this is working fine but when network is down or slow internet connection i am getting
java.lang.RuntimeException: Can't create handler inside thread that has not called Looper.prepare()
我无法理解哪个线程需要observeOn() android 主线程.
I am unable to understand which thread exactly need to observeOn() android main thread.
推荐答案
您可以使用 flatMap 来实现这一点,限制其并发性,但还需要一个在后台调度程序上运行的内部 Observable 来执行文件传输:
You can achieve this with flatMap, limiting its concurrency, but also need an inner Observable running on a background scheduler that does the file transfer:
fileNames
.flatMap(name -> {
return Observable.fromCallable(() -> {
// put your blocking download code here, save the data
return name; // return what you need down below
})
.subscribeOn(Schedulers.io());
}, 3)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(completedFile -> { }, error -> { },
() -> { /* all completed.*/ });
由于您使用 Observable API 进行网络下载,因此无需阻止:
Since you are using an Observable API for the network download, you don't need to block:
Observable.from(audioFiles)
.flatMap(audioFile ->
audioFile.subscribeOn(Schedulers.io()), // <-- apply extra transforms here
MAX_CONCURRENT)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(completedFile -> { }, error -> { },
() -> { /* all completed.*/ })
目前还不清楚你用 CallBackWithErrorHandling
做什么.
It is unclear though what you do with CallBackWithErrorHandling
.
这篇关于RxAndroid 下载多个文件,最多 3 个并发线程的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!