Java split stream by predicate into stream of streams


Question

I have hundreds of large (6 GB) gzipped log files that I'm reading through GZIPInputStream and wish to parse. Suppose each one has the format:

Start of log entry 1
    ...some log details
    ...some log details
    ...some log details
Start of log entry 2
    ...some log details
    ...some log details
    ...some log details
Start of log entry 3
    ...some log details
    ...some log details
    ...some log details

I'm streaming the gzipped file contents line by line through BufferedReader.lines(). The stream looks like:

[
    "Start of log entry 1",
    "    ...some log details",
    "    ...some log details",
    "    ...some log details",
    "Start of log entry 2",
    "    ...some log details",
    "    ...some log details",
    "    ...some log details",
    "Start of log entry 3",
    "    ...some log details",
    "    ...some log details",
    "    ...some log details",
]
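The reading chain itself isn't shown in the question. A self-contained sketch of how such a stream of lines can be opened over gzipped bytes with java.util.zip (here round-tripped in memory instead of a real file, so it runs as-is):

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class GzipLines {
    // Builds the line-reading chain used for the log files:
    // gzip bytes -> decompressed bytes -> chars -> buffered lines.
    static BufferedReader gzipReader(InputStream in) throws IOException {
        return new BufferedReader(
            new InputStreamReader(new GZIPInputStream(in), StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        // Round-trip a small sample in memory so the sketch is self-contained;
        // for a real file, pass a FileInputStream instead of the ByteArrayInputStream.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (Writer w = new OutputStreamWriter(new GZIPOutputStream(buf), StandardCharsets.UTF_8)) {
            w.write("Start of log entry 1\n    ...some log details\n");
        }
        try (BufferedReader reader = gzipReader(new ByteArrayInputStream(buf.toByteArray()))) {
            List<String> lines = reader.lines().collect(Collectors.toList());
            System.out.println(lines.size()); // prints 2
        }
    }
}
```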

The start of every log entry can be identified by the predicate line -> line.startsWith("Start of log entry"). I would like to transform this Stream<String> into a Stream<Stream<String>> according to this predicate. Each "substream" should start when the predicate is true and collect lines while the predicate is false, until the next time the predicate is true, which marks the end of that substream and the start of the next. The result would look like:

[
    [
        "Start of log entry 1",
        "    ...some log details",
        "    ...some log details",
        "    ...some log details",
    ],
    [
        "Start of log entry 2",
        "    ...some log details",
        "    ...some log details",
        "    ...some log details",
    ],
    [
        "Start of log entry 3",
        "    ...some log details",
        "    ...some log details",
        "    ...some log details",
    ],
]

From there, I can take each substream and map it through new LogEntry(Stream<String> logLines) so as to aggregate related log lines into LogEntry objects.
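LogEntry is the asker's own class and isn't shown in the question; a minimal sketch of what its stream-consuming constructor might look like (the header/details split is an assumption):

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LogEntry {
    final String header;        // the "Start of log entry N" line
    final List<String> details; // the indented lines that follow it

    // Consumes the substream exactly once; only this one entry's lines
    // are materialized, so memory use stays bounded per entry.
    LogEntry(Stream<String> logLines) {
        List<String> lines = logLines.collect(Collectors.toList());
        this.header = lines.isEmpty() ? "" : lines.get(0);
        this.details = lines.isEmpty()
            ? Collections.emptyList()
            : lines.subList(1, lines.size());
    }

    @Override
    public String toString() {
        return header + " (" + details.size() + " detail lines)";
    }
}
```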

Here's roughly what that would look like:

import java.io.*;
import java.nio.charset.*;
import java.util.*;
import java.util.function.*;
import java.util.stream.*;

import static java.lang.System.out;

class Untitled {
    static final String input = 
        "Start of log entry 1\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "Start of log entry 2\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "Start of log entry 3\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "    ...some log details";

    static final Predicate<String> isLogEntryStart = line -> line.startsWith("Start of log entry"); 

    public static void main(String[] args) throws Exception {
        try (ByteArrayInputStream gzipInputStream
        = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)); // mock for fileInputStream based gzipInputStream
             InputStreamReader inputStreamReader = new InputStreamReader( gzipInputStream ); 
             BufferedReader reader = new BufferedReader( inputStreamReader )) {

            reader.lines()
                .splitByPredicate(isLogEntryStart) // <--- What witchcraft should go here?
                .map(LogEntry::new)
                .forEach(out::println);
        }
    }
}

Constraint: I have hundreds of these large files to process in parallel (but only a single sequential stream per file), which makes loading them entirely into memory (e.g. by storing them as a List<String> lines) infeasible.
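Given that constraint, the natural split is to parallelize across files while each file stays one sequential stream. A sketch of that layout (countEntriesInFile and the counting logic are illustrative placeholders, not the asker's actual processing):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.zip.GZIPInputStream;

class ParallelLogFiles {
    // Parallelism across files; within a file the lines remain one
    // sequential stream, so no file is ever fully loaded into memory.
    static long countEntries(List<Path> files) {
        return files.parallelStream()
                    .mapToLong(ParallelLogFiles::countEntriesInFile)
                    .sum();
    }

    static long countEntriesInFile(Path file) {
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                 new GZIPInputStream(Files.newInputStream(file)), StandardCharsets.UTF_8))) {
            return reader.lines()
                         .filter(line -> line.startsWith("Start of log entry"))
                         .count();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```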

Any help is appreciated!

Answer

Frederico's answer is probably the nicest way to handle this particular problem. Following his last thought about a custom Spliterator, I'll add an adapted version of an answer to a similar question, where I proposed using a custom iterator to create a chunked stream. This approach also works on streams that are not created by input readers.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class StreamSplitter<T>
    implements Iterator<Stream<T>>
{
    private Iterator<T>  incoming;
    private Predicate<T> startOfNewEntry;
    private T            nextLine;

    public static <T> Stream<Stream<T>> streamOf(Stream<T> incoming, Predicate<T> startOfNewEntry)
    {
        Iterable<Stream<T>> iterable = () -> new StreamSplitter<>(incoming, startOfNewEntry);
        return StreamSupport.stream(iterable.spliterator(), false);
    }

    private StreamSplitter(Stream<T> stream, Predicate<T> startOfNewEntry)
    {
        this.incoming = stream.iterator();
        this.startOfNewEntry = startOfNewEntry;
        if (incoming.hasNext())
            nextLine = incoming.next();
    }

    @Override
    public boolean hasNext()
    {
        return nextLine != null;
    }

    @Override
    public Stream<T> next()
    {
        List<T> nextEntrysLines = new ArrayList<>();
        do
        {
            nextEntrysLines.add(nextLine);
        } while (incoming.hasNext()
                 && !startOfNewEntry.test((nextLine = incoming.next())));

        if (!startOfNewEntry.test(nextLine)) // incoming does not have next
            nextLine = null;

        return nextEntrysLines.stream();
    }
}

Example

public static void main(String[] args)
{
    Stream<String> flat = Stream.of("Start of log entry 1",
                                    "    ...some log details",
                                    "    ...some log details",
                                    "Start of log entry 2",
                                    "    ...some log details",
                                    "    ...some log details",
                                    "Start of log entry 3",
                                    "    ...some log details",
                                    "    ...some log details");

    StreamSplitter.streamOf(flat, line -> line.matches("Start of log entry.*"))
                  .forEach(logEntry -> {
                      System.out.println("------------------");
                      logEntry.forEach(System.out::println);
                  });
}

// Output
// ------------------
// Start of log entry 1
//     ...some log details
//     ...some log details
// ------------------
// Start of log entry 2
//     ...some log details
//     ...some log details
// ------------------
// Start of log entry 3
//     ...some log details
//     ...some log details

The iterator always looks one line ahead. As soon as that line is the beginning of a new entry, it wraps the previous entry in a stream and returns it from next(). The factory method streamOf turns this iterator into a stream, to be used as in the example above.

I changed the split condition from a regex to a Predicate, so you can specify more complicated conditions with the help of multiple regexes, if-conditions, and so on.
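For instance, several start-of-entry conditions can be combined into a single Predicate<String> via Predicate.or (the timestamp pattern here is purely illustrative, not from the question):

```java
import java.util.function.Predicate;
import java.util.regex.Pattern;

class EntryStart {
    // Hypothetical second format: entries that start with an ISO date instead.
    static final Pattern TIMESTAMPED = Pattern.compile("^\\d{4}-\\d{2}-\\d{2} .*");

    // Predicate.or composes both conditions without touching the splitter.
    static final Predicate<String> IS_ENTRY_START =
        ((Predicate<String>) line -> line.startsWith("Start of log entry"))
            .or(line -> TIMESTAMPED.matcher(line).matches());
}
```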

Note that I only tested it with the example data above, so I don't know how it would behave with more complicated, erroneous, or empty input.
