使用 ResponseEntity 进行流式传输并确保 InputStream 关​​闭的正确方法 [英] Proper way of streaming using ResponseEntity and making sure the InputStream gets closed

查看:328
本文介绍了使用 ResponseEntity 进行流式传输并确保 InputStream 关​​闭的正确方法的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我们的一个应用程序泄漏了文件句柄,我们尚未找到原因.

One of our application leaks file handles and we have not yet found the cause for this.

在代码中我可以看到几个类似的函数:

In the code I can see several functions similar to this:

public ResponseEntity<InputStreamResource> getFoo( ... ) {
    InputStream content = getContent(...)
    InputStreamResource isr = new InputStreamResource(content);
    return ResponseEntity.status(HttpServletResponse.SC_OK).body(isr);
}

(if 检查和 try/catch 为简洁起见删除)

(if checks and try / catch removed for brevity)

我确信这部分会导致问题,因为当我使用 JMeter 加载测试此特定代码时,我可以看到 getContent() 在此阶段失败:

I am sure this section causes the problem because when I loadtest this specific code with JMeter I can see that getContent() fails in this stage:

is = Files.newInputStream(f.toPath());

通常我会关闭 InputStream 但因为这个简短而简单的代码我无法在 returnbody 调用之前关闭流>.

Normally I would close the InputStream but because this short and simply code I can't close the stream before return or the call of body.

当我运行 lsof(代码在 Linux 上运行)时,我可以看到数千个文件以读取模式打开.所以我确定这个问题是由于流没有关闭引起的.

When I run lsof (the code runs on Linux) I can see that thousands of files are open in read mode. So I am sure this problem is caused by the stream not getting closed.

是否有我应该交易的最佳实践代码?

Is there a best practice code I should trade in ?

推荐答案

可以尝试使用StreamingResponseBody

you can try to use StreamingResponseBody

StreamingResponseBody

用于异步请求处理的控制器方法返回值类型,其中应用程序可以直接写入响应 OutputStream,而无需阻止 Servlet 容器线程.

A controller method return value type for asynchronous request processing where the application can write directly to the response OutputStream without holding up the Servlet container thread.

因为您在一个单独的线程上工作,直接写入响应,所以您在 return 之前调用 close() 的问题得到解决.

Because you are working on a separate thread, writing directly to the response, your problem to call close() before return is solved.

也许你可以从下面的例子开始

probably you can start by the following example

public ResponseEntity<StreamingResponseBody> export(...) throws FileNotFoundException {
    //...

    InputStream inputStream = new FileInputStream(new File("/path/to/example/file"));


    StreamingResponseBody responseBody = outputStream -> {

        int numberOfBytesToWrite;
        byte[] data = new byte[1024];
        while ((numberOfBytesToWrite = inputStream.read(data, 0, data.length)) != -1) {
            System.out.println("Writing some bytes..");
            outputStream.write(data, 0, numberOfBytesToWrite);
        }

        inputStream.close();
    };

    return ResponseEntity.ok()
            .header(HttpHeaders.CONTENT_DISPOSITION, "attachment; filename=generic_file_name.bin")
            .contentType(MediaType.APPLICATION_OCTET_STREAM)
            .body(responseBody);
}

你也可以尝试使用Files(从java 7开始)

You can also try to use Files (since java 7)

所以你不必管理InputStream

    File file = new File("/path/to/example/file");

    StreamingResponseBody responseBody = outputStream -> {
        Files.copy(file.toPath(), outputStream);
    };

正如@Stackee007 在评论中所描述的,在生产环境的重负载下,为 TaskExecutor 定义一个 @Configuration 类来调整参数和管理 @Configuration 类也是一个好习惯.代码>异步进程.

As @Stackee007 described in comment, under heavy load in production environment it's a good practice also to define a @Configuration class for a TaskExecutor to tune parameters and manage Async processes.

@Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfiguration implements AsyncConfigurer {

    private final Logger log = LoggerFactory.getLogger(AsyncConfiguration.class);

    private final TaskExecutionProperties taskExecutionProperties;

    public AsyncConfiguration(TaskExecutionProperties taskExecutionProperties) {
        this.taskExecutionProperties = taskExecutionProperties;
    }

    //  ---------------> Tune parameters here
    @Override
    @Bean(name = "taskExecutor")
    public Executor getAsyncExecutor() {
        log.debug("Creating Async Task Executor");
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(taskExecutionProperties.getPool().getCoreSize());
        executor.setMaxPoolSize(taskExecutionProperties.getPool().getMaxSize());
        executor.setQueueCapacity(taskExecutionProperties.getPool().getQueueCapacity());
        executor.setThreadNamePrefix(taskExecutionProperties.getThreadNamePrefix());
        return executor;
    }
    
    //  ---------------> Use this task executor also for async rest methods
    @Bean
    protected WebMvcConfigurer webMvcConfigurer() {
        return new WebMvcConfigurer() {
            @Override
            public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
                configurer.setTaskExecutor(getTaskExecutor());
            }
        };
    }

    @Bean
    protected ConcurrentTaskExecutor getTaskExecutor() {
        return new ConcurrentTaskExecutor(this.getAsyncExecutor());
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }
}

如何使用 mockMvc 进行测试

您可以在集成测试中简单地按照以下示例代码进行操作:

You can simply follow this sample code in your integration test as:

    .andExpect(request().asyncStarted())
    .andDo(MvcResult::getAsyncResult)
    .andExpect(status().isOk()).getResponse().getContentAsByteArray();

ResponseEntity 的内容类型是一个 MediaType.APPLICATION_OCTET_STREAM 在这个例子中你可以得到 byte[] (.getContentAsByteArray()>) 但您可以根据您的身体响应内容类型获取所有内容的 String/Json/plaintext.

Content type of ResponseEntity<StreamingResponseBody> is a MediaType.APPLICATION_OCTET_STREAM in this example and you can get byte[] (.getContentAsByteArray()) but you can get String/Json/plaintext of everything depending of your body response content type.

这篇关于使用 ResponseEntity 进行流式传输并确保 InputStream 关​​闭的正确方法的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆