为什么在单独的函数中使用Postgres Replication Stream时不起作用? [英] Why Postgres Replication Stream doesn't work when used in separate function?

查看:155
本文介绍了为什么在单独的函数中使用Postgres Replication Stream时不起作用?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在研究postgres复制流API。在进行处理时,遇到了异常行为。

I am working on postgres replication stream API. While working on it came across unusual behavior.

当我使用复制插槽在main块中编写整个代码时,一切正常。

When I use replication slot writing whole code inside main block, everything works fine.

public class Server implements Config {

public static void main(String[] args) {
    Properties prop = new Properties();
    prop.load(new FileInputStream(System.getProperty("prop")));

    String user = prop.getProperty("user");
    String password = prop.getProperty("password");
    String url = prop.getProperty("url");

    Properties props = new Properties();
    PGProperty.USER.set(props, user);
    PGProperty.PASSWORD.set(props, password);
    PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
    PGProperty.REPLICATION.set(props, "database");
    PGProperty.PREFER_QUERY_MODE.set(props, "simple");

    Connection conn= null;
    PGConnection replicationConnection= null;
    PGReplicationStream stream = null;

        conn = DriverManager.getConnection(url, props);
        replicationConnection = conn.unwrap(PGConnection.class);
        stream = replicationConnection.getReplicationAPI().replicationStream().logical()
                .withSlotName("replication_slot")
                .withSlotOption("include-xids", true)
                .withSlotOption("include-timestamp", "on")
                .withSlotOption("skip-empty-xacts", true)
                .withStatusInterval(20, TimeUnit.SECONDS).start();

    while (true) {
            ByteBuffer msg;
            try {
                msg = stream.readPending();         

            if (msg == null) {
                TimeUnit.MILLISECONDS.sleep(10L);
                continue;
            }

            int offset = msg.arrayOffset();
            byte[] source = msg.array();
            int length = source.length - offset;
            // convert byte buffer into string
            String data = new String(source, offset, length);

            // then convert it into bufferedreader
            BufferedReader reader = new BufferedReader(new StringReader(data));
            String line = reader.readLine();

            while (line != null) {
                System.out.println(line);
                line = reader.readLine();
            }
            stream.setAppliedLSN(stream.getLastReceiveLSN());
            stream.setFlushedLSN(stream.getLastReceiveLSN());
        } catch (SQLException | IOException | InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        }
}

但是当我尝试分离时流逻辑使用这样的单独方法

But when I try to separate stream logic using separate method like this

public class Server implements Config {

public static void main(String[] args) {
    PGReplicationStream stream = getReplicationStream();
        while (true) {
            ByteBuffer msg;
            try {
                msg = stream.readPending();
    if (msg == null) {
                TimeUnit.MILLISECONDS.sleep(10L);
                continue;
            }

            int offset = msg.arrayOffset();
            byte[] source = msg.array();
            int length = source.length - offset;
            String data = new String(source, offset, length);

            BufferedReader reader = new BufferedReader(new StringReader(data));
            String line = reader.readLine();

            while (line != null) {
                System.out.println(line);
                line = reader.readLine();
            }
            stream.setAppliedLSN(stream.getLastReceiveLSN());
            stream.setFlushedLSN(stream.getLastReceiveLSN());
        } catch (SQLException | IOException | InterruptedException e) {
            e.printStackTrace();
        }
        }

}
public static PGReplicationStream getReplicationStream() {
    Properties prop = new Properties();
    try {
        prop.load(new FileInputStream(System.getProperty("prop")));
    } catch (IOException e1) {
        // TODO Auto-generated catch block
        e1.printStackTrace();
    }

    String user = prop.getProperty("user");
    String password = prop.getProperty("password");
    String url = prop.getProperty("url");

    Properties props = new Properties();
    PGProperty.USER.set(props, user);
    PGProperty.PASSWORD.set(props, password);
    PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "9.4");
    PGProperty.REPLICATION.set(props, "database");
    PGProperty.PREFER_QUERY_MODE.set(props, "simple");

    Connection conn= null;
    PGConnection replicationConnection= null;
    PGReplicationStream stream = null;
    try {
        conn = DriverManager.getConnection(url, props);
        replicationConnection = conn.unwrap(PGConnection.class);
        stream = replicationConnection.getReplicationAPI().replicationStream().logical()
                .withSlotName("replication_slot")
                .withSlotOption("include-xids", true)
                .withSlotOption("include-timestamp", "on")
                .withSlotOption("skip-empty-xacts", true)
                .withStatusInterval(20, TimeUnit.SECONDS).start();
    } catch (SQLException e) {
        e.printStackTrace();
    }
    return stream;
}

}

读取一些数据后,程序给出错误

After reading some data, program is giving error

org.postgresql.util.PSQLException: Database connection failed when reading from copy
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1028)
at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:41)
at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:155)
at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:124)
at org.postgresql.core.v3.replication.V3PGReplicationStream.readPending(V3PGReplicationStream.java:78)
at Server.main(Server.java:47)
Caused by: java.net.SocketException: Socket closed
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at org.postgresql.core.VisibleBufferedInputStream.readMore(VisibleBufferedInputStream.java:140)
at org.postgresql.core.VisibleBufferedInputStream.ensureBytes(VisibleBufferedInputStream.java:109)
at org.postgresql.core.VisibleBufferedInputStream.read(VisibleBufferedInputStream.java:191)
at org.postgresql.core.PGStream.receive(PGStream.java:495)
at org.postgresql.core.PGStream.receive(PGStream.java:479)
at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1161)
at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1026)
... 5 more

我认为这两种方法之间没有任何区别。但是该程序的行为有所不同。有人可以解释一下第二种方法有什么问题吗?

I don't see any difference between both approach. But the program is behaving differently. Can someone explain, what is wrong with second approach.

推荐答案

我相信您的 Connection 问题。一旦函数返回,它将超出范围,由垃圾收集器来收集和完成。最后,连接被关闭,然后程序可能会失败。尝试在主要方法可用的范围内移动连接和其他必需的中间变量,然后重试。

I believe your problem with with Connection. It gets out of scope once your function returns and it is upto garbage collector to collect it and finalize. in finalize, connection is closed and probably then your program fails. Try moving connection and other required intermediate variables in a scope that is available in your main method and try again.

这篇关于为什么在单独的函数中使用Postgres Replication Stream时不起作用?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆