apache-kafka-connect相关内容

如何在 Confluent Kafka 连接中将时间戳值格式化为日期格式 - JDBC Oracle Source Connector

我在 Kafka Connect 中设置了一个 JDBC Oracle 源连接器,我在 Oracle 表中有一个时间戳列,其值设置为日期格式“21-MAR-18 05.39.44.0194990 AM". Oracle表中的所有数据都被抓取到Kafka主题中,但是oracle表中日期格式的列(时间戳列)的值被转换为毫秒并在主题中显示为毫秒值.我希望收到 Oracle 表中的日期格式. {“ ..
发布时间:2021-11-12 03:30:14 其他开发

Kafka 连接器可以加载自己的名称吗?

根据 Kafka 文档 连接器配置是简单的键值映射.对于独立mode 这些在属性文件中定义并传递给 Connect在命令行上处理. 大多数配置都依赖于连接器,因此无法对其进行概述这里.但是,有一些常见的选项: name - 连接器的唯一名称.尝试使用相同名称再次注册将失败. 我有 10 个连接器以这样的独立模式运行: bin/connect-standalone.sh confi ..
发布时间:2021-11-12 03:30:08 其他开发

Kafka Connect S3 Sink Connector 按 id 字段对大型主题进行分区

过去几周,我们一直致力于将 Kafka Connect 添加到我们的数据平台,并认为这将是将数据从 Kafka 提取到 S3 数据湖的有用方法.我们使用了 FieldPartitioner 和 TimeBasePartitioner,并看到了一些相当不错的结果. 我们还需要按用户 id 进行分区 - 但是尝试在用户 id 字段上使用 FieldPartitioner 时,连接器非常慢 - 特 ..

Kafka 连接中的 ACL 配置不起作用

我为 3 节点 Kafka 集群设置了 ACL,并且能够通过生产者控制台和消费者控制台发送和接收主题.现在我想用 ACL 配置 Kafka 连接.我尝试使用 SASL_PLAINTEXT 组合,并在 connect.log 文件中显示以下错误.它没有从源表同步到主题,请帮助我缺少任何配置. 错误日志 [2020-10-14 07:24:35,874] ERROR WorkerSourceT ..
发布时间:2021-11-12 03:30:00 其他开发

从 Postgres 到 Kafka 的变化跟踪

这个问题跟在这个问题之后. 主要任务是在 KSQL 端进行连接.下面的例子将说明它.事件消息到达 Kafka 主题.该消息的结构: [{"name": "from_ts",“类型":“bigint"},{"name": "to_ts",“类型":“bigint"},{"name": "rulenode_id",“类型":“整数"}] 还有一个 Postgres 表 rulenode: ..
发布时间:2021-11-12 03:29:55 其他开发

Debezium 能否捕获 Postges 物化视图的变化

我们目前正在尝试使用 Debezium 来捕获 Postgres 数据库中 4 个表的更改.我们目前知道,对于这个用例,我们可以使用 kafka-streams 应用程序来加入/聚合 KTable 的表,但是我们希望保持 kafka-stream 拓扑简单,因此我们的想法是使用来自 Postgres 的物化视图和捕捉它的变化. 是否可以这样做,如果可以,我们应该如何配置它? Kafka ..

Kafka 连接正在发送格式错误的 json

我正在尝试使用带有 rabbitMQ 连接器的 kafka-connect 执行概念验证.基本上,我有两个简单的 Spring Boot 应用程序;一个 RabbitMQ 生产者和一个 Kafka 消费者.消费者无法处理来自连接器的消息,因为它以某种方式转换了我的 JSON 消息;RabbitMQ 发送 {"transaction": "PAYMENT", "amount": "$125.0"} ..
发布时间:2021-11-12 03:29:49 其他开发

有没有办法配置要使用 jmx_exporter/prometheus 捕获的 kafka-connect jmx 指标?

我正在为 Kafka 生态系统中的 Kafka 连接设置监控.我已经为 kafka 代理启用了 JMX 导出器并且工作正常.现在我正在尝试为 kafka 连接启用 JMX 导出器.但是,从哪里开始有点不清楚. 我只能修改 connect-distributed.sh 以启用更改.任何指针都会是一个很好的补充. kafka-run-class.sh 已修改,使 jmx_exporter ..

如何在 Spring Kafka 中包含用于反序列化的类型元数据

我正在 Spring Kafka 中的 Listener 上进行反序列化.但这假设类型信息是由 Spring Kafka 生产者包含或发送的.在我的例子中,Json 是由 Debezium MySQLConnector 发送的,它没有添加这个元数据.所以我想把它添加到请求中.我了解它放置在 JsonSerializer 中某个位置的请求中,并且我查看了源代码,但无法弄清楚如何在序列化期间将元数据类 ..

Kafka 连接器和架构注册表 - 检索 Avro 架构时出错 - 未找到主题

我有一个最终会有很多不同模式的主题.现在它只有一个.我已经通过 REST 创建了一个连接作业,如下所示: {"name":"com.mycompany.sinks.GcsSinkConnector-auth2",“配置":{"connector.class": "com.mycompany.sinks.GcsSinkConnector","topics": "auth.events","flus ..

如何从Apache Kafka中的远程数据库中提取数据?

我想在 Apache Kafka 中制作实时数据管道.我有位于远程位置的数据库,并且该数据库不断更新.我应该使用哪个 Kafka 连接 API 来从数据库中提取数据并实时摄取到 Kafka 代理中?稍后我将使用 kafka 流和 KSQL 运行临时查询来执行指标. 任何帮助将不胜感激! 解决方案 如果您想创建实时数据管道,您需要使用能够从 MySQL 流式传输更改的变更数据捕获 (C ..
发布时间:2021-11-12 03:29:35 数据库