Apache Storm - Accessing database from SPOUT - connection pooling


Question


I have a spout which, on each tick, goes to a Postgres database and reads one additional row. The spout code looks as follows:

import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

class RawDataLevelSpout extends BaseRichSpout implements Serializable {

    private int counter;
    private SpoutOutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("col1", "col2"));
    }

    @Override
    public void open(Map map, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
        collector = spoutOutputCollector;
    }

    private Connection initializeDatabaseConnection() {
        try {
            Class.forName("org.postgresql.Driver");
            // DATABASE_URI is defined elsewhere
            return DriverManager.getConnection(DATABASE_URI, "root", "root");
        } catch (ClassNotFoundException | SQLException e) {
            e.printStackTrace();
            return null;
        }
    }

    @Override
    public void close() {
    }

    @Override
    public void nextTuple() {
        List<String> values = new ArrayList<>();
        // Opens (and closes) a fresh connection on every single tick.
        try (Connection connection = initializeDatabaseConnection()) {
            if (connection == null) {
                return; // connection failed; retry on the next tick
            }
            try (PreparedStatement statement = connection.prepareStatement(
                    "SELECT * FROM table1 ORDER BY col1 LIMIT 1 OFFSET ?")) {
                statement.setInt(1, counter++);
                try (ResultSet resultSet = statement.executeQuery()) {
                    if (!resultSet.next()) {
                        return; // no row at this offset yet
                    }
                    ResultSetMetaData metaData = resultSet.getMetaData();
                    for (int i = 1; i <= metaData.getColumnCount(); i++) {
                        values.add(resultSet.getString(i));
                    }
                }
            }
        } catch (SQLException e) {
            e.printStackTrace();
        }
        if (!values.isEmpty()) {
            collector.emit(new Values(values.toArray(new String[0])));
        }
    }
}


What is the standard way to approach connection pooling in spouts in Apache Storm? Furthermore, is it possible to somehow synchronize the counter variable across multiple running instances within the cluster topology?

Answer


Regarding connection pooling, you could pool connections via a static variable if you wanted, but since you aren't guaranteed to have all spout instances running in the same JVM, I don't think there's any point.
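To make the "static variable" idea concrete: a static field is shared per worker JVM, so every spout/bolt executor in the same worker would see one pool, while each worker builds its own. The class below is a hypothetical sketch (not Storm API) using a placeholder object where a real pool such as a HikariCP DataSource would go, so only the sharing pattern itself is shown:

```java
// Hypothetical per-JVM pool holder: all executors in the same worker JVM
// share one instance; separate worker JVMs each build their own.
class WorkerPoolHolder {
    // Placeholder standing in for a real connection pool (e.g. a DataSource).
    static final class Pool {
        private static int created = 0;            // pools built in this JVM
        Pool() { created++; }
        static int createdCount() { return created; }
    }

    // Initialization-on-demand holder idiom: the JVM guarantees Holder is
    // initialized exactly once, on first access, without explicit locking.
    private static class Holder {
        static final Pool INSTANCE = new Pool();
    }

    static Pool pool() {
        return Holder.INSTANCE;
    }
}

public class PoolSharingDemo {
    public static void main(String[] args) {
        // Two "spout instances" in the same JVM get the identical pool object.
        WorkerPoolHolder.Pool a = WorkerPoolHolder.pool();
        WorkerPoolHolder.Pool b = WorkerPoolHolder.pool();
        System.out.println(a == b);                             // true
        System.out.println(WorkerPoolHolder.Pool.createdCount()); // 1
    }
}
```

As the answer notes, this only helps within one worker; a topology scheduled across several workers still ends up with one pool per JVM.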


No, there is no way to synchronize the counter. The spout instances may be running on different JVMs, and you don't want them all blocking while they agree on the counter value. I don't think your spout implementation makes sense though. If you want to read just one row at a time, why not run a single spout instance instead of trying to synchronize multiple spouts?
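To see why independent counters break the "one new row per tick" scheme, here is a small plain-Java simulation (hypothetical, no Storm API involved): two spout instances, each with its own counter, bind overlapping values to the `OFFSET ?` parameter and therefore fetch every row twice:

```java
import java.util.ArrayList;
import java.util.List;

public class CounterDivergenceDemo {
    // Minimal stand-in for the spout: each instance keeps its own counter.
    static class FakeSpout {
        private int counter = 0;
        int nextOffset() { return counter++; } // what OFFSET ? would be bound to
    }

    public static void main(String[] args) {
        FakeSpout spoutA = new FakeSpout();
        FakeSpout spoutB = new FakeSpout();

        List<Integer> offsets = new ArrayList<>();
        // Simulate two ticks on each of two parallel spout instances.
        for (int tick = 0; tick < 2; tick++) {
            offsets.add(spoutA.nextOffset());
            offsets.add(spoutB.nextOffset());
        }
        // Both instances read rows 0 and 1: every row is emitted twice.
        System.out.println(offsets); // [0, 0, 1, 1]
    }
}
```

With a parallelism hint of 1 on the spout, there is only one counter and the duplication disappears, which is why a single instance is the simpler fix.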


You seem to be trying to use your relational database as a queue system, which is probably a bad fit. Consider e.g. Kafka instead. I think you should be able to use either one of https://www.confluent.io/product/connectors/ or http://debezium.io/ to stream data from your Postgres to Kafka.

