将 Observable 与反馈合并,将 Observable 与自身合并 [英] Merging Observables with feedback, merging Observable with Itself

查看:35
本文介绍了将 Observable 与反馈合并,将 Observable 与自身合并的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要创建 Observable,它将收集来自不同来源(其他 Observables)的信息,每个来源都会影响事件值,但是,该值仍然是基于先前的值(一种状态机)构建的.

I need to create Observable, that will collect information from different sources(other Observables), and each source has impact on the event value, but still, the value is built based on previous value(kind of state machine).

我们有带有 int 值和操作码的消息:

We have message with int value and operation code:

class Message{
    Integer value;
    String operation;

    public Message(Integer value, String operation) {
        this.value = value;
        this.operation = operation;
    }
}

以及一些此类值的来源,带有初始值:

And some source of such values, with init value:

Observable<Message> dynamicMessage = Observable.just(new Message(1, "+"));

现在有了事件源.这些源将根据动态消息的先前值和新动态消息值将出现的值发出值.实际上它的事件类型是:

Now there are sources of event. These sources will emit values based on which and based on previos value of dynamicMessage the new dynamicMessage value will appear. Actually its events types are:

class Changer1 {
    String operation;

    public Changer1(String operation) {
        this.operation = operation;
    }
}


class Changer2 {
    Integer value;

    public Changer2(Integer value) {
        this.value = value;
    }
}

Changer1 负责更改操作.Changer2 负责改变价值.

Changer1 responcible for changing of operation. Changer2 responcible for value of change.

以及这些值的来源:

static Observable<Changer1> changers1 = Observable.just(new Changer1("-"))
        .delay(1, TimeUnit.SECONDS).concatWith(Observable.just(new Changer1("+"))
                .delay(2, TimeUnit.SECONDS));


static Observable<Changer2> changers2 = Observable.just(new Changer2(2))
        .delay(2, TimeUnit.SECONDS).concatWith(Observable.just(new Changer2(2))
                .delay(2, TimeUnit.SECONDS));

现在我需要更改 dynamicMessage Observable 并考虑来自更改者的消息.这是一个图表:

Now I need change dynamicMessage Observable and take in respect messages from changers. here is a diagram:

我也尝试编写一个程序,我如何看待解决方案,但它挂起 OutOfMemoryException,在第一个值之后:
消息{值=1,操作='+'}
清单:

Also I try to wrote a program, how I see the solution, but it hangs with OutOfMemoryException, rigth after first value:
Message{value=1, operation='+'}
Listing:

import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;

public class RxDilemma {


static class Message{
    Integer value;
    String operation;

    public Message(Integer value, String operation) {
        this.value = value;
        this.operation = operation;
    }

    @Override
    public String toString() {
        return "Message{" +
                "value=" + value +
                ", operation='" + operation + '\'' +
                '}';
    }
}

static class Changer1 {
    String operation;

    public Changer1(String operation) {
        this.operation = operation;
    }

    @Override
    public String toString() {
        return "Changer1{" +
                "operation='" + operation + '\'' +
                '}';
    }
}


static class Changer2 {
    Integer value;

    public Changer2(Integer value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "Changer2{" +
                "value=" + value +
                '}';
    }
}





static Observable<Changer1> changers1 = Observable.just(new Changer1("-"))
        .delay(1, TimeUnit.SECONDS).concatWith(Observable.just(new Changer1("+"))
                .delay(2, TimeUnit.SECONDS));


static Observable<Changer2> changers2 = Observable.just(new Changer2(2))
        .delay(2, TimeUnit.SECONDS).concatWith(Observable.just(new Changer2(2))
                .delay(2, TimeUnit.SECONDS));


static Observable<Message> dynamicMessage = Observable.just(new Message(1, "+")).mergeWith(changers1.flatMap(new Func1<Changer1, Observable<Message>>() {
    @Override
    public Observable<Message> call(final Changer1 changer) {
        return dynamicMessage.last().map(new Func1<Message, Message>() {
            @Override
            public Message call(Message message) {
                message.operation = changer.operation;
                return message;
            }
        });
    }
})).mergeWith(changers2.flatMap(new Func1<Changer2, Observable<Message>>() {
    @Override
    public Observable<Message> call(final Changer2 changer2) {
        return dynamicMessage.last().map(new Func1<Message, Message>() {
            @Override
            public Message call(Message message) {
                if("+".equalsIgnoreCase(message.operation)){
                    message.value = message.value+changer2.value;
                } else if("-".equalsIgnoreCase(message.operation)){
                    message.value = message.value-changer2.value;
                }
                return message;
            }
        });
    }
}));

public static void main(String[] args) throws InterruptedException {

    dynamicMessage.subscribe(new Action1<Message>() {
        @Override
        public void call(Message message) {
            System.out.println(message);
        }
    });


    Thread.sleep(5000000);
}
}

所以我期望的输出是:

Message{value=1, operation='+'}
消息{值=1,操作='-'}
消息{值=-1,操作='-'}
消息{值=1,操作='+'}
消息{value=2, operation='+'}

Message{value=1, operation='+'}
Message{value=1, operation='-'}
Message{value=-1, operation='-'}
Message{value=1, operation='+'}
Message{value=2, operation='+'}

我还想将所有内容与CombineLatest 合并,但后来我发现CombineLatest 没有提供更改了哪一个元素,如果没有这些信息,我将不知道需要对消息进行哪些更改.有什么帮助吗?

Also I thougth about merging all with CombineLatest, but then I found that CombineLatest does not provide which one element was changed, and without this information I will not know which changes in message needs to be done. Any help?

推荐答案

换句话说,你想减少 扫描您的行为以反映状态的变化.有一个运算符,正确命名为 reduce scan,它完全符合您的要求.它接受每个发生的事件,并让您通过添加该转换的先前结果来转换它,这将是您的状态.

In other words, you want to reduce scan your actions to reflect changes in a state. There is an operator, rightly named reduce scan, that does exactly what you are asking for. It takes every event that comes and lets you transform it with the addition of previous result of that transformation, which will be your state.

interface Changer {
    State apply(State state);
}

class ValueChanger implements Changer {
    ...
    State apply(State state){
        // implement your logic of changing state by this action and return a new one
    }
    ...
}


class OperationChanger implements Changer {
    ...
    State apply(State state){
        // implement your logic of changing state by this action and return a new one
    }
    ...
}

Observable<ValueChanger> valueChangers
Observable<OperationChanger> operationChangers

Observable.merge(valueChangers, operationChangers)
    .scan(initialState, (state, changer) -> changer.apply(state))
    .subscribe(state -> {
        // called every time a change happens
    });

请注意,我稍微更改了您的命名以使其更有意义.另外,我在这里使用 Java8 lambda 表达式.如果您不能在项目中使用它们,只需将它们换成匿名类即可.

Note that I changed your naming a little bit to make more sense. Also, I use Java8 lambda expressions here. Just swap them out for anonymous classes if you can't use them in your project.

使用 scan 操作符而不是 reduce,因为它会一路传递每个结果,而不是只发出最终结果

use scan operator instead of reduce as it delivers every result along the way, rather than emitting only the final result

这篇关于将 Observable 与反馈合并,将 Observable 与自身合并的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆