Kafka JDBC Sink连接器,批量插入值 [英] Kafka JDBC Sink Connector, insert values in batches
问题描述
我每秒收到许多消息(通过http-protocol)(50000-100000),并希望将它们保存到PostgreSql.我决定为此目的使用Kafka JDBC Sink.
I receive a lot of the messages (by http-protocol) per second (50000 - 100000) and want to save them to PostgreSql. I decided to use Kafka JDBC Sink for this purpose.
消息是通过一条记录而不是成批保存到数据库的.我想以500-1000条记录的大小批量在PostgreSQL中插入记录.
The messages are saved to database by one record, not in batches. I want to insert records in PostgreSQL in batches with size 500-1000 records.
我发现了有关此问题的一些答案:如何使用批处理.大小?
I found some answers on this problem in issue: How to use batch.size?
我尝试在配置中使用相关选项,但似乎它们没有任何作用.
I tried to use related options in configuration, but it seems that they no have any effect.
我的Kafka JDBC Sink PostgreSql配置(etc/kafka-connect-jdbc/postgres.properties
):
My Kafka JDBC Sink PostgreSql configuration (etc/kafka-connect-jdbc/postgres.properties
):
name=test-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=3
# The topics to consume from - required for sink connectors like this one
topics=jsonb_pkgs
connection.url=jdbc:postgresql://localhost:5432/test?currentSchema=test
auto.create=false
auto.evolve=false
insert.mode=insert
connection.user=postgres
table.name.format=${topic}
connection.password=pwd
batch.size=500
# based on 500*3000byte message size
fetch.min.bytes=1500000
fetch.wait.max.ms=1500
max.poll.records=4000
我还为connect-distributed.properties
添加了选项:
consumer.fetch.min.bytes=1500000
consumer.fetch.wait.max.ms=1500
尽管每个分区每秒可获取1000条以上的记录,但记录会被一个保存到PostgreSQL.
Although each a partition gets more than 1000 records per second, records are saved to PostgreSQL by one.
修改:使用其他名称正确的文件中添加了消费者选项
我还为etc/schema-registry/connect-avro-standalone.properties
添加了选项:
# based on 500*3000 byte message size
consumer.fetch.min.bytes=1500000
consumer.fetch.wait.max.ms=1500
consumer.max.poll.records=4000
推荐答案
我意识到我误解了文档.记录被一一插入到数据库中.在一个事务中插入的记录数取决于batch.size
和consumer.max.poll.records
.我希望批量插入是通过其他方式实现的.我想有一个插入这样的记录的选项:
I realised that I misunderstood the documentation. The records are inserted in database one by one. The count of the records inserted in one transaction depends on batch.size
and consumer.max.poll.records
. I expected that the batch insert was implemented the other way. I would like to have an option to insert records like this:
INSERT INTO table1 (First, Last)
VALUES
('Fred', 'Smith'),
('John', 'Smith'),
('Michael', 'Smith'),
('Robert', 'Smith');
但这似乎是不可能的.
这篇关于Kafka JDBC Sink连接器,批量插入值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!