如何转换Reactor Flux< String>到InputStream [英] How to convert Reactor Flux<String> to InputStream

查看:214
本文介绍了如何转换Reactor Flux< String>到InputStream的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

鉴于我的Flux<String>大小未知,如何将其转换为其他库期望的InputStream?

Given that I have a Flux<String> of unknown size, how can I convert it into InputStream that other library is expecting?

例如,使用WebClient,我可以使用这种方法来实现

For example with WebClient I can achieve that using this approach

WebClient.get('example.com').exchange.flatMap { it.bodyToMono(InputStreamResource::class.java) }.map { it.inputStream }

但是当我输入Flux<String>时我不知道该怎么做?

but I can't figure out how to do the same when I have Flux<String> as an input?

推荐答案

可能有很多方法可以做到这一点.一种可能性是使用 PipedInputStream PipedOutputStream .

There are probably many ways to do this. One possibility is to use PipedInputStream and PipedOutputStream.

此方法的工作方式是将输出流链接到输入流,这样,您就可以从链接的输入流中读取写入到输出流的所有内容,方法是在两个输入流之间创建一个管道. /p>

The way this works is that you link an output stream to an input stream such that everything you write to the output stream can be read from the linked input stream, by doing this, creating a pipe between the two of them.

PipedInputStream in = new PipedInputStream();
PipedOutputStream out = PipedOutputStream(in);

有一个警告,不过,根据管道流的文档,写入过程和读取过程必须在单独的线程上进行,否则我们可能会导致死锁.

There is one caveat, though, according to the documentation of piped streams, the writing process and the reading process must be happening on separate threads, otherwise we may cause a deadlock.

因此,回到我们的反应流方案中,我们可以创建管道(如上所述)并订阅Flux对象,并将您从其中获得的数据写入到管道输出流中.无论您在此处写入什么内容,都可以在管道的另一侧的相应输入流中进行读取.您可以与非反应性方法共享该输入流.

So, coming back to our reactive stream scenario, we can create a pipeline (as mentioned above) and subscribe to the Flux object and the data you get from it you write it to a piped output stream. Whatever you write there, will be available for reading at the other side of the pipe, in the corresponding input stream. This input stream is the one you can share with your non-reactive method.

我们只需要格外小心,因为我们在单独的线程(例如.)上订阅了Flux. subscribeOn(Schedulers.elastic()).

We just have to be extra careful that we subscribe to the Flux on a separate thread, .e.g. subscribeOn(Schedulers.elastic()).

这是此类订户的非常基本的实现:

Here's a very basic implementation of such subscriber:

class PipedStreamSubscriber extends BaseSubscriber<byte[]> {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    private final PipedInputStream in;
    private PipedOutputStream out;

    PipedStreamSubscriber(PipedInputStream in) {
        Objects.requireNonNull(in, "The input stream must not be null");
        this.in = in;
    }

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        //change if you want to control back-pressure
        super.hookOnSubscribe(subscription);
        try {
            this.out = new PipedOutputStream(in);
        } catch (IOException e) {
            //TODO throw a contextual exception here
            throw new RuntimeException(e);
        }
    }

    @Override
    protected void hookOnNext(byte[] payload) {
        try {
            out.write(payload);
        } catch (IOException e) {
            //TODO throw a contextual exception here
            throw new RuntimeException(e);
        }
    }

    @Override
    protected void hookOnComplete() {
        close();
    }

    @Override
    protected void hookOnError(Throwable error) {
        //TODO handle the error or at least log it
        logger.error("Failure processing stream", error);
        close();
    }

    @Override
    protected void hookOnCancel() {
        close();
    }

    private void close() {
        try {
            if (out != null) {
                out.close();
            }
        } catch (IOException e) {
            //probably just ignore this one or simply  log it
        }
    }
}

然后使用此订阅者,我可以定义一个非常简单的实用程序方法,该方法将Flux<byte[]转换为InputStream,如下所示:

And using this subscriber I could define a very simple utility method that turned a Flux<byte[] into an InputStream, somewhat as follows:

static InputStream createInputStream(Flux<byte[]> flux) {

    PipedInputStream in = new PipedInputStream();
    flux.subscribeOn(Schedulers.elastic())
        .subscribe(new PipedStreamSubscriber(in));

    return in;
}

请注意,当流完成,发生错误或取消订阅时,我非常小心地关闭输出流,否则我们冒着在读取端阻塞,等待更多输入到达的风险.关闭输出流是在管道另一端发出信号的信号.

Notice that I was extra careful to close the output stream when the flow is done, when error occurs or the subscription is cancelled, otherwise we run the risk of blocking in the read side, waiting for more input to arrive. Closing the output stream is what signals the end of the input stream at the other side of the pipe.

现在,InputStream可以像任何常规流一样被使用,因此您可以将其传递给非反应性方法,例如

And now that InputStream can be consumed just as any regular stream and therefore you could pass it around to your non-reactive method, e.g.

Flux<byte[]> jedi = Flux.just("Luke\n", "Obi-Wan\n", "Yoda\n").map(String::getBytes);

try (InputStream in = createInputStream(jedi)) {
    byte[] data = new byte[5];
    int size = 0;
    while ((size = in.read(data)) > 0) {
        System.out.printf("%s", new String(data, 0, size));
    }
} 

上面的代码产生:

Luke
Obi-Wan
Yoda

这篇关于如何转换Reactor Flux&lt; String&gt;到InputStream的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
相关文章
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆