Google Dataflow (Apache Beam) JdbcIO bulk insert into MySQL database

Problem description

I'm using the Dataflow SDK 2.x Java API (Apache Beam SDK) to write data into MySQL. I've created pipelines based on the Apache Beam SDK documentation to write data into MySQL using Dataflow. The pipeline inserts a single row at a time, whereas I need to implement bulk insert. I can't find any option in the official documentation to enable a bulk insert mode.

I'm wondering whether it's possible to set a bulk insert mode in a Dataflow pipeline. If so, please let me know what I need to change in the code below.

 .apply(JdbcIO.<KV<Integer, String>>write()
      .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
            "com.mysql.jdbc.Driver", "jdbc:mysql://hostname:3306/mydb")
          .withUsername("username")
          .withPassword("password"))
      .withStatement("insert into Person values(?, ?)")
      .withPreparedStatementSetter(new JdbcIO.PreparedStatementSetter<KV<Integer, String>>() {
        @Override
        public void setParameters(KV<Integer, String> element, PreparedStatement query)
            throws Exception {
          // Bind the element's key and value to the two placeholders
          query.setInt(1, element.getKey());
          query.setString(2, element.getValue());
        }
      }));

Recommended answer

Edit, January 27, 2018:

It turns out that this issue is related to the DirectRunner. If you run the same pipeline on the DataflowRunner, you should get batches of up to 1,000 records. The DirectRunner always creates bundles of size 1 after a grouping operation, and since JdbcIO executes its batch at the end of each bundle, every batch then contains only one row.
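
To illustrate why the runner matters, here is a simplified, dependency-free Java model of this batching. It assumes that JdbcIO.write() adds each element to a JDBC batch, executes the batch once it holds the maximum batch size (1,000 by default), and also executes whatever is left when the bundle finishes. The class BundleBatchModel and its methods are hypothetical; this is a sketch of the behavior, not Beam's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not Beam source code) of per-bundle JDBC batching:
// rows are buffered within a bundle, and the batch is executed when it is
// full or when the bundle ends. Batches never span bundles.
final class BundleBatchModel {

    /** Returns the size of every executed batch, given the size of each bundle. */
    static List<Integer> batchSizes(List<Integer> bundleSizes, int maxBatchSize) {
        List<Integer> batches = new ArrayList<>();
        for (int bundleSize : bundleSizes) {
            int buffered = 0; // rows added to the batch since the last execution
            for (int i = 0; i < bundleSize; i++) {
                buffered++;
                if (buffered >= maxBatchSize) { // batch is full: execute it
                    batches.add(buffered);
                    buffered = 0;
                }
            }
            if (buffered > 0) { // bundle finished: execute the remainder
                batches.add(buffered);
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        // Bundles of size 1, as reported for the DirectRunner: one row per batch
        System.out.println(batchSizes(List.of(1, 1, 1), 1000)); // [1, 1, 1]
        // One large bundle: batches are capped at 1,000 rows
        System.out.println(batchSizes(List.of(2500), 1000)); // [1000, 1000, 500]
    }
}
```

Under this model a batch can never be larger than the bundle it comes from, which is why bundles of size 1 produce single-row inserts regardless of the configured batch size.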

Original answer:

I've run into the same problem when writing to cloud databases using Apache Beam's JdbcIO. The problem is that although JdbcIO does support writing up to 1,000 records in one batch, I have never actually seen it write more than 1 row at a time (I have to admit that this was always with the DirectRunner in a development environment).

I have therefore added a feature to JdbcIO that lets you control the size of the batches yourself by grouping your data together and writing each group as one batch. Below is an example of how to use this feature, based on Apache Beam's original WordCount example.

p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
    // Count words in input file(s)
    .apply(new CountWords())
    // Format as text
    .apply(MapElements.via(new FormatAsTextFn()))
    // Make key-value pairs with the first letter as the key
    .apply(ParDo.of(new FirstLetterAsKey()))
    // Group the words by first letter
    .apply(GroupByKey.<String, String>create())
    // Get a PCollection of only the values, discarding the keys
    .apply(ParDo.of(new GetValues()))
    // Write each group of words to the database as one batch
    // (writeIterable() is only available in the modified JdbcIO linked below)
    .apply(JdbcIO.<String>writeIterable()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
            .withStatement(INSERT_OR_UPDATE_SQL)
            .withPreparedStatementSetter(new WordCountPreparedStatementSetter()));

The difference from JdbcIO's normal write method is the new writeIterable() method, which takes a PCollection<Iterable<RowT>> as input instead of a PCollection<RowT>. Each Iterable is written to the database as one batch.
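
The effect of grouping on batch size can be sketched the same way. The dependency-free Java model below assumes that FirstLetterAsKey keys each formatted line by its first character, that GroupByKey plus GetValues yield one Iterable per key, and that writeIterable() writes each Iterable as exactly one batch. IterableBatchModel is a hypothetical name, and the TreeMap only makes the output order deterministic; a real GroupByKey makes no ordering guarantee.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of the example pipeline: lines are grouped by first
// letter (FirstLetterAsKey + GroupByKey + GetValues), and each group is
// assumed to be written by writeIterable() as a single batch.
final class IterableBatchModel {

    /** Groups lines by their first character, mimicking FirstLetterAsKey + GroupByKey. */
    static Map<Character, List<String>> groupByFirstLetter(List<String> lines) {
        Map<Character, List<String>> groups = new TreeMap<>(); // sorted for stable output
        for (String line : lines) {
            if (line.isEmpty()) {
                continue; // no first letter to key on
            }
            groups.computeIfAbsent(line.charAt(0), k -> new ArrayList<>()).add(line);
        }
        return groups;
    }

    /** Size of each batch, assuming every group is written as exactly one batch. */
    static List<Integer> batchSizes(List<String> lines) {
        List<Integer> sizes = new ArrayList<>();
        for (List<String> group : groupByFirstLetter(lines).values()) {
            sizes.add(group.size());
        }
        return sizes;
    }

    public static void main(String[] args) {
        // Lines in the "word: count" form produced by WordCount's FormatAsTextFn
        List<String> lines = List.of("apple: 3", "avocado: 1", "banana: 2",
                "cherry: 5", "cranberry: 1", "coconut: 4");
        System.out.println(batchSizes(lines)); // [2, 1, 3]
    }
}
```

With this approach the batch size is driven by the data: each batch is as large as the group for its key, so the choice of key determines how many rows go into each insert.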

The version of JdbcIO with this addition can be found here: https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java/org/apache/beam/sdk/io/jdbc/JdbcIO.java

The entire example project containing the example above can be found here: https://github.com/olavloite/spanner-beam-example

(There is also a pending pull request to include this in Apache Beam.)
