Junit cannot delete @TempDir with file created by Spark Structured Streaming

Problem description

I created an integration test for my pipeline to check if the right CSV file is generated:

import static java.util.Arrays.asList;
import static org.assertj.core.api.Assertions.assertThat;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.TimeUnit;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.api.io.TempDir;

class CsvBatchSinkTest {

    // SparkExtension and CommonTestSchemas are the asker's own helper classes.
    @RegisterExtension
    static SparkExtension spark = new SparkExtension();

    @TempDir
    static Path directory;

    // Checks whether a file with the given suffix exists anywhere under the directory.
    static boolean isFileWithSuffixAvailable(File directory, String suffix) throws IOException {
        return Files.walk(directory.toPath()).anyMatch(f -> f.toString().endsWith(suffix));
    }

    // Returns the content of the first file found with the given suffix.
    static List<String> extractFileWithSuffixContent(File file, String suffix) throws IOException {
        return Files.readAllLines(
                Files.walk(file.toPath())
                        .filter(f -> f.toString().endsWith(suffix))
                        .findFirst()
                        .orElseThrow(AssertionError::new));
    }

    @Test
    @DisplayName("When correct dataset is sent to sink, then correct csv file should be generated.")
    void testWrite() throws IOException, InterruptedException {

        File file = new File(directory.toFile(), "output");

        List<Row> data =
                asList(RowFactory.create("value1", "value2"), RowFactory.create("value3", "value4"));

        Dataset<Row> dataset =
                spark.session().createDataFrame(data, CommonTestSchemas.SCHEMA_2_STRING_FIELDS);

        dataset.coalesce(1)
                .write()
                .option("header", "true")
                .option("delimiter", ";")
                .csv(file.getAbsolutePath());

        Awaitility.await()
                .atMost(10, TimeUnit.SECONDS)
                .until(() -> isFileWithSuffixAvailable(file, ".csv"));

        Awaitility.await()
                .atMost(10, TimeUnit.SECONDS)
                .untilAsserted(
                        () ->
                                assertThat(extractFileWithSuffixContent(file, ".csv"))
                                        .containsExactlyInAnyOrder("field1;field2", "value1;value2", "value3;value4"));
    }
}

The real code looks a little bit different; this is just a reproducible example.

The Spark extension just starts a local Spark session before every test and closes it afterwards.
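
The question doesn't show the SparkExtension class. As context, a minimal sketch of such a JUnit 5 extension, assuming it does nothing more than manage a local SparkSession (the class name and lifecycle come from the question's description; the rest is an assumption), could look like this:

import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.extension.AfterEachCallback;
import org.junit.jupiter.api.extension.BeforeEachCallback;
import org.junit.jupiter.api.extension.ExtensionContext;

// Hypothetical sketch; the asker's actual SparkExtension is not shown in the question.
public class SparkExtension implements BeforeEachCallback, AfterEachCallback {

    private SparkSession session;

    @Override
    public void beforeEach(ExtensionContext context) {
        // Start a local Spark session before every test.
        session = SparkSession.builder()
                .master("local[*]")
                .appName("test")
                .getOrCreate();
    }

    @Override
    public void afterEach(ExtensionContext context) {
        // Stop the session after the test so resources are released.
        if (session != null) {
            session.stop();
        }
    }

    public SparkSession session() {
        return session;
    }
}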

The test passes, but when JUnit then tries to clean up the @TempDir, the following exception is thrown:

Failed to delete temp directory C:\Users\RK03GJ\AppData\Local\Temp\junit596680345801656194. The following paths could not be deleted

Can I somehow fix this error? I tried waiting for Spark to stop using Awaitility, but it didn't really help.

Maybe I can somehow ignore this error?

Recommended answer

Quick guess: you need to close the stream returned by Files.walk. Quote from the docs:

If timely disposal of file system resources is required, the try-with-resources construct should be used to ensure that the stream's close method is invoked after the stream operations are completed.

To fix this, add a try-with-resources block in the isFileWithSuffixAvailable method:

static boolean isFileWithSuffixAvailable(File directory, String suffix) throws IOException {
    try (Stream<Path> walk = Files.walk(directory.toPath())) {
        return walk.anyMatch(f -> f.toString().endsWith(suffix));
    }
}
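
The same leak exists in the question's other helper, extractFileWithSuffixContent, which also never closes the stream returned by Files.walk. The analogous fix, sketched here as an addition to the original answer, would be:

// Same pattern: close the Files.walk stream once the matching file has been read.
static List<String> extractFileWithSuffixContent(File file, String suffix) throws IOException {
    try (Stream<Path> walk = Files.walk(file.toPath())) {
        return Files.readAllLines(
                walk.filter(f -> f.toString().endsWith(suffix))
                        .findFirst()
                        .orElseThrow(AssertionError::new));
    }
}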
