How to pull data from a remote database into Apache Kafka?

Question

I want to build a real-time data pipeline in Apache Kafka. I have a database located at a remote site, and it is continuously updated. Can anybody suggest which Kafka Connect API I should use to pull the data from the database and ingest it into a Kafka broker in real time? Later on I would use Kafka Streams and KSQL to run ad-hoc queries and compute metrics.

Any help would be appreciated!

Answer

If you want to create a real-time data pipeline, you need to use a Change Data Capture (CDC) tool that can stream changes from MySQL. I would suggest Debezium, an open-source distributed platform for change data capture.

Capturing inserts

When a new record is added to a table, a JSON message similar to the one below will be produced:

{  
   "payload":{  
      "before":null,
      "after":{  
         "id":1005,
         "first_name":"Giorgos",
         "last_name":"Myrianthous",
         "email":"giorgos@abc.com"
      },
      "source":{  
         "name":"dbserver1",
         "server_id":223344,
         "ts_sec":1500369632,
         "gtid":null,
         "file":"mysql-bin.000003",
         "pos":364,
         "row":0,
         "snapshot":null,
         "thread":13,
         "db":"inventory",
         "table":"customers"
      },
      "op":"c",
      "ts_ms":1500369632095
   }
}

The before object is null and the after object contains the newly inserted values. Note that the op attribute is c, indicating that this was a CREATE event.

Capturing updates

Assuming that the email attribute has been updated, a JSON message similar to the one below will be produced:

{ 
    "payload":{  
      "before":{  
         "id":1005,
         "first_name":"Giorgos",
         "last_name":"Myrianthous",
         "email":"giorgos@abc.com"
      },
      "after":{  
         "id":1005,
         "first_name":"Giorgos",
         "last_name":"Myrianthous",
         "email":"newEmail@abc.com"
      },
      "source":{  
         "name":"dbserver1",
         "server_id":223344,
         "ts_sec":1500369929,
         "gtid":null,
         "file":"mysql-bin.000003",
         "pos":673,
         "row":0,
         "snapshot":null,
         "thread":13,
         "db":"inventory",
         "table":"customers"
      },
      "op":"u",
      "ts_ms":1500369929464
   }
}

Notice that op is now u, indicating that this was an UPDATE event. The before object shows the row state before the update, and the after object captures the current state of the updated row.

Capturing deletes

Now assume that the row has been deleted:

{ 
    "payload":{  
      "before":{  
         "id":1005,
         "first_name":"Giorgos",
         "last_name":"Myrianthous",
         "email":"newEmail@abc.com"
      },
      "after":null,
      "source":{  
         "name":"dbserver1",
         "server_id":223344,
         "ts_sec":1500370394,
         "gtid":null,
         "file":"mysql-bin.000003",
         "pos":1025,
         "row":0,
         "snapshot":null,
         "thread":13,
         "db":"inventory",
         "table":"customers"
      },
      "op":"d",
      "ts_ms":1500370394589
   }
}

op is now equal to d, indicating a DELETE event. The after attribute is null, and the before object contains the row as it was before deletion.
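
Once the connector is running, these events can be consumed from Kafka like any other messages. Below is a minimal sketch of a consumer that dispatches on the op field. It assumes Debezium's default topic naming of serverName.databaseName.tableName (here dbserver1.inventory.customers, matching the source fields in the examples above), a broker at localhost:9092, and the kafka-python client; these specifics are assumptions, not part of the original answer.

import json

from kafka import KafkaConsumer  # pip install kafka-python

consumer = KafkaConsumer(
    "dbserver1.inventory.customers",     # serverName.databaseName.tableName
    bootstrap_servers="localhost:9092",  # assumed local broker address
    auto_offset_reset="earliest",
)

for message in consumer:
    # Debezium follows every delete with a tombstone (null value); skip those.
    if message.value is None:
        continue
    payload = json.loads(message.value.decode("utf-8"))["payload"]
    op = payload["op"]
    if op == "c":    # CREATE: before is null, after holds the new row
        print("inserted:", payload["after"])
    elif op == "u":  # UPDATE: both row states are present
        print("updated:", payload["before"], "->", payload["after"])
    elif op == "d":  # DELETE: after is null, before holds the deleted row
        print("deleted:", payload["before"])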

You can also have a look at the extensive tutorial provided on their website.

Edit: Example configuration for a MySQL database

{
  "name": "inventory-connector",  (1)
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector", (2)
    "database.hostname": "192.168.99.100", (3)
    "database.port": "3306", (4)
    "database.user": "debezium", (5)
    "database.password": "dbz", (6)
    "database.server.id": "184054", (7)
    "database.server.name": "fullfillment", (8)
    "database.whitelist": "inventory", (9)
    "database.history.kafka.bootstrap.servers": "kafka:9092", (10)
    "database.history.kafka.topic": "dbhistory.fullfillment" (11)
    "include.schema.changes": "true" (12)
  }
}

1 The name of our connector when we register it with a Kafka Connect service.
2 The name of this MySQL connector class.
3 The address of the MySQL server.
4 The port number of the MySQL server.
5 The name of the MySQL user that has the required privileges.
6 The password for the MySQL user that has the required privileges.
7 The connector's identifier, which must be unique within the MySQL cluster and is similar to MySQL's server-id configuration property.
8 The logical name of the MySQL server/cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and, when the Avro converter is used, the namespaces of the corresponding Avro schemas.
9 A list of all databases hosted by this server that this connector will monitor. This is optional, and there are other properties for listing the databases and tables to include or exclude from monitoring.
10 The list of Kafka brokers that this connector will use to write and recover DDL statements to the database history topic.
11 The name of the database history topic where the connector will write and recover DDL statements. This topic is for internal use only and should not be consumed by applications.
12 The flag specifying that the connector should generate events with the DDL changes on the schema change topic named fullfillment, which consumers can use.
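
To register this connector, you would typically POST the configuration to the Kafka Connect REST API. Here is a sketch in Python, assuming a Connect worker reachable at localhost:8083 (the worker address is an assumption, not part of the original answer) and the requests library; the connector definition is the JSON above without the numeric annotations.

import json

import requests  # pip install requests

# Connector definition: the annotated config above, minus the (n) markers.
connector = {
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "192.168.99.100",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "184054",
        "database.server.name": "fullfillment",
        "database.whitelist": "inventory",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "dbhistory.fullfillment",
        "include.schema.changes": "true",
    },
}

# POST to the Kafka Connect REST API (assumption: worker at localhost:8083).
resp = requests.post(
    "http://localhost:8083/connectors",
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector),
)
resp.raise_for_status()
print("Connector registered:", resp.json()["name"])

On success, Kafka Connect responds with 201 Created and the stored configuration; a subsequent GET on /connectors/inventory-connector/status shows whether the connector task is running.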
