Spring Batch - FlatFileItemWriter Error 14416: Stream is already closed

Problem description

Basically I have a Spring Batch job that queries a database and implements a partitioner to get the partitions, which are assigned to a ThreadPoolTaskExecutor in the slave step.

The reader reads (the partitions) from the database. The writer loads the data into CSV files in Azure Blob Storage.

The partitioner and the reader work fine. The writer writes one file and then closes, and the remaining partitions cannot finish because the stream is already closed. I get the following error:

Reading: market1
Reading: market2
Reading: market3
Reading: market4
Reading: market5
Writter: /upload-demo/market3_2021-06-01.csv
Writter: /upload-demo/market5_2021-06-01.csv
Writter: /upload-demo/market4_63_2021-06-01.csv
Writter: /upload-demo/market2_2021-06-01.csv
Writter: /upload-demo/market1_11_2021-06-01.csv
2021-06-02 08:24:42.304 ERROR 20356 --- [ taskExecutor-3] c.a.storage.common.StorageOutputStream   : Stream is already closed.
2021-06-02 08:24:42.307  WARN 20356 --- [ taskExecutor-3] o.s.b.f.support.DisposableBeanAdapter    : Destroy method 'close' on bean with name 'scopedTarget.writer2' threw an exception: java.lang.RuntimeException: Stream is already closed.
Reading: market6
Writter: /upload-demo/market6_2021-06-01.csv

Here is my batch configuration:

@EnableBatchProcessing
@Configuration
public class BatchConfig extends DefaultBatchConfigurer {

    String connectionString = "azureConnectionString";

    String containerName = "upload-demo";

    String endpoint = "azureHttpsEndpoint";

    String accountName ="azureAccountName";
    String accountKey = "accountKey";

    StorageSharedKeyCredential credential = new StorageSharedKeyCredential(accountName, accountKey);
    BlobServiceClient client = new BlobServiceClientBuilder().connectionString(connectionString).endpoint(endpoint).buildClient();

    @Autowired
    private StepBuilderFactory steps;

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    @Qualifier("verticaDb")
    private DataSource verticaDataSource;

    @Autowired
    private PlatformTransactionManager transactionManager;

    @Autowired
    private ConsoleItemWriter consoleItemWriter;

    @Autowired
    private ItemWriter itemWriter;

    @Bean
    public Job job() throws Exception {
        return jobs.get("job1")
                .start(masterStep(null, null))
                .incrementer(new RunIdIncrementer())
                .build();
    }

    @Bean
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(5);
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Bean
    @JobScope
    public Step masterStep(@Value("#{jobParameters['startDate']}") String startDate,
                           @Value("#{jobParameters['endDate']}") String endDate) throws Exception {

        return steps.get("masterStep")
                .partitioner(slaveStep().getName(), new RangePartitioner(verticaDataSource, startDate, endDate))
                .step(slaveStep())
                .gridSize(5)
                .taskExecutor(taskExecutor())
                .build();
    }

    @Bean
    public Step slaveStep() throws Exception {
        return steps.get("slaveStep")
                .<MarketData, MarketData>chunk(100)
                .reader(pagingItemReader(null, null, null))
                .faultTolerant()
                .skip(NullPointerException.class)
                .skipPolicy(new AlwaysSkipItemSkipPolicy())
                .writer(writer2(null, null, null))  //consoleItemWriter
                .build();
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader pagingItemReader(
            @Value("#{stepExecutionContext['MarketName']}") String marketName,
            @Value("#{jobParameters['startDate']}") String startDate,
            @Value("#{jobParameters['endDate']}") String endDate
            ) throws Exception {

System.out.println("Reading: " + marketName);

        SqlPagingQueryProviderFactoryBean provider = new SqlPagingQueryProviderFactoryBean();

        Map<String, Order> sortKey = new HashMap<>();
        sortKey.put("xbin", Order.ASCENDING);
        sortKey.put("ybin", Order.ASCENDING);

        provider.setDataSource(this.verticaDataSource);
        provider.setDatabaseType("POSTGRES");
        provider.setSelectClause("SELECT MARKET AS market, EPSG AS epsg, XBIN AS xbin, YBIN AS ybin, " +
                        "LATITUDE AS latitude, LONGITUDE AS longitude, " +
                        "SUM(TOTALUPLINKVOLUME) AS totalDownlinkVol, SUM(TOTALDOWNLINKVOLUME) AS totalUplinkVol");
        provider.setFromClause("FROM views.geo_analytics");
        provider.setWhereClause(
                "WHERE market='" + marketName + "'" +
                        " AND STARTTIME >= '" + startDate + "'" +
                        " AND STARTTIME < '" + endDate + "'" +
                        " AND TOTALUPLINKVOLUME IS NOT NULL" +
                        " AND TOTALUPLINKVOLUME > 0" +
                        " AND TOTALDOWNLINKVOLUME IS NOT NULL" +
                        " AND TOTALDOWNLINKVOLUME > 0" +
                        " AND EPSG IS NOT NULL" +
                        " AND LATITUDE IS NOT NULL" +
                        " AND LONGITUDE IS NOT NULL" +
                        " AND XBIN IS NOT NULL" +
                        " AND YBIN IS NOT NULL"
        );
        provider.setGroupClause("GROUP BY XBIN, YBIN, MARKET, EPSG, LATITUDE, LONGITUDE");
        provider.setSortKeys(sortKey);

        JdbcPagingItemReader reader = new JdbcPagingItemReader();
        reader.setDataSource(this.verticaDataSource);
        reader.setQueryProvider(provider.getObject());
        reader.setFetchSize(1000);
        reader.setRowMapper(new BeanPropertyRowMapper() {
            {
                setMappedClass((MarketData.class));
            }
        });
        return reader;
    }

    @Bean
    @StepScope
    public FlatFileItemWriter<MarketData> writer2(@Value("#{jobParameters['yearMonth']}") String yearMonth,
                                                 @Value("#{stepExecutionContext['marketName']}") String marketName,
                                                 @Value("#{jobParameters['startDate']}") String startDate) throws URISyntaxException, InvalidKeyException, StorageException, IOException {

        AZBlobWriter<MarketData> writer = new AZBlobWriter<>();

        String fullPath =marketName + "_" + startDate + ".csv";
        String resourceString = "azure-blob://upload-demo/" + fullPath;

        CloudStorageAccount storageAccount = CloudStorageAccount.parse(connectionString);
        CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
        CloudBlobContainer container2 = blobClient.getContainerReference(containerName);
        container2.createIfNotExists();

        AzureStorageResourcePatternResolver storageResourcePatternResolver = new AzureStorageResourcePatternResolver(client);
        Resource resource = storageResourcePatternResolver.getResource(resourceString);


System.out.println("Writter: " + resource.getURI().getPath().toString());

        writer.setResource(resource);
        writer.setStorage(container2);

        writer.setLineAggregator(new DelimitedLineAggregator<MarketData>() {
            {
                setDelimiter(",");
                setFieldExtractor(new BeanWrapperFieldExtractor<MarketData>() {
                    {
                        setNames(new String[] {
                                "market",
                                "epsg",
                                "xbin",
                                "ybin",
                                "latitude",
                                "longitude",
                                "totalDownlinkVol",
                                "totalUplinkVol"
                        });
                    }
                });
            }
        });
        return writer;
    }
}

Previously I ran into other issues, for example setting the FlatFileItemWriter's resource to an Azure blob: Spring Batch / Azure Storage account blob resource [container"foo", blob='bar'] cannot be resolved to absolute file path

Following @Mahmoud Ben Hassine's suggestion, I implemented a FlatFileItemWriter for Azure Blob.

I used the FlatFileItemWriter implementation (for GCP) from this post as a base: how to configure FlatFileItemWriter to output the file to a ByteArrayRecource?

Here is the implementation for Azure Blob:

public class AZBlobWriter<T> extends FlatFileItemWriter<T> {

    private CloudBlobContainer storage;
    private Resource resource;

    private static final String DEFAULT_LINE_SEPARATOR = System.getProperty("line.separator");
    private OutputStream os;

    private String lineSeparator = DEFAULT_LINE_SEPARATOR;

    @Override
    public void write(List<? extends T> items) throws Exception {

        StringBuilder lines = new StringBuilder();
        for (T item : items) {
            lines.append(item).append(lineSeparator);
        }
        byte[] bytes = lines.toString().getBytes();
        try {
            os.write(bytes);
        }
        catch (IOException e) {
            throw new WriteFailedException("Could not write data.  The file may be corrupt.", e);
        }
        os.flush();
    }

    @Override
    public void open(ExecutionContext executionContext) {
        try {
            os = ((WritableResource)resource).getOutputStream();
            String bucket = resource.getURI().getHost();
            String filePath = resource.getURI().getPath().substring(1);

            CloudBlockBlob blob = storage.getBlockBlobReference(filePath);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (StorageException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void update(ExecutionContext executionContext) {
    }

    @Override
    public void close() {
        super.close();

        try {
            os.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void setStorage(CloudBlobContainer storage) {
        this.storage = storage;
    }
    @Override
    public void setResource(Resource resource) {
        this.resource = resource;
    }
}

Any help is greatly appreciated. Apologies for the messy code, I am still testing/developing it.

thx, Markus.

Recommended answer

You did not share the complete stack trace to see exactly when this error happens, but the close method seems to be called more than once. I don't think this is due to a concurrency issue, since I see you use one writer per thread in your partitioned step. So I would make that method re-entrant by checking whether the output stream has already been closed before closing it (there is no isClosed method on an output stream, so you can use a custom boolean for that).
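
A minimal sketch of what that re-entrant close() could look like in the AZBlobWriter posted above, assuming everything else stays as-is (the streamClosed field is the custom boolean mentioned here, and the null check on os is an extra precaution in case open() failed):

    private boolean streamClosed = false; // custom flag, since OutputStream has no isClosed()

    @Override
    public void close() {
        if (streamClosed) {
            return; // close() already ran once: make the method re-entrant
        }
        super.close();
        try {
            if (os != null) {
                os.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            streamClosed = true;
        }
    }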

That said, I would start by confirming that the close method is indeed called twice, and if so, investigate why and fix the root cause.
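
A quick way to confirm that (my own suggestion, not part of the answer above) is to temporarily add the following at the top of close(); the log will then show each caller, for example whether one of the calls is the "Destroy method 'close'" callback from the DisposableBeanAdapter warning in the log above:

    // Temporary diagnostic: print who calls close() and how many times.
    // Remove once the second caller has been identified.
    new Exception("AZBlobWriter.close() invoked").printStackTrace();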
