具有批处理功能的 Java 8 Stream [英] Java 8 Stream with batch processing

查看:24
本文介绍了具有批处理功能的 Java 8 Stream的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个包含项目列表的大文件.

I have a large file that contains a list of items.

我想创建一批项目,用这批项目发出一个 HTTP 请求(所有项目都需要作为 HTTP 请求中的参数).我可以使用 for 循环非常轻松地做到这一点,但作为 Java 8 爱好者,我想尝试使用 Java 8 的 Stream 框架编写它(并获得延迟处理的好处).

I would like to create a batch of items, make an HTTP request with this batch (all of the items are needed as parameters in the HTTP request). I can do it very easily with a for loop, but as Java 8 lover, I want to try writing this with Java 8's Stream framework (and reap the benefits of lazy processing).

例子:

List<String> batch = new ArrayList<>(BATCH_SIZE);
for (int i = 0; i < data.size(); i++) {
  batch.add(data.get(i));
  if (batch.size() == BATCH_SIZE) process(batch);
}

if (batch.size() > 0) process(batch);

我想做一些很长的事情lazyFileStream.group(500).map(processBatch).collect(toList())

最好的方法是什么?

推荐答案

注意!此解决方案在运行 forEach 之前读取整个文件.

您可以使用 jOOλ,这是一个为单线程扩展 Java 8 流的库,顺序流用例:

You could do it with jOOλ, a library that extends Java 8 streams for single-threaded, sequential stream use-cases:

Seq.seq(lazyFileStream)              // Seq<String>
   .zipWithIndex()                   // Seq<Tuple2<String, Long>>
   .groupBy(tuple -> tuple.v2 / 500) // Map<Long, List<String>>
   .forEach((index, batch) -> {
       process(batch);
   });

在幕后,zipWithIndex() 只是:

static <T> Seq<Tuple2<T, Long>> zipWithIndex(Stream<T> stream) {
    final Iterator<T> it = stream.iterator();

    class ZipWithIndex implements Iterator<Tuple2<T, Long>> {
        long index;

        @Override
        public boolean hasNext() {
            return it.hasNext();
        }

        @Override
        public Tuple2<T, Long> next() {
            return tuple(it.next(), index++);
        }
    }

    return seq(new ZipWithIndex());
}

... 而 groupBy() 是 API 方便:

... whereas groupBy() is API convenience for:

default <K> Map<K, List<T>> groupBy(Function<? super T, ? extends K> classifier) {
    return collect(Collectors.groupingBy(classifier));
}

(免责声明:我为 jOOλ 背后的公司工作)

(Disclaimer: I work for the company behind jOOλ)

这篇关于具有批处理功能的 Java 8 Stream的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆