RxJava 缓冲区/窗口,直到元素遇到条件 [英] RxJava buffer/window until element suffers condition

查看:34
本文介绍了RxJava 缓冲区/窗口,直到元素遇到条件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试从阅读器中读取行并将它们分组到属于一起的块.

I am trying to read lines from a reader and group them to blocks that belong together.

源文本:

bla1
bla2
### block separator ###
bla3
bla4
### block separator ###
...

我需要得到两个块 (bla1, bla2) 和 (bla3, bla4).

I need to get the two blocks (bla1, bla2) and (bla3, bla4).

代码:

import org.apache.commons.lang3.StringUtils;
import rx.Observable;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.Reader;
import java.util.Iterator;

public class BlockBuilder {

  public static void main(String[] args) {

    try {
      FileReader fileReader = new FileReader("/path/to/some/file");
      LineIterable lineIterable = new LineIterable(fileReader);

      Observable.from(lineIterable)
          .buffer(100)
          // Needed instead of time/count: until line matches condition
          // something like .buffer(line -> line.equals("### block separator ###")
          .forEach(gatheredLines -> {
            String gatheredBlock = StringUtils.join(gatheredLines, '\n');
            System.out.println(gatheredBlock);
            System.out.println("###### ###### ###### ######");
          });
    } catch (Exception ex) {
      ex.printStackTrace();
    }
  }

  private static class LineIterable implements Iterable<String> {
    private final Iterator<String> iterator;
    public LineIterable(Reader reader) {
      iterator = new BufferedReader(reader).lines().iterator();
    }
    @Override
    public Iterator<String> iterator() {
      return iterator;
    }
  } 
}

是否使用缓冲区或窗口并不重要,或者我是否完全错误地考虑了这两者.

It doesn't matter if buffer or window is used or if I'm completely wrong for thinking of those two.

我认为对于缓冲区使用 bufferClosingSelector 或为窗口使用 closureSelector 一定是可能的.两者都是创建一个 Observer 的函数,它可以触发关闭当前缓冲区或窗口,但我看不到这里可以在哪里获取当前行.

I thought it must be possible with the bufferClosingSelector for the buffer or the closingSelector for the window. Both are functions that create an Observer which can trigger the closing of the current buffer or window but I can't see where I can get hold of the current line here.

推荐答案

您可以发布源代码并将其用于缓冲和缓冲区边界:

You can publish your source and use it for both buffering and buffer boundary:

Observable<String> source = Observable.just(
        "a", "b", "#", 
        "c", "d", "e", "#", 
        "f", "g");

source.publish(p -> 
        p.filter(v -> !"#".equals(v))
        .buffer(() -> p.filter(v -> "#".equals(v))))
.subscribe(System.out::println);

这篇关于RxJava 缓冲区/窗口,直到元素遇到条件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆