基于内容的RxJava可观察缓冲区 [英] RxJava Observable buffer based on content

查看:118
本文介绍了基于内容的RxJava可观察缓冲区的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我使用vertX和RxJava启动了一个项目,但是有一个我找不到解决方案的问题.

I started a project using vertX and RxJava, and I have a problem for which I don't find a solution.

我有一个Observable,它发出用于传入通信的WebSocketFrame, 每个WebSocketFrame都由一个有效负载(一个ByteBuffer)和一些标志组成,这些标志指示消息是消息的第一帧还是最后一帧.

I have an Observable that emits WebSocketFrame for incoming communications, each WebSocketFrame is composed of a payload (a ByteBuffer) and flags that indicates wether it is the first frame of the message or the last.

我想对此Observable进行操作,以将其转换为一个Observable,该Observable发出包含每个消息的所有帧的ByteBufferd.

I want to make an operation on this Observable to transform it to an Observable that emits ByteBufferd that contain all the frame of a each message.

我尝试了buffer方法,但是它似乎旨在按照任意条件(时间或其他可观察的标准)对项目进行重新组合.

I tried the buffer method but it seems to be designed to regroup items by an arbitrary criteria (time or another observable).

另一种方法似乎使用compose订阅WebSocketFrame可观察的对象,在非结束帧上添加到缓冲区,并在结束帧上馈送" ByteBuffer可观察的对象.但是我不知道如何手动创建和填充缓冲区.

Another way seems to use compose to subscribe to the WebSocketFrame observable, to add to buffer on non-ending frame, and to "feed" the ByteBuffer Observable on ending frame. But I don't know how to create and feed a buffer manually.

因此,如果有人已经看到了这个问题(恕我直言,这很常见)并且对RxJava有足够的知识来提出实施方案,我将不胜感激.

So if someone has already seen this issue (which IMHO seems pretty common) and have enough knowledge of RxJava for proposing an implementation I would be most gratefull.

感谢您的阅读.

推荐答案

我想您必须使用此其他问题,该主题涵盖了大致相同的主题,并且

I guess you have to use the buffer operator for this (maybe you could do with the simpler buffer, but I'm not sure about that). See also this other question that covers roughly the same topic and this GitHub page for more discussion. Hope this helps you!

这篇关于基于内容的RxJava可观察缓冲区的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆