Kafka Source and Sink connector not working for large data


Problem description

I am creating a data pipeline using the Kafka source and sink connectors. The source connector reads from a SQL database and publishes into a topic, and the sink connector subscribes to that topic and writes into another SQL database. The table has 16 GB of data. The problem is that the data is not getting transferred from one DB to the other. However, if the table is small, say 1000 rows, the data is transferred successfully.

Source connector configuration:

"config": {
       "connector.class": 
"io.confluent.connect.jdbc.JdbcSourceConnector",
       "tasks.max": "1",
       "connection.url": "",
       "mode": "incrementing",
       "incrementing.column.name": "ID",
       "topic.prefix": "migration_",
       "name": "jdbc-source",
       "validate.non.null": false,
       "batch.max.rows":5
     }
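
The question shows only the source side. For reference, a sink connector for such a pipeline is typically configured along these lines; this is a sketch rather than the asker's actual config, and the topic name, connection.url, and key fields are placeholder assumptions:

"config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "migration_myTable",
    "connection.url": "",
    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "ID",
    "auto.create": true
}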

Source connector logs:

INFO WorkerSourceTask{id=cmc-migration-source-0} flushing 0 outstanding messages for offset commit 
[2019-03-08 16:48:45,402] INFO WorkerSourceTask{id=cmc-migration-source-0} Committing offsets
[2019-03-08 16:48:45,402] INFO WorkerSourceTask{id=cmc-migration-source-0} flushing 0 outstanding messages for offset commit
[2019-03-08 16:48:55,403] INFO WorkerSourceTask{id=cmc-migration-source-0} Committing offsets(org.apache.kafka.connect.runtime.WorkerSourceTask:397)

Can anyone guide me on how to tune my Kafka source connector to transfer large data?

Recommended answer

I managed to overcome this problem by limiting the number of records returned to the database in a single query, e.g. 5000 at a time.

The solution will depend on the database and the SQL dialect. The examples below work and manage offsets properly for a single table. The incrementing ID column and the timestamp column must be set up as per the instructions here: https://docs.confluent.io/kafka-connect-jdbc/current/source-connector/index.html#incremental-query-modes

An example table myTable contains the following columns:

  • id - increments every time a new record is added
  • lastUpdatedTimestamp - updated every time the record gets updated
  • some other attributes

id and lastUpdatedTimestamp must uniquely identify a record in the dataset.
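
For illustration, a minimal DDL for such a table might look like this (a sketch assuming PostgreSQL syntax; note that PostgreSQL does not refresh the timestamp on UPDATE by itself, so a trigger or application logic is needed for that):

CREATE TABLE myTable (
    id                   BIGSERIAL PRIMARY KEY,            -- increments on every insert
    lastUpdatedTimestamp TIMESTAMP NOT NULL DEFAULT now(), -- must be refreshed on every update
    name                 VARCHAR(100),
    age                  INT
);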

The connector constructs the query as follows:

config.query + the WHERE clause for the selected Kafka Connect mode + config.query.suffix

PostgreSQL/MySQL

"config": {
    ...
    "poll.interval.ms" : 10000,
    "mode":"timestamp+incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "lastUpdatedTimestamp",
    "table.whitelist": "myTable",
    "query.suffix": "LIMIT 5000"
    ...
    }

This results in:

SELECT *
FROM "myTable"
WHERE "myTable"."lastUpdatedTimestamp" < ?
    AND (
        ("myTable"."lastUpdatedTimestamp" = ? AND "myTable"."id" > ?)
        OR
        "myTable"."lastUpdatedTimestamp" > ?
        )
ORDER BY
    "myTable"."lastUpdatedTimestamp",
    "myTable"."id" ASC
LIMIT 5000

Or the following, if you want to add an additional condition in the WHERE clause.

"config": {
    ...
    "poll.interval.ms" : 10000,
    "mode":"timestamp+incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "lastUpdatedTimestamp",
    "query": "SELECT * FROM ( SELECT id, lastUpdatedTimestamp, name, age FROM myTable WHERE Age > 18) myQuery",
    "query.suffix": "LIMIT 5000"
    ...
    }

This results in:

SELECT *
FROM (
    SELECT id, lastUpdatedTimestamp, name, age
    FROM myTable
    WHERE Age > 18
    ) myQuery
WHERE "myTable"."lastUpdatedTimestamp" < ?
    AND (
        ("myTable"."lastUpdatedTimestamp" = ? AND "myTable"."id" > ?)
        OR
        "myTable"."lastUpdatedTimestamp" > ?
        )
ORDER BY
    "myTable"."lastUpdatedTimestamp",
    "myTable"."id" ASC
LIMIT 5000

SQL Server

"config": {
    ...
    "poll.interval.ms" : 10000,
    "mode":"timestamp+incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "lastUpdatedTimestamp",
    "query": "SELECT TOP 5000 * FROM (SELECT id, lastUpdatedTimestamp, name, age FROM myTable) myQuery",
    ...
    }

This results in:

SELECT TOP 5000 *
FROM (
    SELECT id, lastUpdatedTimestamp, name, age
    FROM myTable
    ) myQuery
WHERE "myTable"."lastUpdatedTimestamp" < ?
    AND (
        ("myTable"."lastUpdatedTimestamp" = ? AND "myTable"."id" > ?)
        OR
        "myTable"."lastUpdatedTimestamp" > ?
        )
ORDER BY
    "myTable"."lastUpdatedTimestamp",
    "myTable"."id" ASC

Oracle

"config": {
    ...
    "poll.interval.ms" : 10000,
    "mode":"timestamp+incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "lastUpdatedTimestamp",
    "query": "SELECT * FROM (SELECT id, lastUpdatedTimestamp, name, age FROM myTable WHERE ROWNUM <= 5000) myQuery",
    ...
    }

This results in:

SELECT *
FROM (
    SELECT id, lastUpdatedTimestamp, name, age
    FROM myTable
    WHERE ROWNUM <= 5000
    ) myQuery
WHERE "myTable"."lastUpdatedTimestamp" < ?
    AND (
        ("myTable"."lastUpdatedTimestamp" = ? AND "myTable"."id" > ?)
        OR
        "myTable"."lastUpdatedTimestamp" > ?
        )
ORDER BY
    "myTable"."lastUpdatedTimestamp",
    "myTable"."id" ASC

This approach will not work in bulk mode. It works in timestamp+incrementing mode, and it may work in timestamp or incrementing mode, depending on the characteristics of the table.
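
For an append-only table it may therefore be enough to drop the timestamp column. A sketch (not from the original answer) of an incrementing-only variant:

"config": {
    ...
    "poll.interval.ms" : 10000,
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "table.whitelist": "myTable",
    "query.suffix": "LIMIT 5000"
    ...
    }

This variant only ever picks up newly inserted rows; updates to existing rows are not detected.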

Joining many tables - an idea that I have not tested!

It gets more complicated if the query performs joins across many tables. That would require the following:

  • an ID that uniquely identifies a row, e.g. a concatenation of tableA.id + tableB.id
  • a timestamp of when the joined record was last updated, whichever table's is latest
  • appropriate ordering of the query records (see the sketch below)
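
Untested, but such a joined query might be sketched like this (PostgreSQL assumed; tableA, tableB, and the join column are hypothetical, and the synthetic id assumes tableB.id stays below 1,000,000):

SELECT *
FROM (
    SELECT
        a.id * 1000000 + b.id AS id,  -- synthetic unique ID for the joined row
        GREATEST(a.lastUpdatedTimestamp, b.lastUpdatedTimestamp) AS lastUpdatedTimestamp,
        a.name,
        b.age
    FROM tableA a
    JOIN tableB b ON b.tableA_id = a.id
    ) myQuery

The connector would then append its WHERE clause and the ordering on lastUpdatedTimestamp and id exactly as in the single-table examples above.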

There is a limit on how large the ID can be, due to the Java long datatype used by Kafka Connect, i.e. 9,223,372,036,854,775,807.
