apache-kafka-connect相关内容

Kafka Streams 表转换

我在 SQL Server 中有一个表,我想将其流式传输到 Kafka 主题,结构如下: (用户 ID,报告 ID) 此表将不断更改(添加、插入、不更新记录) 我想把它转换成这种结构并放到 Elasticsearch 中: {“用户ID":1,“报告":[1, 2, 3, 4, 5, 6]} 到目前为止我看到的例子是日志或点击流,但在我的情况下不起作用. 这种用例可能吗?我总是 ..

Kafka Connect 找不到开发的插件类

我为实现 SinkConnector 的 kafka connect 创建了一个插件,我使用 gradle jar 任务将其打包到一个 jar 中: jar {档案名称='名称.jar'} 我将它复制到文件夹中的 kafka 集群中,并设置 CLASSPATH=我的 jar 所在的位置.然后我执行 kafka 脚本来启动独立连接,它给我一个错误,说我的类找不到: [2017-07-25 05 ..
发布时间:2021-11-12 03:27:14 Java开发

我们如何重置与 Kafka Connect 源连接器关联的状态?

我们正在使用 Kafka Connect 2.5. 我们正在使用 Confluent JDBC 源连接器(虽然我认为这个问题主要与连接器类型无关)并且正在将 IBM DB2 数据库中的一些数据用于主题,使用“递增模式"(主键)作为每条记录的唯一 ID. 这在正常的事件过程中工作正常;连接器第一次启动时,所有记录都会被消耗并放置在一个主题上,然后,当添加新记录时,它们会被添加到我们的主题 ..

K8s 上的广告 kafka 连接休息监听器

我正在努力以分布式模式在 Kubernetes (DockerEE) 上设置 Kafka Connect. 目前我在三个相应的 k8s-pods 上有一个由三个工人组成的集群. 我面临的问题是我的员工很难相互沟通(至少我是这么认为的). 当我尝试启动连接器时,我得到: {"error_code":409,"message":"由于操作冲突(例如worker rebalance),无法完 ..
发布时间:2021-11-12 03:27:09 其他开发

Kafka Connect 用数组字段展平 postgres 记录的转换

我有一个使用 Kafka Connect 连接到 Kafka 的 postgres 数据库,以便将 CDC 事件放在一个主题上.我们使用扁平化转换作为共享配置的一部分: flattenKey: "org.apache.kafka.connect.transforms.Flatten$Key" 表中的一列属于 ARRAY 类型,因此在尝试应用转换时出现异常: Flatten 转换不支持 ARR ..

如何使用 Debezium 从 MS SQL 将 250 个表摄取到 Kafka

嗨,我尝试在 PostgreSQL 之间构建 Kafka 连接管道作为源到 SQL Server 作为目标.我使用了 3 个 Kafka broker,需要消费 252 个主题(一个主题与一张 PostgreSQL 表相同).运行一个多小时后,252张表中只能拉出218张.我发现的错误是 SQL Server 中存在死锁机制,可以将事务保存到 SQL Server 并尝试重试,Debezium 复 ..
发布时间:2021-11-12 03:26:58 数据库

在多个节点上运行kafka connect分布式模式

我正在测试一个 kafka 连接器的弹性,我想在它运行时杀死一个工人,从而杀死连接器实例.最简单的方法可能是强制分布式模式在多个节点上运行,然后杀死该节点上的工作进程(对吧?).我怎样才能让 Kafka 连接在不仅仅是它启动的节点上产生工作人员?这是在工作配置中定义的吗? 解决方案 所以最后我做的是: 将 Kafka Connect 分布式模式所需的所有 jar 复制到我想要运行它的 ..
发布时间:2021-11-12 03:26:55 其他开发

如何根据连接器名称获取 Kafka 源连接器架构

我正在使用 Confluent JDBC Kafka 连接器将消息发布到主题中.源连接器将在每次轮询时将数据与模式一起发送到主题.我想检索这个模式. 有可能吗?如何?谁能推荐我 我的目的是基于 Kafka 连接器在轮询时构建的架构创建 KSQL 流或表. 解决方案 最好的方法是使用 Avro,其中架构单独存储,并由 Kafka Connect 和 KSQL 自动使用. 您 ..

具有 json 架构的 Kafka jdbc 接收器连接器不起作用

使用最新的 kafka 和 confluent jdbc sink 连接器.发送一条非常简单的 Json 消息: {“架构":{“类型":“结构",“领域":[{“类型":“整数",“可选":假,“字段":“ID"},{“类型":“字符串",“可选":真,“字段":“味精"}],“可选":假,"name": "msgschema"},“有效载荷":{“身份证":222,"msg": "你好"}} ..
发布时间:2021-11-12 03:26:47 其他开发

使用 kafka-connect 从多个主题更新到多个表

我正在尝试使用 JDBC 接收器连接器读取 2 个 kafka 主题并将其更新插入到我手动创建的 2 个 Oracle 表中.每个表都有 1 个主键,我想在 upsert 模式下使用它.如果我仅用于 1 个主题且 pk.fields 中只有 1 个字段,则连接器工作正常,但如果我在 pk.fields 中输入多个列,则每个表中的一个列无法识别架构.我是否遗漏了任何东西,请提出建议. name=o ..

如何更改“kafka connect"组件端口?

在端口 8083 上,我正在运行 Influxdb,我什至在 http://localhost:8083 上获得了 GUI 现在来到 kafka,这里我指的是按照 https://kafka.apache.org/进行的设置快速入门 通过命令启动文件夹/opt/zookeeper-3.4.10中的zookeeeper:bin/zkServer.sh start 所以zookeep ..
发布时间:2021-11-12 03:26:41 其他开发

在 HDFS 上查找数据的 Kafka Streams

我正在使用 Kafka Streams (v0.10.0.1) 编写一个应用程序,并希望使用查找数据来丰富我正在处理的记录.这些数据(带时间戳的文件)每天(或每天 2-3 次)写入 HDFS 目录. 如何在 Kafka Streams 应用程序中加载它并加入实际的 KStream? 当新文件到达那里时,从 HDFS 重新读取数据的最佳做法是什么? 还是切换到 Kafka Connec ..

Kafka Leader选举什么时候举行?

Kafka High Level Producer 何时以及多久选举一次领导人?是在发送每条消息之前执行还是仅在创建连接时执行一次? 解决方案 每个 Broker 都有一个关于主题列表(和分区)及其领导者的信息,每当新领导者出现时,动物园管理员都会更新这些信息选择或分区数量发生变化时. 因此,当生产者调用其中一个代理时,它会使用此信息列表进行响应.一旦生产者收到此信息,它就会缓存它并 ..

如何在没有 Confluent 的情况下为 Cassandra 使用 Kafka Connect

我们如何在不使用 Confluent 框架的情况下将 Kafka Connect 与 Cassandra 结合使用. 解决方案 Kafka Connect 是 框架.Confluent 只提供连接器.如果您不想使用 Confluent 开源(但为什么不呢?),您也可以将所有这些连接器与 vanilla Apache Kafka 一起使用. 有多个可用的卡桑德拉连接器:https:// ..
发布时间:2021-11-12 03:26:30 其他开发