将对OutputStream的写入转换为ServerResponse可用的流量(<DataBuffer&> [英] Convert writes to OutputStream into a Flux<DataBuffer> usable by ServerResponse

查看:9
本文介绍了将对OutputStream的写入转换为ServerResponse可用的流量(<DataBuffer&>的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个旧库,我必须使用它来检索文件。此旧库不会像您通常期望的读取内容那样在InputStream中返回,但它希望传递给它一个开放的OutputStream,它可以写入。

我必须编写一个Webflow REST服务,将此OutputStream写入org.springframework.web.reactive.function.server.ServerResponse正文。

legacyLib.BlobRead(outputStream); // writes the stream to an outputstream, that has to be provided by me, and somehow has to end up in the ServerResponse

因为我想将Stream直接传递给ServerResponse,所以我可能必须这样做,对吗?

ServerResponse.ok().body(magicOutpuStreamToFluxConverter(), DataBuffer.class);

推荐答案

这里是RequestHandler的一部分,这很重要;我忽略了一些错误处理/异常捕获,这些错误处理/捕获通常是不需要的。注意,我为读取发布了一个不同的调度器(或者至少这是我想要做的),这样这个阻塞读取就不会干扰我的主事件线程:

private Mono<ServerResponse> writeToServerResponse(@NotNull FPTag tag) {
    final long blobSize = tag.getBlobSize();
    return ServerResponse.ok()
        .contentType(MediaType.APPLICATION_OCTET_STREAM)
        .body(Flux.<DataBuffer>create((FluxSink<DataBuffer> emitter) -> {
      // for a really big blob I want to read it in chunks, so that my server doesn't use too much memory
      for(int i = 0; i < blobSize; i+= tagChunkSize) {
        // new DataBuffer that is written to, then emitted later
        DefaultDataBuffer dataBuffer = new DefaultDataBufferFactory().allocateBuffer();
        try (OutputStream outputStream = dataBuffer.asOutputStream()) {
          // write to the outputstream of DataBuffer
          tag.BlobReadPartial(outputStream, i, tagChunkSize, FPLibraryConstants.FP_OPTION_DEFAULT_OPTIONS);
          // don't know if flushing is strictly neccessary
          outputStream.flush();
        } catch (IOException | FPLibraryException e) {
          log.error("Error reading + writing from tag to http outputstream", e);
          emitter.error(e);
        }
        emitter.next(dataBuffer);
      }
      // if blob is finished, send "complete" to my flux of DataBuffers
      emitter.complete();
    }, FluxSink.OverflowStrategy.BUFFER).publishOn(Schedulers.newElastic("centera")).doOnComplete(() -> closeQuietly(tag)), DataBuffer.class);

}

这篇关于将对OutputStream的写入转换为ServerResponse可用的流量(&lt;DataBuffer&>的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆