Flink JDBC sink

Question

I have created an application that reads data from Kinesis streams and sinks it into a MySQL table.

I tried to load test the app. For 100k entries it takes more than 3 hours. Any suggestions as to why it is so slow? Also, the primary key of my table consists of 7 columns.

I am using Hibernate to store POJOs directly in the database.

Code:

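import java.util.Properties;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.Transaction;
import org.hibernate.boot.registry.StandardServiceRegistryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JDBCSink extends RichSinkFunction<CompetitorConfig> {
    // Logger must reference this class (the original pointed at a non-existent class).
    private static final Logger LOG = LoggerFactory.getLogger(JDBCSink.class);

    private final String username;
    private final String password;

    // Created in open() on each parallel sink instance rather than shared through
    // static fields, and transient so they are not part of the serialized function.
    private transient SessionFactory sessionFactory;
    private transient Session session;

    public JDBCSink(String user, String pass) {
        this.username = user;
        this.password = pass;
    }

    private static SessionFactory buildSessionFactory(String username, String password) {
        Properties prop = new Properties();
        prop.setProperty("hibernate.dialect", "org.hibernate.dialect.MySQLDialect");
        prop.setProperty("hibernate.connection.driver_class", "com.mysql.cj.jdbc.Driver");
        prop.setProperty("hibernate.connection.url", "url");
        prop.setProperty("hibernate.connection.username", username);
        prop.setProperty("hibernate.connection.password", password);
        org.hibernate.cfg.Configuration configuration =
                new org.hibernate.cfg.Configuration().addProperties(prop);
        configuration.addAnnotatedClass(CompetitorConfig.class);
        StandardServiceRegistryBuilder registry =
                new StandardServiceRegistryBuilder().applySettings(configuration.getProperties());
        return configuration.buildSessionFactory(registry.build());
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        sessionFactory = buildSessionFactory(username, password);
        session = sessionFactory.openSession();
    }

    @Override
    public void invoke(CompetitorConfig value, Context context) throws Exception {
        // One transaction (and one commit round trip) per record: this is what makes it slow.
        Transaction transaction = session.beginTransaction();
        try {
            session.merge(value);
            session.flush();
            transaction.commit();
            // Detach merged entities so the persistence context does not keep growing;
            // otherwise every flush dirty-checks all previously merged entities.
            session.clear();
        } catch (Exception e) {
            LOG.error("Failed to write record", e);
            // Roll back on failure instead of committing in a finally block.
            if (transaction.isActive()) {
                transaction.rollback();
            }
            throw e;
        }
    }

    @Override
    public void close() throws Exception {
        if (session != null) {
            session.close();
        }
        if (sessionFactory != null) {
            sessionFactory.close();
        }
    }
}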

Answer

This is slow because you are writing each record individually, each wrapped in its own transaction. A high-performance database sink buffers records, writes them in bulk, and commits transactions as part of checkpointing.
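To illustrate the buffering described above, here is a minimal, framework-free sketch. `BufferedBatchWriter` and its `bulkWriter` callback are hypothetical names, not a Flink API. In a real sink, `add` would be called from `invoke`, the callback would issue one JDBC batch (`PreparedStatement.addBatch` / `executeBatch`) followed by a single commit, and `flush` would also be called from `CheckpointedFunction.snapshotState` so that nothing buffered is lost at a checkpoint. Flink's own JDBC sink already does this kind of batching, configurable through `JdbcExecutionOptions` (batch size, batch interval, retries).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical helper: collects records and hands them to a bulk writer either
 * when the batch is full or when flush() is called (e.g. from a checkpoint hook).
 */
class BufferedBatchWriter<T> {
    private final int batchSize;
    // Assumed to perform one bulk write plus one commit for the whole list.
    private final Consumer<List<T>> bulkWriter;
    private final List<T> buffer = new ArrayList<>();

    BufferedBatchWriter(int batchSize, Consumer<List<T>> bulkWriter) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.bulkWriter = bulkWriter;
    }

    /** Called once per record, the way SinkFunction.invoke is. */
    void add(T record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Writes everything buffered so far; a real sink would also call this on checkpoint. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        bulkWriter.accept(new ArrayList<>(buffer)); // hand over a copy, then reuse the buffer
        buffer.clear();
    }

    /** Number of records waiting for the next flush. */
    int pending() {
        return buffer.size();
    }
}
```

With a batch size of 1000, the 100k records from the load test become 100 bulk writes instead of 100,000 separate transactions.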

If you need exactly-once guarantees and can be satisfied with upsert semantics, you can use Flink's existing JDBC sink: because an upsert is idempotent, replaying records after a failure leaves the table in the same state. If you require two-phase commit, that has already been merged to master and will be included in Flink 1.13. See FLINK-15578.
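Two-phase commit in Flink sinks generally follows the pattern implemented by `TwoPhaseCommitSinkFunction`: records go into an open transaction, that transaction is pre-committed when a checkpoint barrier arrives, and it is committed only once the checkpoint has completed. The in-memory model below is a simplified sketch of that protocol, not the JDBC connector's implementation. The class is hypothetical; only the hook names `snapshotState` and `notifyCheckpointComplete` mirror Flink's `CheckpointedFunction` and `CheckpointListener`. It also leaves out recovery, where a real sink would re-commit pre-committed transactions after a restart.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical in-memory model of checkpoint-driven two-phase commit.
 * "Committing" just moves records into a list that stands in for the database.
 */
class TwoPhaseCommitModel<T> {
    private List<T> currentTxn = new ArrayList<>();                       // open transaction
    private final TreeMap<Long, List<T>> preCommitted = new TreeMap<>();  // checkpointId -> txn
    private final List<T> committed = new ArrayList<>();                  // visible to readers

    /** Per-record call: the record only joins the open transaction. */
    void invoke(T record) {
        currentTxn.add(record);
    }

    /** Checkpoint barrier: pre-commit the open transaction and start a new one. */
    void snapshotState(long checkpointId) {
        preCommitted.put(checkpointId, currentTxn);
        currentTxn = new ArrayList<>();
    }

    /** Checkpoint completed everywhere: commit every pre-committed transaction up to this id. */
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<T>> done = preCommitted.headMap(checkpointId, true);
        done.values().forEach(committed::addAll);
        done.clear(); // clearing the view removes these entries from preCommitted
    }

    /** Failure before a checkpoint: work in the open transaction is discarded, never visible. */
    void abortOpenTransaction() {
        currentTxn = new ArrayList<>();
    }

    List<T> committed() {
        return committed;
    }
}
```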

Update:

There's no upsert syntax that works across databases (the standard MERGE statement is not supported by MySQL, for instance); you'll need to figure out whether and how your database supports this. For example:

MySQL:

INSERT ... ON DUPLICATE KEY UPDATE ...

PostgreSQL:

INSERT ... ON CONFLICT ... DO UPDATE SET ...
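Because the syntax differs per database, a sink that has to target both usually builds the statement per dialect. The hypothetical helper below sketches that for the two dialects above, producing parameterized statements; `keyCols` would be the primary-key columns (seven of them in the question's table) and `valueCols` the remaining columns. Two caveats: `VALUES(col)` inside `ON DUPLICATE KEY UPDATE` is deprecated as of MySQL 8.0.20 in favour of a row alias, though still accepted, and `ON CONFLICT (...)` requires a unique index or constraint matching the listed columns.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper: builds parameterized upsert statements for MySQL and PostgreSQL. */
final class UpsertSql {
    private UpsertSql() {}

    /** MySQL: INSERT ... ON DUPLICATE KEY UPDATE col = VALUES(col), ... */
    static String mysql(String table, List<String> keyCols, List<String> valueCols) {
        String updates = valueCols.stream()
                .map(c -> c + " = VALUES(" + c + ")")
                .collect(Collectors.joining(", "));
        return insertPrefix(table, keyCols, valueCols) + " ON DUPLICATE KEY UPDATE " + updates;
    }

    /** PostgreSQL: INSERT ... ON CONFLICT (key columns) DO UPDATE SET col = EXCLUDED.col, ... */
    static String postgres(String table, List<String> keyCols, List<String> valueCols) {
        String updates = valueCols.stream()
                .map(c -> c + " = EXCLUDED." + c)
                .collect(Collectors.joining(", "));
        return insertPrefix(table, keyCols, valueCols)
                + " ON CONFLICT (" + String.join(", ", keyCols) + ") DO UPDATE SET " + updates;
    }

    /** Shared "INSERT INTO t (keys..., values...) VALUES (?, ...)" part; key columns come first. */
    private static String insertPrefix(String table, List<String> keyCols, List<String> valueCols) {
        if (keyCols.isEmpty() || valueCols.isEmpty()) {
            throw new IllegalArgumentException("need at least one key column and one value column");
        }
        List<String> all = new ArrayList<>(keyCols);
        all.addAll(valueCols);
        String placeholders = String.join(", ", Collections.nCopies(all.size(), "?"));
        return "INSERT INTO " + table + " (" + String.join(", ", all) + ") VALUES (" + placeholders + ")";
    }
}
```

For example, `UpsertSql.mysql("t", Arrays.asList("k1", "k2"), Arrays.asList("v"))` yields `INSERT INTO t (k1, k2, v) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE v = VALUES(v)`. A string like this is what you would pass as the `sql` argument of Flink's `JdbcSink.sink(...)`, filling the placeholders in the `JdbcStatementBuilder` in the same column order (keys first).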

For what it's worth, applications like this are generally easier to implement using Flink SQL. In that case you would use the Kinesis table connector together with the JDBC table connector.
