Apache Storm - Accessing database from SPOUT - connection pooling


Question

I have a spout which on each tick goes to a Postgres database and reads one additional row. The spout code looks as follows:

import java.io.Serializable;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

class RawDataLevelSpout extends BaseRichSpout implements Serializable {

    // Offset of the next row to read; local to this spout instance.
    private int counter;
    private SpoutOutputCollector collector;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("col1", "col2"));
    }

    @Override
    public void open(Map map, TopologyContext context, SpoutOutputCollector spoutOutputCollector) {
        collector = spoutOutputCollector;
    }

    private Connection initializeDatabaseConnection() {
        try {
            Class.forName("org.postgresql.Driver");
            // DATABASE_URI is defined elsewhere.
            return DriverManager.getConnection(DATABASE_URI, "root", "root");
        } catch (ClassNotFoundException | SQLException e) {
            e.printStackTrace();
        }
        return null;
    }

    @Override
    public void close() {
    }

    @Override
    public void nextTuple() {
        List<String> values = new ArrayList<>();
        try {
            // A new connection is opened (and closed) on every tick.
            Connection connection = initializeDatabaseConnection();
            PreparedStatement statement = connection.prepareStatement(
                    "SELECT * FROM table1 ORDER BY col1 LIMIT 1 OFFSET ?");
            statement.setInt(1, counter++);
            ResultSet resultSet = statement.executeQuery();
            resultSet.next();
            ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
            int totalColumns = resultSetMetaData.getColumnCount();
            for (int i = 1; i <= totalColumns; i++) {
                values.add(resultSet.getString(i));
            }
            connection.close();
        } catch (SQLException e) {
            e.printStackTrace();
        }
        collector.emit(new Values(values.stream().toArray(String[]::new)));
    }
}

What is the standard way to approach connection pooling in spouts in Apache Storm? Furthermore, is it possible to somehow synchronize the counter variable across multiple running instances within the cluster topology?

Answer

Regarding connection pooling, you could pool connections via a static variable if you wanted, but since you aren't guaranteed to have all spout instances running in the same JVM, I don't think there's any point.
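To make the scope of that static-pool idea concrete, here is a minimal, hypothetical sketch that pools generic resources with a blocking queue; the name `SimplePool` is invented for illustration, and real JDBC connections would take the place of the factory-created objects. Because the pool is per-JVM, spouts in different workers would each see their own pool:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

// Hypothetical process-local resource pool. In a topology, a static
// instance would be shared only by spout executors co-located in the
// same worker JVM -- which is why pooling buys little here.
class SimplePool<T> {
    private final BlockingQueue<T> idle;

    SimplePool(int size, Supplier<T> factory) {
        idle = new ArrayBlockingQueue<>(size);
        for (int i = 0; i < size; i++) {
            idle.add(factory.get());
        }
    }

    // Blocks until a pooled resource is free.
    T borrow() throws InterruptedException {
        return idle.take();
    }

    // Returns a resource to the pool for reuse.
    void release(T resource) {
        idle.offer(resource);
    }

    int available() {
        return idle.size();
    }
}
```

In practice, opening the connection once in `open()` and closing it in `close()` achieves the same reuse for a single spout instance without any pool at all.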

No, there is no way to synchronize the counter. The spout instances may be running on different JVMs, and you don't want them all blocking while the spouts agree on the counter value. I don't think your spout implementation makes sense though. If you want to read only one row at a time, why not run a single spout instance instead of trying to synchronize multiple spouts?
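If a single reader is what you want, the straightforward fix is to pin the spout to one executor when wiring the topology; a sketch against the standard Storm `TopologyBuilder` API (the component id is illustrative):

```java
TopologyBuilder builder = new TopologyBuilder();
// Parallelism hint of 1: exactly one executor runs this spout,
// so the counter never needs cross-JVM synchronization.
builder.setSpout("raw-data-spout", new RawDataLevelSpout(), 1);
```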

You seem to be trying to use your relational database as a queue system, which is probably a bad fit. Consider e.g. Kafka instead. I think you should be able to use either one of https://www.confluent.io/product/connectors/ or http://debezium.io/ to stream data from your Postgres instance into Kafka.
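For a rough idea of what the Debezium route involves, a connector is registered with Kafka Connect via a JSON config along these lines; hostnames, credentials, and names below are placeholders, and the exact property set depends on the Debezium version:

```json
{
  "name": "postgres-source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "localhost",
    "database.port": "5432",
    "database.user": "root",
    "database.password": "root",
    "database.dbname": "mydb",
    "database.server.name": "pg"
  }
}
```

Each committed row change then arrives as a Kafka message, which a Kafka spout can consume without any offset bookkeeping in your own code.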
