带ResultSet的java.util.stream [英] java.util.stream with ResultSet

查看:174
本文介绍了带ResultSet的java.util.stream的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有几张包含大量数据的表(大约1亿条记录)。所以我无法将这些数据存储在内存中,但我想使用 java.util.stream 类来传输结果集并传递此流去另一个班级。我读过 Stream.of Stream.Builder 运算符,但它们是内存中的缓冲流。那么有什么方法可以解决这个问题吗?
提前致谢。

I have few tables with big amount of data (about 100 million records). So I can't store this data in memory but I would like to stream this result set using java.util.stream class and pass this stream to another class. I read about Stream.of and Stream.Builder operators but they are buffered streams in memory. So is there any way to resolve this question? Thanks in advance.

更新#1

好的我谷歌搜索 jooq 库。我不确定,但看起来它可能适用于我的测试用例。总而言之,我有几张包含大量数据的表格。我想流式传输我的结果集并将此流传输到另一个方法。这样的事情:

Okay I googled and found jooq library. I'm not sure but looks like it could be applicable to my test case. To summarize I have few tables with big amount of data. I would like to stream my resultset and transfer this stream to another method. Something like this:

// why return Stream<String>? Because my result set has String type
private Stream<Record> writeTableToStream(DataSource dataSource, String table) {

    Stream<Record> record = null;
    try (Connection connection = dataSource.getConnection()) {
        String sql = "select * from " + table;

        try (PreparedStatement pSt = connection.prepareStatement(sql)) {
            connection.setAutoCommit(false);
            pSt.setFetchSize(5000);
            ResultSet resultSet = pSt.executeQuery();
            //
            record = DSL.using(connection)
                    .fetch(resultSet).stream();
        }
    } catch (SQLException sqlEx) {
        logger.error(sqlEx);
    }

    return record;
}

请有人建议,我是否正确?谢谢。

Could please someone advise, am I on correct way? Thanks.

更新#2

我在做了一些实验jooq 现在可以说上面的决定不适合我。此代码 record = DSL.using(connection).fetch(resultSet).stream(); 花费太多时间

I made some experiment on jooq and could say now that above decision is not suitable for me. This code record = DSL.using(connection).fetch(resultSet).stream(); takes too much time

推荐答案

首先要了解的是代码,如

The first thing you have to understand is that code like

try (Connection connection = dataSource.getConnection()) {
    …
    try (PreparedStatement pSt = connection.prepareStatement(sql)) {
        …
        return stream;
    }
}

在您离开时不起作用尝试块,资源关闭,而 Stream 的处理甚至还没有开始。

does not work as by the time you leave the try blocks, the resources are closed while the processing of the Stream hasn’t even started.

资源管理构造try with resources适用于方法内块范围内使用的资源,但是您正在创建返回资源的工厂方法。因此,您必须确保关闭返回的流将关闭资源,并且调用者负责关闭 Stream

The resource management construct "try with resources" works for resources used within a block scope inside a method but you are creating a factory method returning a resource. Therefore you have to ensure that the closing of the returned stream will close the resources and the caller is responsible for closing the Stream.

此外,您需要一个函数,该函数从 ResultSet 中生成一行中的项目。假设你有一个方法,如

Further, you need a function which produces an item out of a single line from the ResultSet. Supposing, you have a method like

Record createRecord(ResultSet rs) {
    …
}

你可以创建一个 Stream< Record> 基本上喜欢

you may create a Stream<Record> basically like

Stream<Record> stream = StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
    Long.MAX_VALUE,Spliterator.ORDERED) {
        @Override
        public boolean tryAdvance(Consumer<? super Record> action) {
            if(!resultSet.next()) return false;
            action.accept(createRecord(resultSet));
            return true;
        }
    }, false);

但为了正确地做到这一点,你必须加入异常处理和关闭资源。您可以使用 Stream.onClose 来注册将在 Stream 关闭时执行的操作,但必须是一个 Runnable ,它不能抛出已检查的异常。类似地,不允许 tryAdvance 方法抛出已检查的异常。因为我们不能简单地嵌套 try(...)块,所以在关闭中抛出抑制异常的程序逻辑,当已经存在待处理的异常时,不是免费的。

But to do it correctly you have to incorporate the exception handling and closing of resources. You can use Stream.onClose to register an action that will be performed when the Stream gets closed, but it has to be a Runnable which can not throw checked exceptions. Similarly the tryAdvance method is not allowed to throw checked exceptions. And since we can’t simply nest try(…) blocks here, the program logic of suppression exceptions thrown in close, when there is already a pending exception, doesn’t come for free.

为了帮助我们,我们引入了一种新的类型,它可以包含可能抛出检查的关闭操作异常并将它们包含在未经检查的异常中。通过实现 AutoCloseable 本身,它可以利用 try(...)构造来安全地链接关闭操作:

To help us here, we introduce a new type which can wrap closing operations which may throw checked exceptions and deliver them wrapped in an unchecked exception. By implementing AutoCloseable itself, it can utilize the try(…) construct to chain close operations safely:

interface UncheckedCloseable extends Runnable, AutoCloseable {
    default void run() {
        try { close(); } catch(Exception ex) { throw new RuntimeException(ex); }
    }
    static UncheckedCloseable wrap(AutoCloseable c) {
        return c::close;
    }
    default UncheckedCloseable nest(AutoCloseable c) {
        return ()->{ try(UncheckedCloseable c1=this) { c.close(); } };
    }
}

这样,整个操作变为:

private Stream<Record> tableAsStream(DataSource dataSource, String table)
    throws SQLException {

    UncheckedCloseable close=null;
    try {
        Connection connection = dataSource.getConnection();
        close=UncheckedCloseable.wrap(connection);
        String sql = "select * from " + table;
        PreparedStatement pSt = connection.prepareStatement(sql);
        close=close.nest(pSt);
        connection.setAutoCommit(false);
        pSt.setFetchSize(5000);
        ResultSet resultSet = pSt.executeQuery();
        close=close.nest(resultSet);
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
            Long.MAX_VALUE,Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super Record> action) {
                try {
                    if(!resultSet.next()) return false;
                    action.accept(createRecord(resultSet));
                    return true;
                } catch(SQLException ex) {
                    throw new RuntimeException(ex);
                }
            }
        }, false).onClose(close);
    } catch(SQLException sqlEx) {
        if(close!=null)
            try { close.close(); } catch(Exception ex) { sqlEx.addSuppressed(ex); }
        throw sqlEx;
    }
}

此方法包含所有资源的必要关闭操作,实用程序的一个实例中的连接语句 ResultSet 上面描述的类。如果在初始化期间发生异常,则立即执行关闭操作,并将异常传递给调用者。如果流构建成功,则通过 onClose 注册关闭操作。

This method wraps the necessary close operation for all resources, Connection, Statement and ResultSet within one instance of the utility class described above. If an exception happens during the initialization, the close operation is performed immediately and the exception is delivered to the caller. If the stream construction succeeds, the close operation is registered via onClose.

因此调用者必须确保正确关闭喜欢

Therefore the caller has to ensure proper closing like

try(Stream<Record> s=tableAsStream(dataSource, table)) {
    // stream operation
}

请注意,还要交付 SQLException via RuntimeException 已添加到 tryAdvance 方法中。因此,您现在可以添加将SQLException 抛出到 createRecord 方法而不会出现问题。

Note that also the delivery of an SQLException via RuntimeException has been added to the tryAdvance method. Therefore you may now add throws SQLException to the createRecord method without problems.

这篇关于带ResultSet的java.util.stream的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆