Android Retrofit 2 + RxJava:聆听无尽的视频流 [英] Android Retrofit 2 + RxJava: listen to endless stream

查看:84
本文介绍了Android Retrofit 2 + RxJava:聆听无尽的视频流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我可以使用Retrofit + RxJava收听无休止的流吗?例如Twitter流.我有这个:

Can I use Retrofit + RxJava to listen to an endless stream? For instance the Twitter stream. What I have is this:

public interface MeetupAPI {
    @GET("http://stream.meetup.com/2/rsvps/")
    Observable<RSVP> getRSVPs();
}

MeetupAPI api = new Retrofit.Builder()
            .baseUrl(MeetupAPI.RSVP_API)
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
            .addConverterFactory(GsonConverterFactory.create())
            .build()
            .create(MeetupAPI.class);

api.getRSVPs()
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(rsvp -> Log.d(TAG, "got rsvp"),
                error -> Log.d(TAG, "error: " + error),
                () -> Log.d(TAG, "onComplete"));

,但是在解析第一个对象之后将调用"onComplete".有没有办法告诉Retrofit保持开放直到另行通知?

but the "onComplete" is invoked after the first object has been parsed. Is there a way to tell Retrofit to stay open until further notice?

推荐答案

这是我的解决方案:

您可以使用@Streaming批注:

You can use the @Streaming annotation:

public interface ITwitterAPI {

    @GET("/2/rsvps")
    @Streaming
    Observable<ResponseBody> twitterStream();
}

ITwitterAPI api = new Retrofit.Builder()
          .baseUrl("http://stream.meetup.com")
          .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
          .build().create(ITwitterAPI.class);

使用 @Streaming ,我们可以从 ResponseBody 获取原始输入.

With @Streaming we can get raw input From ResponseBody.

这是我的函数,用于按事件将行包裹起来:

Here my function to wrap body divided by lines with events:

public static Observable<String> events(BufferedSource source) {
    return Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            try {
                while (!source.exhausted()) {
                    subscriber.onNext(source.readUtf8Line());
                }
                subscriber.onCompleted();
            } catch (IOException e) {
                e.printStackTrace();
                subscriber.onError(e);
            }
        }
    });
}

结果用法:

api.twitterStream()
  .flatMap(responseBody -> events(responseBody.source()))
  .subscribe(System.out::println);

更新有关正常停止的信息

当我们退订时,改造会关闭输入流.但是无法检测输入流本身是否已关闭或是否从输入流本身中检测出来,所以唯一的方法-尝试从流中读取-我们会收到 Socketclosed 消息出现异常.我们可以将此异常解释为关闭:

When we unsubscribing, retrofit closes inputstream. But impossible to detect inputstream closed or not from inputstream themselves, so only way - try reading from stream - we gets exception with Socket closed message. We can interpret this exception as closing:

        @Override
        public void call(Subscriber<? super String> subscriber) {
            boolean isCompleted = false;
            try {
                while (!source.exhausted()) {
                    subscriber.onNext(source.readUtf8Line());
                }
            } catch (IOException e) {
                if (e.getMessage().equals("Socket closed")) {
                    isCompleted = true;
                    subscriber.onCompleted();
                } else {
                    throw new UncheckedIOException(e);
                }
            }
            //if response end we get here
            if (!isCompleted) {
                subscriber.onCompleted();
            }
        }

如果由于响应结束而关闭连接,则没有任何异常.在这里 isCompleted 进行检查.让我知道我是否错了:)

And if connection closed because response end, we haven't any exceptions. Here isCompleted check for that. Let me know if i am wrong :)

这篇关于Android Retrofit 2 + RxJava:聆听无尽的视频流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆