如何让这个rxjava zip并行运行? [英] How can I make this rxjava zip to run in parallel?

查看:123
本文介绍了如何让这个rxjava zip并行运行?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个睡眠方法来模拟一个长时间运行的过程。

  private void sleep(){
try {
Thread.sleep(2000);
} catch(InterruptedException e){
e.printStackTrace();
}
}

然后我有一个方法返回一个包含列表的Observable参数中给出的2个字符串。它在返回字符串之前调用sleep。

  private Observable< List< String>> getStrings(final String str1,final String str2){
return Observable.fromCallable(new Callable< List< String>>(){
@Override
public List< String> call() {
sleep();
List< String> strings = new ArrayList<>();
strings.add(str1);
strings.add(str2);
返回字符串;
}
});
}

然后我在Observalb.zip中调用getStrings三次,我希望那些三个并行运行的调用,因此执行的总时间应该在 2秒之内,或者最多3秒,因为睡眠只有2秒。但是,它总共需要秒。 如何让它并行运行以便在2秒内完成?

 可观察
.zip(getStrings(One,Two),getStrings(Three,Four),getStrings(Five,Six),mergeStringLists())
.subscribeOn( Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer< List< String>>(){
@Override
public void onCompleted(){

}

@Override
public void onError(Throwable e){

}

@Override
public void onNext(List< String> strings){
//显示字符串
}
});

mergeStringLists方法

  private Func3< List< String> ;, List< String> ;, List< String> ;, List< String>> mergeStringLists(){
返回新的Func3< List< String>,List< String> ;, List< String> ;, List< String>>(){
@Override
public List< String> ; call(List< String> strings,List< String> strings2,List< String> strings3){
Log.d(TAG,...);

for(String s:strings2){
strings.add(s);
}

for(String s:strings3){
strings.add(s);
}

返回字符串;
}
};
}


解决方案

发生这种情况是因为订阅了你的压缩 observable发生在相同的 io 线程中。



<为什么不尝试这样做:

  Observable 
.zip(
getStrings(一个,两个)
.subscribeOn(Schedulers.newThread()),
getStrings(三个,四个)
.subscribeOn(Schedulers.newThread()),
getStrings(Five,Six)
.subscribeOn(Schedulers.newThread()),
mergeStringLists())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer< List< String>>(){
@Override
public void onCompleted(){

}

@Override
public void onError(Throwable e){

}

@Override
public void onNext(List< String>字符串){
//显示字符串
}
});

如果有帮助请告诉我


I have a sleep method for simulating a long running process.

private void sleep() {
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

Then I have a method returns an Observable containing a list of 2 strings that is given in the parameters. It calls the sleep before return the strings back.

private Observable<List<String>> getStrings(final String str1, final String str2) {
    return Observable.fromCallable(new Callable<List<String>>() {
        @Override
        public List<String> call() {
            sleep();
            List<String> strings = new ArrayList<>();
            strings.add(str1);
            strings.add(str2);
            return strings;
        }
    });
}

Then I am calling the getStrings three times in Observalb.zip, I expect those three calls to run in parallel, so the total time of execution should be within 2 seconds or maybe 3 seconds the most because the sleep was only 2 seconds. However, it's taking a total of six seconds. How can I make this to run in parallel so it will finish within 2 seconds?

Observable
.zip(getStrings("One", "Two"), getStrings("Three", "Four"), getStrings("Five", "Six"), mergeStringLists())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<List<String>>() {
    @Override
    public void onCompleted() {

    }

    @Override
    public void onError(Throwable e) {

    }

    @Override
    public void onNext(List<String> strings) {
        //Display the strings
    }
});

The mergeStringLists method

private Func3<List<String>, List<String>, List<String>, List<String>> mergeStringLists() {
    return new Func3<List<String>, List<String>, List<String>, List<String>>() {
        @Override
        public List<String> call(List<String> strings, List<String> strings2, List<String> strings3) {
            Log.d(TAG, "...");

            for (String s : strings2) {
                strings.add(s);
            }

            for (String s : strings3) {
                strings.add(s);
            }

            return strings;
        }
    };
}

解决方案

That's happening because subscribing to your zipped observable happens in the the same, io thread.

Why don't you try this instead:

Observable
    .zip(
        getStrings("One", "Two")
            .subscribeOn(Schedulers.newThread()),
        getStrings("Three", "Four")
            .subscribeOn(Schedulers.newThread()),
        getStrings("Five", "Six")
            .subscribeOn(Schedulers.newThread()),
        mergeStringLists())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<List<String>>() {
        @Override
        public void onCompleted() {

        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onNext(List<String> strings) {
            //Display the strings
        }
    });

Let me know if that helped

这篇关于如何让这个rxjava zip并行运行?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆