结束“无限”的结果。在满足某些条件时流 [英] Ending an "infinite" stream when certain conditions are met

查看:107
本文介绍了结束“无限”的结果。在满足某些条件时流的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试从REST风格的Web服务中提取数据,该服务以页面形式提供内容。

I'm trying to pull data from a REST-style web-service which delivers content in pages.

我知道自己已经到了最后的唯一方法就是当我要求页面时没有结果。我想在那时终止流。

The only way I can know that I've reached the end is when I ask for a page and there are no results. I'd like to terminate the stream at that time.

我编写了以下Java代码。第一个函数从Web服务中提取单个页面并将其作为流返回。第二个函数将流平面映射为单个流。

I've written the following Java code. The first function pulls a single page from the web-service and returns it as a stream. The second function flatmaps the streams together into a single stream.

public Stream<ApplicationResponse> getApplications(String token, RestTemplate rt, Integer page, Integer pageSize) {
    HttpEntity<String> entity = new HttpEntity<>("parameters", getHeaders(token));
    String url = String.format("%s?PageIndex=%s&PageSize=%s", endpoint, page, pageSize);
    ResponseEntity<ApplicationCollection> ar = rt.exchange(url, HttpMethod.GET, entity, ApplicationCollection.class);
    ApplicationResponse[] res = Objects.requireNonNull(ar.getBody()).getData();

    // Do something here when res is empty, so that the stream ends

    return Arrays.stream(res);
}

public Stream<ApplicationResponse> getApplications(String token, RestTemplate rt) {
    // This function does the right thing, exept when we run out of data!
    return IntStream.iterate(1, i -> i + 1).mapToObj(i -> getApplications(token, rt, i, 500)).flatMap(Function.identity());
}

问题是,我如何允许这种情况结束?

The problem is, how do I allow this to end?

如果我用Python编写这个,我会在我知道没有什么东西放到流上的时候引发一个StopIteration异常。有什么类似我可以做的吗?

If I were writing this in Python I'd raise a StopIteration exception at the point where I know there's nothing left to put onto the stream. Is there something similar I can do?

我能想到的最好的事情是使用null,或者引发异常以表示数据的结束,然后结束流入迭代器,该迭代器知道在收到该信号时停止。但是,我还能做更多惯用吗?

The best thing I could think of was use a null, or raise an exception to signify the end of data and then wrap up the stream into an Iterator that knows to stop when that signal is received. But is there anything more idiomatic that I can do?

推荐答案

在Holger的评论之后,我试了一下并尝试了 Spliterator 而不是 Iterator 。这确实比较简单,因为 next hasNext 是......有点合并到 tryAdvance ?它甚至足够短,只需将其内联到一个util方法,imo。

After comments from Holger, I gave it a shot and tried Spliterator instead of Iterator. It is indeed simpler, as next and hasNext are... kinda combined into tryAdvance? It is even short enough to just inline it into a util method, imo.

public static Stream<ApplicationResponse> getApplications(String token, RestTemplate rt)
{
    return StreamSupport.stream(new AbstractSpliterator<ApplicationResponse[]>(Long.MAX_VALUE,
                                                                               Spliterator.ORDERED
                                                                                               | Spliterator.IMMUTABLE)
    {
        private int page = 1;

        @Override
        public boolean tryAdvance(Consumer<? super ApplicationResponse[]> action)
        {
            HttpEntity<String> entity = new HttpEntity<>("parameters", getHeaders(token));
            String url = String.format("%s?PageIndex=%s&PageSize=%s", endpoint, page, 500);
            ResponseEntity<ApplicationCollection> ar = rt.exchange(url, HttpMethod.GET, entity,
                                                                   ApplicationCollection.class);
            ApplicationResponse[] res = Objects.requireNonNull(ar.getBody()).getData();

            if (res.length == 0)
                return false;

            page++;
            action.accept(res);
            return true;
        }
    }, false).flatMap(Arrays::stream);
}






你可以实现一个迭代器并创建它的流:


You could implement an Iterator and create a Stream of it:

public class ResponseIterator
    implements Iterator<Stream<ApplicationResponse>>
{
    private int                   page = 1;
    private String                token;
    private RestTemplate          rt;

    private ApplicationResponse[] next;

    private ResponseIterator(String token, RestTemplate rt)
    {
        this.token = token;
        this.rt = rt;
    }

    public static Stream<ApplicationResponse> getApplications(String token, RestTemplate rt)
    {
        Iterable<Stream<ApplicationResponse>> iterable = () -> new ResponseIterator(token, rt);
        return StreamSupport.stream(iterable.spliterator(), false).flatMap(Function.identity());
    }

    @Override
    public boolean hasNext()
    {
        if (next == null)
        {
            next = getNext();
        }
        return next.length != 0;
    }

    @Override
    public Stream<ApplicationResponse> next()
    {
        if (next == null)
        {
            next = getNext();
        }
        Stream<ApplicationResponse> nextStream = Arrays.stream(next);
        next = getNext();
        return nextStream;
    }

    private ApplicationResponse[] getNext()
    {
        HttpEntity<String> entity = new HttpEntity<>("parameters", getHeaders(token));
        String url = String.format("%s?PageIndex=%s&PageSize=%s", endpoint, page, 500);
        ResponseEntity<ApplicationCollection> ar = rt.exchange(url, HttpMethod.GET, entity,
                                                               ApplicationCollection.class);
        ApplicationResponse[] res = Objects.requireNonNull(ar.getBody()).getData();
        page++;
        return res;
    }
}

它将检查下一个响应是否为空 hasNext(),停止流。否则,它将流和flatMap响应。我已经硬连线 pageSize ,但您可以轻松地将其作为工厂方法的第三个输入 ResponseIterator.getApplications()

It will check whether the next response is empty in hasNext(), stopping the stream. Otherwise, it will stream and flatMap that response. I have hardwired pageSize, but you can easily make that a third input for the factory method ResponseIterator.getApplications().

这篇关于结束“无限”的结果。在满足某些条件时流的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆