How to process chunks of a file with java.util.stream


Question

To get familiar with the stream API, I tried to code a quite simple pattern.

Problem: I have a text file containing non-nested blocks of text. All blocks are identified by start/end patterns (e.g. <start> and <stop>). The content of a block isn't syntactically distinguishable from the noise between the blocks. Therefore it is impossible to work with simple (stateless) lambdas.
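For illustration, a hypothetical input file (the <start>/<stop> markers and the surrounding noise lines are made up) might look like this:

    some noise
    <start>
    first line of block 1
    second line of block 1
    <stop>
    more noise
    <start>
    only line of block 2
    <stop>
    trailing noise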

I was only able to implement something ugly like:

Files.lines(path).collect(new MySequentialParseAndProsessEachLineCollector<>());

To be honest, this is not what I want.

I am looking for a mapper, something like:

Files.lines(path).map(MyMapAllLinesOfBlockToBuckets()).parallelStream().collect(new MyProcessOneBucketCollector<>());

is there a good way to extract chunks of data from a java 8 stream seems to contain a skeleton of a solution. Unfortunately, I'm too stupid to translate that to my problem. ;-)

Any hints?

Answer

Here is a solution which can be used for converting a Stream<String>, each element representing a line, into a Stream<List<String>>, each element representing a chunk found using a specified delimiter:

import java.util.ArrayList;
import java.util.List;
import java.util.Spliterator;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class ChunkSpliterator implements Spliterator<List<String>> {
    private final Spliterator<String> source;
    private final Predicate<String> start, end;
    private final Consumer<String> getChunk;
    private List<String> current;

    ChunkSpliterator(Spliterator<String> lineSpliterator,
        Predicate<String> chunkStart, Predicate<String> chunkEnd) {
        source=lineSpliterator;
        start=chunkStart;
        end=chunkEnd;
        // Collects lines into the current chunk; lines outside a chunk are
        // skipped until a line matching the start predicate is encountered.
        getChunk=s -> {
            if(current!=null) current.add(s);
            else if(start.test(s)) current=new ArrayList<>();
        };
    }
    public boolean tryAdvance(Consumer<? super List<String>> action) {
        // Pull lines from the source until the current chunk is terminated
        // by a line matching the end predicate (or the source is exhausted).
        while(current==null || current.isEmpty()
                            || !end.test(current.get(current.size()-1)))
            if(!source.tryAdvance(getChunk)) return false;
        current.remove(current.size()-1); // drop the end-delimiter line
        action.accept(current);
        current=null;
        return true;
    }
    public Spliterator<List<String>> trySplit() {
        return null; // chunk boundaries are unknown in advance, no splitting
    }
    public long estimateSize() {
        return Long.MAX_VALUE;
    }
    public int characteristics() {
        return ORDERED|NONNULL;
    }

    public static Stream<List<String>> toChunks(Stream<String> lines,
        Predicate<String> chunkStart, Predicate<String> chunkEnd,
        boolean parallel) {

        return StreamSupport.stream(
            new ChunkSpliterator(lines.spliterator(), chunkStart, chunkEnd),
            parallel);
    }
}

The lines matching the predicates are not included in the chunk; it would be easy to change this behavior, if desired.
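For example, a minimal sketch of such a change (assuming you want both the start and the stop line kept inside each chunk, and that the start and stop patterns are distinct) could look like this:

// in the constructor: also store the line that opens a chunk
getChunk = s -> {
    if(current != null) current.add(s);
    else if(start.test(s)) {
        current = new ArrayList<>();
        current.add(s);            // keep the <start> line in the chunk
    }
};

// in tryAdvance: simply omit the line
//     current.remove(current.size()-1);
// so the <stop> line stays in the chunk as well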

It can be used like this:

ChunkSpliterator.toChunks( Files.lines(Paths.get(myFile)),
    Pattern.compile("^<start>$").asPredicate(),
    Pattern.compile("^<stop>$").asPredicate(),
    true )
   .collect(new MyProcessOneBucketCollector<>())

The patterns are specified as ^word$ to require the entire line to consist of the word only; without these anchors, lines containing the pattern can start and end a chunk. The nature of the source stream does not allow parallelism when creating the chunks, so when chaining with an immediate collection operation the parallelism for the entire operation is rather limited. It depends on the MyProcessOneBucketCollector whether there can be any parallelism at all.
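If no custom collector is at hand, a minimal, self-contained sketch of wiring everything together might look like the following; the file name and the per-chunk processing (joining the lines of a chunk) are placeholders, not part of the original answer:

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class ChunkDemo {
    public static void main(String[] args) throws Exception {
        List<String> results = ChunkSpliterator.toChunks(
                Files.lines(Paths.get("input.txt")),        // hypothetical input file
                Pattern.compile("^<start>$").asPredicate(),
                Pattern.compile("^<stop>$").asPredicate(),
                true)
            .map(chunk -> String.join(" ", chunk))           // placeholder per-chunk processing
            .collect(Collectors.toList());
        results.forEach(System.out::println);
    }
}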

If your final result does not depend on the order of occurrence of the buckets in the source file, it is strongly recommended that your collector either reports itself to be UNORDERED or you insert an unordered() into the stream's method chain before the collect.
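For example (assuming the order of the buckets really does not matter for your result), the hint can be inserted like this:

ChunkSpliterator.toChunks( Files.lines(Paths.get(myFile)),
    Pattern.compile("^<start>$").asPredicate(),
    Pattern.compile("^<stop>$").asPredicate(),
    true )
   .unordered()
   .collect(new MyProcessOneBucketCollector<>())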
