RxJava + Websocket - 如何将 Observable 添加到 Websocket 侦听器? [英] RxJava + Websocket - How to add Observable to Websocket listener?

查看:174
本文介绍了RxJava + Websocket - 如何将 Observable 添加到 Websocket 侦听器?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个 ViewModel 正在观察我的 MainRepo 类中的 RxJava Observable.我试图让 MainRepo 类中的 WebsocketListener 发出事件,但我不确定如何这样做.

I have a ViewModel that is observing a RxJava Observable in my MainRepo class. I am trying to get my WebsocketListener in the MainRepo class to emit events, but I'm unsure how to do so.

MainRepo 类:

private WebSocket ws;

public void createWsConnection() {
        Request request = new Request.Builder()
                .url(Constants.WEBSOCKET_ENDPOINT)
                .addHeader(Constants.WEBSOCKET_HEADERS_KEY, Constants.USER_ID)
                .build();

        OkHttpClient client = new OkHttpClient
                .Builder()
                .pingInterval(30, TimeUnit.SECONDS)
                .build();

        this.ws = client.newWebSocket(request, webSocketListener);
    }

这就是我困惑的地方.我不知道如何将 websocket 与 RxJava observable 一起使用.

This is where I'm confused. I don't know how I would use the websocket with the RxJava observable.

public Observable<String> createListener(){
        return Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) {
                 //I don't know what to put here in order to emit messages
                 //back to my ViewModel class using the websocket listener
            }
        });
    }

websocket 监听器:

 private WebSocketListener webSocketListener = new WebSocketListener() {

        @Override
        public void onOpen(@NotNull WebSocket webSocket, Response response) {
            Timber.d("Ws connection opened...", response.toString());
        }

        @Override
        public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            Timber.d("Ws connection closing...");
        }

        @Override
        public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            Timber.d("Ws connection closed...");
        }

        @Override
        public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
            Timber.d("Ws incoming message.");

        }

        @Override
        public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, Response response) {
            Timber.e(t, "Ws connection failure.", response.toString());

        }
    };

ViewModel 类中的一个函数,它正在观察我的 MainRepo 类中的 Observable:

public void connectToWs(){
        mainRepo.createListener()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Timber.d("Subscribed");
            }

            @Override
            public void onNext(@NonNull String s) {
                Timber.d("Message: " + s);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Timber.e(e, "Something went wrong.");
            }

            @Override
            public void onComplete() {
                Timber.d("On complete.");
            }
        });
    }

推荐答案

创建一个 PublishSubject 并更改您的 createListener 方法以返回它:

Create a PublishSubject and change your createListener method to return it:

private PublishSubject<String> publishSubject = PublishSubject.create<String>();

public Observable<String> createListener(){
    return publishSubject;
}

PublishSubject 是一个 Observable,所以请注意,您不需要更改方法签名,但我建议您将方法名称重命名为 observeMessages 之类的名称.

PublishSubject is an Observable so notice that you don't need to change your method signature, but I'd suggest you to rename the method name to something like observeMessages.

然后在您的 websocket 侦听器中,您可以使用 onNext 方法.您还应该调用 onComplete 在 onClosed 方法和 onError 在 onFailure 方法中:

Then in your websocket listener you can emit the messages to the PublishSubject with onNext method. You should also call onComplete in the onClosed method and onError in the onFailure method:

 private WebSocketListener webSocketListener = new WebSocketListener() {

        @Override
        public void onOpen(@NotNull WebSocket webSocket, Response response) {
            Timber.d("Ws connection opened...", response.toString());
        }

        @Override
        public void onClosing(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            Timber.d("Ws connection closing...");
        }

        @Override
        public void onClosed(@NotNull WebSocket webSocket, int code, @NotNull String reason) {
            Timber.d("Ws connection closed...");

            publishSubject.onComplete();
        }

        @Override
        public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
            Timber.d("Ws incoming message.");

            publishSubject.onNext(text);
        }

        @Override
        public void onFailure(@NotNull WebSocket webSocket, @NotNull Throwable t, Response response) {
            Timber.e(t, "Ws connection failure.", response.toString());

            publishSubject.onError(t);
        }
    };

这篇关于RxJava + Websocket - 如何将 Observable 添加到 Websocket 侦听器?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆