Flink JDBC sink

Question

I have created an application that reads data from Kinesis streams and sinks the data into a MySQL table.

I tried to load test the app. For 100k entries it takes more than 3 hours. Any suggestions as to why it is so slow? One more thing: the primary key of my table consists of 7 columns.
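
For scale, the figures in the question work out as follows. Because the run took more than 3 hours, the per-record time is a lower bound and the throughput is an upper bound:

```latex
t_{\text{record}} \ge \frac{3 \times 3600\ \text{s}}{100\,000} = 0.108\ \text{s} = 108\ \text{ms},
\qquad
\text{throughput} \le \frac{100\,000}{10\,800\ \text{s}} \approx 9.3\ \text{records/s}
```

A fixed cost of roughly a tenth of a second per record suggests overhead paid once per write (round trips and commits) rather than anything related to the total data volume.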

I am using Hibernate to store POJOs directly in the database.

Code:

import java.util.Properties;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.hibernate.Session;
import org.hibernate.SessionFactory;
import org.hibernate.Transaction;
import org.hibernate.boot.registry.StandardServiceRegistryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JDBCSink extends RichSinkFunction<CompetitorConfig> {
    private static final Logger LOG = LoggerFactory.getLogger(JDBCSink.class);

    // Hibernate objects are not serializable, so they are created per parallel
    // instance in open(). As static fields they were shared by every subtask in
    // the same JVM: each open() overwrote the factory and each close() closed it.
    private transient SessionFactory sessionFactory;
    private transient Session session;
    private final String username;
    private final String password;

    public JDBCSink(String user, String pass) {
        username = user;
        password = pass;
    }

    private static SessionFactory buildSessionFactory(String username, String password) {
        Properties prop = new Properties();
        prop.setProperty("hibernate.dialect", "org.hibernate.dialect.MySQLDialect");
        prop.setProperty("hibernate.connection.driver_class", "com.mysql.cj.jdbc.Driver");
        prop.setProperty("hibernate.connection.url", "url");
        prop.setProperty("hibernate.connection.username", username);
        prop.setProperty("hibernate.connection.password", password);
        org.hibernate.cfg.Configuration configuration =
                new org.hibernate.cfg.Configuration().addProperties(prop);
        configuration.addAnnotatedClass(CompetitorConfig.class);
        StandardServiceRegistryBuilder serviceRegistry =
                new StandardServiceRegistryBuilder().applySettings(configuration.getProperties());
        return configuration.buildSessionFactory(serviceRegistry.build());
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        sessionFactory = buildSessionFactory(username, password);
        session = sessionFactory.openSession();
    }

    @Override
    public void invoke(CompetitorConfig value, Context context) throws Exception {
        // One transaction (and at least one database round trip) per record.
        Transaction transaction = session.beginTransaction();
        try {
            session.merge(value);
            session.flush();
            transaction.commit();
        } catch (Exception e) {
            // Roll back instead of committing a failed transaction.
            LOG.error("Failed to write record {}", value, e);
            if (transaction.isActive()) {
                transaction.rollback();
            }
            throw e;
        }
    }

    @Override
    public void close() throws Exception {
        // close() may run even if open() failed part-way.
        if (session != null) {
            session.close();
        }
        if (sessionFactory != null) {
            sessionFactory.close();
        }
    }
}

Answer

This is slow because you are writing each record individually, each wrapped in its own transaction. A high-performance database sink will buffer records, write them in bulk, and commit transactions as part of checkpointing.
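
A minimal, stdlib-only sketch of the buffering idea follows. It assumes a hypothetical `BatchingBuffer` class whose `bulkWriter` callback stands in for one JDBC batch (`PreparedStatement.addBatch`/`executeBatch`) committed in a single transaction. In a real Flink sink, `add` would be called from `invoke` and `onCheckpoint` from `snapshotState` of a sink that also implements `CheckpointedFunction`. That wiring is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class BatchingDemo {
    /**
     * Hypothetical helper: buffers records and hands them to a bulk writer
     * when the buffer is full or when a checkpoint is taken.
     */
    static final class BatchingBuffer<T> {
        private final int maxBatchSize;
        private final Consumer<List<T>> bulkWriter; // in a real sink: one JDBC batch + one commit
        private final List<T> buffer = new ArrayList<>();

        BatchingBuffer(int maxBatchSize, Consumer<List<T>> bulkWriter) {
            if (maxBatchSize <= 0) {
                throw new IllegalArgumentException("maxBatchSize must be positive");
            }
            this.maxBatchSize = maxBatchSize;
            this.bulkWriter = bulkWriter;
        }

        /** Per-record path (what invoke() would call): no database work until the batch is full. */
        void add(T record) {
            buffer.add(record);
            if (buffer.size() >= maxBatchSize) {
                flush();
            }
        }

        /** Checkpoint path (what snapshotState() would call): write out everything still buffered. */
        void onCheckpoint() {
            flush();
        }

        private void flush() {
            if (buffer.isEmpty()) {
                return; // nothing to write, so no empty batch is emitted
            }
            bulkWriter.accept(new ArrayList<>(buffer)); // pass a copy, then reuse the buffer
            buffer.clear();
        }
    }

    public static void main(String[] args) {
        List<List<Integer>> batches = new ArrayList<>();
        BatchingBuffer<Integer> buffer = new BatchingBuffer<>(3, batches::add);
        for (int i = 1; i <= 7; i++) {
            buffer.add(i);
        }
        buffer.onCheckpoint();
        System.out.println(batches); // [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

Seven records cost three bulk writes instead of seven transactions. Flushing on checkpoint means every record covered by a completed checkpoint has reached the database. However, records written after the last checkpoint may be written again after a restart. That is at-least-once delivery, which is why it pairs naturally with upsert semantics. A production sink would usually also flush on a timer so that a slow stream does not leave records sitting in the buffer.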

If you need exactly-once guarantees and can be satisfied with upsert semantics, you can use Flink's existing JDBC sink. It delivers at least once, but when every write is an upsert, replayed records leave the table in the same final state. If you require two-phase commit, that has already been merged to master and will be included in Flink 1.13. See FLINK-15578.
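
To see why upsert semantics are enough, here is a toy simulation in plain Java (no database) of a recovery in which the sink receives already-written records a second time. The table is modeled as a map from primary key to value. The single key stands in for the question's 7-column composite key, and `ReplayDemo`, `insertAll`, and `upsertAll` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ReplayDemo {
    /** Plain INSERT: a second write for an existing key fails, like a duplicate-key error. */
    static Map<String, String> insertAll(List<String[]> writes) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] write : writes) {
            if (table.containsKey(write[0])) {
                throw new IllegalStateException("Duplicate entry for key " + write[0]);
            }
            table.put(write[0], write[1]);
        }
        return table;
    }

    /** Upsert: insert the row or overwrite the existing one, so replaying a write is harmless. */
    static Map<String, String> upsertAll(List<String[]> writes) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] write : writes) {
            table.put(write[0], write[1]);
        }
        return table;
    }

    public static void main(String[] args) {
        // Each write is {primaryKey, value}.
        List<String[]> firstAttempt = List.of(new String[] {"k1", "a"}, new String[] {"k2", "b"});
        // Simplified recovery: the same records are delivered again.
        List<String[]> withReplay = new ArrayList<>(firstAttempt);
        withReplay.addAll(firstAttempt);

        System.out.println(upsertAll(withReplay));                                 // {k1=a, k2=b}
        System.out.println(upsertAll(withReplay).equals(upsertAll(firstAttempt))); // true
        try {
            insertAll(withReplay);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Duplicate entry for key k1
        }
    }
}
```

The upserted table ends up identical with or without the replay, so at-least-once delivery combined with idempotent writes yields an exactly-once end result. This only holds if replayed records carry the same values as the first attempt. While the replay is in progress, readers can also briefly see rows rewound to older values, which is what a two-phase-commit sink avoids.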

Update:

There's no standard SQL syntax for an upsert; you'll need to figure out if and how your database supports this. For example:

MySQL:

INSERT ... ON DUPLICATE KEY UPDATE ...

PostgreSQL:

INSERT ... ON CONFLICT ... DO UPDATE SET ...
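
As a concrete illustration of the two dialects, the sketch below builds both upsert statements for a table with a composite primary key. The helper `UpsertSql` and the table and column names are hypothetical, and identifiers are concatenated unquoted, so they must come from trusted code, not user input. The MySQL form uses `VALUES(col)`, which MySQL 8.0.20+ deprecates in favor of a row alias but still accepts. The resulting string is the kind of parameterized SQL you would hand to a JDBC sink.

```java
import java.util.List;
import java.util.stream.Collectors;

public class UpsertSql {
    /** MySQL: the conflict target is implied by the table's PRIMARY KEY / UNIQUE indexes. */
    static String mysql(String table, List<String> columns, List<String> keyColumns) {
        String updates = nonKeyColumns(columns, keyColumns).stream()
                .map(c -> c + " = VALUES(" + c + ")")
                .collect(Collectors.joining(", "));
        return insertPrefix(table, columns) + " ON DUPLICATE KEY UPDATE " + updates;
    }

    /** PostgreSQL: the conflict target (the key columns) must be named explicitly. */
    static String postgres(String table, List<String> columns, List<String> keyColumns) {
        String updates = nonKeyColumns(columns, keyColumns).stream()
                .map(c -> c + " = EXCLUDED." + c)
                .collect(Collectors.joining(", "));
        return insertPrefix(table, columns)
                + " ON CONFLICT (" + String.join(", ", keyColumns) + ") DO UPDATE SET " + updates;
    }

    /** "INSERT INTO t (a, b) VALUES (?, ?)" with one JDBC placeholder per column. */
    private static String insertPrefix(String table, List<String> columns) {
        String placeholders = columns.stream().map(c -> "?").collect(Collectors.joining(", "));
        return "INSERT INTO " + table + " (" + String.join(", ", columns) + ") VALUES (" + placeholders + ")";
    }

    /** Columns to overwrite on conflict; key columns never change. */
    private static List<String> nonKeyColumns(List<String> columns, List<String> keyColumns) {
        List<String> rest = columns.stream()
                .filter(c -> !keyColumns.contains(c))
                .collect(Collectors.toList());
        if (rest.isEmpty()) {
            throw new IllegalArgumentException("need at least one non-key column to update");
        }
        return rest;
    }

    public static void main(String[] args) {
        // Hypothetical names; the question's real table has a 7-column primary key.
        List<String> columns = List.of("k1", "k2", "price");
        List<String> keys = List.of("k1", "k2");
        System.out.println(mysql("competitor_config", columns, keys));
        System.out.println(postgres("competitor_config", columns, keys));
    }
}
```

If every column is part of the key there is nothing to update. In that case `INSERT IGNORE` (MySQL) or `ON CONFLICT ... DO NOTHING` (PostgreSQL) would be the idempotent choice, which is why the helper rejects that case rather than emitting invalid SQL.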

For what it's worth, applications like this are generally easier to implement using Flink SQL. In that case you would use the Kinesis table connector together with the JDBC table connector.
