Continuously getting content from a database table


Question

I have this code:

package testcode;

import java.sql.*;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


public class ProducerClear2 {

    public static String vardbserver;
    public static String vardbuser;
    public static String vardbpassword;
    public static String vardbname;

    public static void main(String[] args) {
        vardbserver = "TestDBtoMQ";
        vardbuser = "postgresql";
        vardbpassword = "admin";

        ConnectionFactory factory = null;
        javax.jms.Connection connection = null;
        Session session = null;
        Destination destination = null;
        MessageProducer producer = null;
        try {
            factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_BROKER_URL);
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue("TestQueue");
            producer = session.createProducer(destination);

            Class.forName("org.postgresql.Driver");
            System.out.println("----------------------------");

            try (Connection c = DriverManager.getConnection("jdbc:postgresql://localhost:5432/" + vardbserver, vardbuser, vardbpassword);
                 PreparedStatement stmt = c.prepareStatement("SELECT * FROM MESSAGES where xmin::varchar::bigint > ? and xmin::varchar::bigint < ? ");
                 PreparedStatement max = c.prepareStatement("select max(xmin::varchar::bigint) as txid from messages")
            ) {
                c.setAutoCommit(false);

                Long previousTxId = 0L;
                Long nextTxId = 0L;

                while (true) {
                    stmt.clearParameters();

                    try (ResultSet rs = max.executeQuery()) {
                        if (rs.next()) {
                            nextTxId = rs.getLong(1);
                        }
                    }

                    stmt.setLong(1, previousTxId);
                    stmt.setLong(2, nextTxId + 1);
                    try (ResultSet rs = stmt.executeQuery()) {
                        while (rs.next()) {
                            String message = rs.getString("MESSAGE");
                            System.out.println("Message = " + message);
                            TextMessage mssg = session.createTextMessage(message);
                            System.out.println("Sent: " + mssg.getText());
                            producer.send(mssg);
                        }
                        previousTxId = nextTxId;
                    }
                    Thread.sleep(5000);
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (Exception e) {
            System.err.println(e.getClass().getName() + ": " + e.getMessage());
        } finally {
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException ex) {
                    // ignore
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException ex) {
                    // ignore
                }
            }
        }
        System.out.println("----------------------------");
        System.out.println("Message sent successfully");
    }
}

Basically, the app reads the contents of a database table and sends them to ActiveMQ. When the table is updated, it sends only the newly added content (rows that were already sent are not sent again). But this code only works with PostgreSQL.

Next, I'm planning to add an "if" branch so that I can also read the data from other databases (Oracle and MySQL).

Does xmin also work in Oracle and MySQL, so that I only need to change the server URL? Or do I need to change the code for Oracle and MySQL?

Recommended answer

I found the answer myself: use LIMIT with an offset, save the number of rows read so far to a file, and use a separate date-stamped file for each day.

              if (vardbtype.equals("MYSQL")) {
                    Class.forName("com.mysql.jdbc.Driver");
                    System.out.println("----------------------------");
                    // Number of rows already sent; used as the LIMIT offset.
                    long limitrowmysql = 0;
                    LocalDate now = LocalDate.now();
                    Path path = FileSystems.getDefault().getPath("C:\\Users\\NN\\Documents\\Test\\RowMYSQL\\RowIDMYSQL_" + now.format(DateTimeFormatter.ISO_LOCAL_DATE) + ".txt");
                    if (Files.exists(path)) {
                        // Resume from the largest offset saved in today's file, skipping blank lines.
                        try (java.util.stream.Stream<String> lines = Files.lines(path)) {
                            limitrowmysql = lines.map(String::trim)
                                    .filter(line -> !line.isEmpty())
                                    .mapToLong(Long::parseLong)
                                    .max()
                                    .orElse(0L);
                        }
                    }
                    Connection c = DriverManager.getConnection("jdbc:mysql://localhost:3306/" + vardbserver, vardbuser, vardbpassword);
                    while (true) {
                        int countrowmysql = 0;
                        // Skip the rows already sent and read everything after them.
                        // Note: without an ORDER BY, MySQL does not guarantee the row order this offset relies on.
                        String sql = "SELECT * FROM " + vardbname + " LIMIT " + limitrowmysql + ", 18446744073709551615";
                        try (Statement stmts = c.createStatement();
                             ResultSet rss = stmts.executeQuery(sql)) {
                            while (rss.next()) {
                                String message = rss.getString("MESSAGE");
                                System.out.println("Message = " + message);
                                TextMessage mssg = session.createTextMessage(message);
                                System.out.println("Sent: " + mssg.getText());
                                producer.send(mssg);
                                countrowmysql++;
                            }
                        }

                        // Persist the new offset before sleeping, so a restart does not resend rows.
                        limitrowmysql = limitrowmysql + countrowmysql;
                        Files.write(path, String.valueOf(limitrowmysql).getBytes()); // write latest offset to file
                        Thread.sleep(batchperiod2);
                    }
                }
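
The file handling in the snippet above can be pulled out into a small helper, so that the same offset bookkeeping can be reused for each database branch. The following is only a minimal sketch under assumptions: the class name OffsetFile and its methods pathFor, read and write are hypothetical and not part of the original answer, and a temporary directory stands in for the Windows path used above.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper: keeps the number of rows already sent in a per-day text file. */
public class OffsetFile {

    /** Builds the file path for a given day, e.g. RowIDMYSQL_2024-01-31.txt. */
    public static Path pathFor(Path dir, LocalDate day) {
        return dir.resolve("RowIDMYSQL_" + day.format(DateTimeFormatter.ISO_LOCAL_DATE) + ".txt");
    }

    /** Returns the largest offset stored in the file, or 0 if the file is missing or blank. */
    public static long read(Path file) throws IOException {
        if (!Files.exists(file)) {
            return 0L;
        }
        long max = 0L;
        for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
            String trimmed = line.trim();
            if (!trimmed.isEmpty()) {
                max = Math.max(max, Long.parseLong(trimmed));
            }
        }
        return max;
    }

    /** Overwrites the file with the new offset, creating parent directories if needed. */
    public static void write(Path file, long offset) throws IOException {
        Path parent = file.toAbsolutePath().getParent();
        if (parent != null) {
            Files.createDirectories(parent);
        }
        Files.write(file, Long.toString(offset).getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        // Assumption: a temporary directory replaces the real storage location.
        Path dir = Files.createTempDirectory("offsets");
        Path file = pathFor(dir, LocalDate.now());
        long offset = read(file);        // 0 on the first run
        write(file, offset + 3);         // pretend three rows were sent
        System.out.println(read(file));  // prints 3
    }
}
```

In the polling loop, read would be called once at startup and write after each batch. Keeping a single number per file avoids having to deal with blank lines when the file is read back.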
