如何使用 Spring Boot @RestController 流式传输分块响应 [英] How To Stream Chunked Response With Spring Boot @RestController

查看:41
本文介绍了如何使用 Spring Boot @RestController 流式传输分块响应的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在这上面花了大约一天的时间,但找不到有效的解决方案.在我们的应用程序中,我们有几个端点可以返回大响应.我一直在尝试寻找一种机制,允许我们在处理数据库查询结果时流式传输响应.主要目标是限制服务端的峰值内存使用(不需要内存中的整个响应)并最小化响应第一个字节的时间(如果响应没有开始进入,客户端系统会超时)指定时间 - 10 分钟).我真的很惊讶这这么难.

I have spent like a day on this and I am unable to find a solution that works. In our application we have a couple of endpoints that can return large responses. I have been trying to find a mechanism that allows us to stream the response as we process the result of a database query. The main goals are to limit peak memory usage (not need the entire response in memory) on the service side and to minimize the time to first byte of response (the client system has a timeout if the response doesn't start to come within the specified time - 10 minutes). I'm really surprised this is so hard.

我找到了 StreamingResponseBody 并且它似乎接近我们想要的,虽然我们并不真正需要异步方面,我们只希望能够在处理查询结果时开始流式传输响应.我也尝试过其他方法,例如使用@ResponseBody 进行注释、返回 void 并添加 OutputStream 的参数,但这不起作用,因为传递的 OutputStream 基本上只是一个缓存整个结果的 CachingOutputStream.这是我现在拥有的...

I found StreamingResponseBody and it seemed close to what we wanted, although we don't really need the asynchronous aspect, we only want to be able to start streaming the response as we process the query result. I have tried other approaches as well, like annotating with @ResponseBody, returning void, and adding a parameter of OutputStream, but that didn't work because the passed OutputStream was basically just a CachingOutputStream that buffered the entire result. Here is what I have now...

资源方法:

@GetMapping(value = "/catalog/features")
public StreamingResponseBody findFeatures(                                      
        @RequestParam("provider-name") String providerName,
        @RequestParam(name = "category", required = false) String category,
        @RequestParam("date") String date,
        @RequestParam(value = "version-state", defaultValue = "*") String versionState) {

    CatalogVersionState catalogVersionState = getCatalogVersionState(versionState);

    log.info("GET - Starting DB query...");
    final List<Feature> features 
        = featureService.findFeatures(providerName, 
                                      category, 
                                      ZonedDateTime.parse(date), 
                                      catalogVersionState);
    log.info("GET - Query done!");

    return new StreamingResponseBody() {
        @Override
        public void writeTo(OutputStream outputStream) throws IOException {
            log.info("GET - Transforming DTOs");
            JsonFactory jsonFactory = new JsonFactory();
            JsonGenerator jsonGenerator = jsonFactory.createGenerator(outputStream);
            Map<Class<?>, JsonSerializer<?>> serializerMap = new HashMap<>();
            serializerMap.put(DetailDataWrapper.class, new DetailDataWrapperSerializer());
            serializerMap.put(ZonedDateTime.class, new ZonedDateTimeSerializer());
            ObjectMapper jsonMapper =  Jackson2ObjectMapperBuilder.json()
                .serializersByType(serializerMap)
                .deserializerByType(ZonedDateTime.class, new ZonedDateTimeDeserializer())
                .build();
            jsonGenerator.writeStartArray();
            for (Feature feature : features) {
                FeatureDto dto = FeatureMapper.MAPPER.featureToFeatureDto(feature);
                jsonMapper.writeValue(jsonGenerator, dto);
                jsonGenerator.flush();
            }
            jsonGenerator.writeEndArray();
            log.info("GET - DTO transformation done!");
        }
    };
}

异步配置:

@Configuration
@EnableAsync
@EnableScheduling
public class ProductCatalogStreamingConfig extends WebMvcConfigurerAdapter {

    private final Logger log = LoggerFactory.getLogger(ProductCatalogStreamingConfig.class);

    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        configurer.setDefaultTimeout(360000).setTaskExecutor(getAsyncExecutor());
        configurer.registerCallableInterceptors(callableProcessingInterceptor());
    }

    @Bean(name = "taskExecutor")
    public AsyncTaskExecutor getAsyncExecutor() {
        log.debug("Creating Async Task Executor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(5);
        executor.setQueueCapacity(25);
        executor.setThreadNamePrefix("AsyncStreaming-");
        return executor;
    }

    @Bean
    public CallableProcessingInterceptor callableProcessingInterceptor() {
        return new TimeoutCallableProcessingInterceptor() {
            @Override
            public <T> Object handleTimeout(NativeWebRequest request, Callable<T> task) throws 
Exception {
                log.error("timeout!");
                return super.handleTimeout(request, task);
            }
        };
    }
}

我期望客户端会在调用 StreamingResponseBody.writeTo() 后立即开始看到响应,并且响应标头将包含

I was expecting that the client would start seeing the response as soon as StreamingResponseBody.writeTo() was called and that the response headers would include

Content-Encoding: chunked

但不是

Content-Length: xxxx

相反,在 StreamingResponseBody.writeTo() 返回并且响应包括 Content-Length 之前,我在客户端看不到任何响应.(但不是内容编码)

Instead, I don't see any response at the client until StreamingResponseBody.writeTo() has returned and the response includes the Content-Length. (but not Content-Encoding)

我的问题是,当我在 writeTo() 中写入 OutputStream 而不是缓存整个有效负载并仅在最后发送时,告诉 Spring 发送分块响应的秘诀是什么?具有讽刺意味的是,我发现一些帖子想知道如何禁用分块编码,但对启用它一无所知.

My question is, What is the secret sauce that tells Spring to send a chunked response while I'm writing to OutputStream in writeTo() and not cache the entire payload and send it only at the end? Ironically I have found posts that want to know how to disable chunked encoding, but nothing about enabling it.

推荐答案

事实证明,上面的代码正是我们所寻求的.我们观察到的行为不是由于 Spring 实现这些功能的方式的任何原因,而是由公司特定的启动器引起的,该启动器安装了干扰 Spring 正常行为的 servlet 过滤器.这个过滤器包装了 HttpServletResponse OutputStream,这就是我们观察问题中提到的 CachingOutputStream 的原因.移除 starter 后,上述代码的行为与我们希望的完全一致,我们正在以不会干扰此行为的方式重新实现 servlet 过滤器.

It turns out the code above does exactly what we were seeking. The behavior we observed was not due to anything in the way Spring has implemented these features, it was caused by a company specific starter that installed a servlet filter that interfered with the normal Spring behavior. This filter wrapped the HttpServletResponse OutputStream and that is why we observed the CachingOutputStream noted in the question. After removing the starter, the above code behaved exactly as we hoped and we are re-implementing the servlet filter in a way that will not interfere with this behavior.

这篇关于如何使用 Spring Boot @RestController 流式传输分块响应的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆