Flink - Why should I create my own RichSinkFunction instead of just opening and closing my PostgreSQL connection?

Question

I would like to know why I really need to create my own RichSinkFunction, or use JDBCOutputFormat, to connect to the database instead of just creating my connection, performing the query, and closing the connection with the traditional PostgreSQL driver inside my SinkFunction.

I found many articles telling me to do that, but none of them explains why. What is the difference?

Code example using JDBCOutputFormat:

JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
     .setDrivername("org.postgresql.Driver")
     .setDBUrl("jdbc:postgresql://localhost:1234/test?user=xxx&password=xxx")
     .setQuery(query)
     .setSqlTypes(new int[] { Types.VARCHAR, Types.VARCHAR, Types.VARCHAR }) //set the types
     .finish();

Code example implementing my own RichSinkFunction:

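import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class RichCaseSink extends RichSinkFunction<Case> {

  private static final String UPSERT_CASE = "INSERT INTO public.cases (caseid, tracehash) "
      + "VALUES (?, ?) "
      + "ON CONFLICT (caseid) DO UPDATE SET "
      + "  tracehash=?";

  // Kept as fields so they live as long as this sink instance (transient: not serialized with the function)
  private transient Connection connection;
  private transient PreparedStatement statement;

  @Override
  public void open(Configuration parameters) throws Exception {
    // Called once per parallel sink instance, before the first invoke()
    Class.forName("org.postgresql.Driver");
    connection =
        DriverManager.getConnection("jdbc:postgresql://localhost:5432/casedb?user=signavio&password=signavio");
    statement = connection.prepareStatement(UPSERT_CASE);
  }

  @Override
  public void invoke(Case aCase, Context context) throws Exception {
    // Called once per record; reuses the statement prepared in open()
    statement.setString(1, aCase.getId());
    statement.setString(2, aCase.getTraceHash());
    statement.setString(3, aCase.getTraceHash());
    statement.executeUpdate();
  }

  @Override
  public void close() throws Exception {
    // Called once when the instance shuts down: release the JDBC resources opened in open()
    if (statement != null) {
      statement.close();
    }
    if (connection != null) {
      connection.close();
    }
    super.close();
  }
}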
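Even with the connection opened once in open(), the sink above still sends one statement, and therefore one database round trip, per record. A common refinement is to buffer rows and send them in groups (the idea behind JDBC's addBatch()/executeBatch()). The following is a minimal stdlib-only sketch of that buffering logic; `BatchingWriter` and its `flushAction` (a stand-in for `PreparedStatement.executeBatch()`) are hypothetical names, not Flink or JDBC APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: buffers records and hands them to flushAction
// (standing in for PreparedStatement.executeBatch()) once batchSize is reached.
public class BatchingWriter<T> implements AutoCloseable {
    private final int batchSize;
    private final Consumer<List<T>> flushAction;
    private final List<T> buffer = new ArrayList<>();

    public BatchingWriter(int batchSize, Consumer<List<T>> flushAction) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be > 0");
        }
        this.batchSize = batchSize;
        this.flushAction = flushAction;
    }

    // Would be called from invoke(): add one record, flush when the batch is full.
    public void write(T record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Sends everything currently buffered as one round trip.
    public void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        flushAction.accept(new ArrayList<>(buffer)); // pass a copy so the receiver may keep it
        buffer.clear();
    }

    // Would be called from close(): do not lose the tail of the stream.
    @Override
    public void close() {
        flush();
    }

    public static void main(String[] args) {
        List<Integer> batchSizes = new ArrayList<>();
        try (BatchingWriter<String> writer =
                 new BatchingWriter<>(4, batch -> batchSizes.add(batch.size()))) {
            for (int i = 0; i < 10; i++) {
                writer.write("record-" + i);
            }
        }
        System.out.println(batchSizes); // prints [4, 4, 2]
    }
}
```

In a real Flink sink, buffered rows would also need to be flushed when a checkpoint is taken (for example by implementing `CheckpointedFunction` and flushing in `snapshotState()`); otherwise rows still sitting in the buffer can be lost if the job fails.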

Why can't I just use the PostgreSQL driver directly?

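import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class Storable implements SinkFunction<Activity> {

    // Same upsert as in RichCaseSink
    private static final String UPSERT_CASE = "INSERT INTO public.cases (caseid, tracehash) "
        + "VALUES (?, ?) ON CONFLICT (caseid) DO UPDATE SET tracehash=?";

    @Override
    public void invoke(Activity activity, Context context) throws Exception {
        // Runs for EVERY record: a new connection and a new statement each time
        Class.forName("org.postgresql.Driver");
        try (Connection connection =
                 DriverManager.getConnection("jdbc:postgresql://localhost:5432/casedb?user=signavio&password=signavio");
             PreparedStatement statement = connection.prepareStatement(UPSERT_CASE)) {

            // Perform the query

        } // statement and connection are closed here, after every single record
    }
}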

Does anyone know the technical answer, i.e., what the best practice in Flink is? Does implementing a RichSinkFunction or using JDBCOutputFormat do something special?

Thanks.

Answer

Well, you can use your own SinkFunction that simply opens the connection and writes the data in its invoke() method, and it will generally work. But its performance will be very poor in most cases.

The actual difference between the first and the second example is that in the RichSinkFunction you use the open() method to open the connection and prepare the statement. open() is invoked only once per parallel instance of the sink, when the function is initialized, and its counterpart close() is invoked once when that instance shuts down, which is where the connection should be released. In the second example you open the connection to the database and prepare the statement inside invoke(), which is called for every element of the input DataStream. In effect, you open a new connection for every element in the stream.
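To make the difference concrete, here is a stdlib-only simulation of the two lifecycles. `SimpleSink`, `NaiveSink`, `RichLikeSink`, and `run` are hypothetical stand-ins for Flink's runtime and for the two sinks in the question (with simplified signatures, e.g. no `Configuration` argument to `open()`), not Flink APIs, and a "connection" is just a counter increment. Each simulated parallel instance goes through open, then invoke for each of its records, then close.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Simulation only: these types mimic the shape of Flink's sink lifecycle, not its API.
public class SinkLifecycleDemo {

    interface SimpleSink<T> {
        default void open() {}   // like RichSinkFunction.open(): once per instance
        void invoke(T value);    // like SinkFunction.invoke(): once per record
        default void close() {}  // like RichSinkFunction.close(): once per instance
    }

    // Counts simulated DriverManager.getConnection() calls.
    static int connectionsOpened = 0;

    static void openConnection() {
        connectionsOpened++;
    }

    // Pattern from Storable: connect inside invoke(), i.e. once per record.
    static class NaiveSink implements SimpleSink<String> {
        public void invoke(String value) {
            openConnection(); // then run the query and close the connection again
        }
    }

    // Pattern from RichCaseSink: connect once in open(), reuse it in invoke().
    static class RichLikeSink implements SimpleSink<String> {
        private boolean connected = false;

        public void open() {
            openConnection();
            connected = true;
        }

        public void invoke(String value) {
            if (!connected) {
                throw new IllegalStateException("open() was not called");
            }
            // reuse the existing connection here
        }

        public void close() {
            connected = false;
        }
    }

    // Runs `parallelism` sink instances; records are spread round-robin across them.
    // Returns how many connections were opened in total.
    static <T> int run(Supplier<SimpleSink<T>> factory, List<T> records, int parallelism) {
        connectionsOpened = 0;
        List<SimpleSink<T>> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            SimpleSink<T> sink = factory.get();
            sink.open();
            instances.add(sink);
        }
        for (int i = 0; i < records.size(); i++) {
            instances.get(i % parallelism).invoke(records.get(i));
        }
        for (SimpleSink<T> sink : instances) {
            sink.close();
        }
        return connectionsOpened;
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            records.add("case-" + i);
        }
        System.out.println("naive: " + run(NaiveSink::new, records, 4));         // prints naive: 1000
        System.out.println("rich-like: " + run(RichLikeSink::new, records, 4));  // prints rich-like: 4
    }
}
```

The number of connections in the per-record design grows with the size of the stream, while in the open()-based design it is fixed by the parallelism of the sink.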

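Creating a database connection is expensive: each new connection needs a network round trip, authentication, and, on the PostgreSQL side, a new server process. Doing that for every record will severely hurt throughput.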
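As a rough back-of-envelope model (an illustration, not a measurement), let $N$ be the number of records, $P$ the parallelism of the sink, $c$ the cost of opening and closing one connection, and $q$ the cost of executing one statement on an already open connection. Summing the work over all parallel instances:

$$
T_{\text{per-record}} = N\,(c + q), \qquad T_{\text{open()}} = P\,c + N\,q
$$

Because $P$ is usually tiny compared with $N$, the open()-based sink pays the connection cost only a handful of times. When $P\,c \ll N\,q$, the ratio $T_{\text{per-record}} / T_{\text{open()}}$ approaches $(c + q)/q = 1 + c/q$, so the more expensive a connection is relative to a single statement, the larger the slowdown of the per-record design.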

这篇关于Flink-为什么我应该创建自己的RichSinkFunction而不是仅打开和关闭我的PostgreSql连接?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆