apache-kafka-connect相关内容
我在 Kafka Connect 中设置了一个 JDBC Oracle 源连接器,我在 Oracle 表中有一个时间戳列,其值设置为日期格式“21-MAR-18 05.39.44.0194990 AM". Oracle表中的所有数据都被抓取到Kafka主题中,但是oracle表中日期格式的列(时间戳列)的值被转换为毫秒并在主题中显示为毫秒值.我希望收到 Oracle 表中的日期格式. {“
..
我正在使用 Conluent Platform/Kafka Connect 和类似的东西,我想运行几个例子. 我从这里关注了快速入门.这意味着: 安装 Confluent Platform (v3.2.1) 运行 Zookeeper、Kafka Broker 和 Schema Register 运行读取文件数据的示例(witk Kafka Connect) 我运行了这个命令(
..
根据 Kafka 文档 连接器配置是简单的键值映射.对于独立mode 这些在属性文件中定义并传递给 Connect在命令行上处理. 大多数配置都依赖于连接器,因此无法对其进行概述这里.但是,有一些常见的选项: name - 连接器的唯一名称.尝试使用相同名称再次注册将失败. 我有 10 个连接器以这样的独立模式运行: bin/connect-standalone.sh confi
..
过去几周,我们一直致力于将 Kafka Connect 添加到我们的数据平台,并认为这将是将数据从 Kafka 提取到 S3 数据湖的有用方法.我们使用了 FieldPartitioner 和 TimeBasePartitioner,并看到了一些相当不错的结果. 我们还需要按用户 id 进行分区 - 但是尝试在用户 id 字段上使用 FieldPartitioner 时,连接器非常慢 - 特
..
以 JSON 格式从 Kafka 生产/消费.使用以下属性以 JSON 格式保存到 HDFS: key.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter=org.apache.kafka.connect.json.JsonConverterkey.converter.schemas.enable=falsev
..
我为 3 节点 Kafka 集群设置了 ACL,并且能够通过生产者控制台和消费者控制台发送和接收主题.现在我想用 ACL 配置 Kafka 连接.我尝试使用 SASL_PLAINTEXT 组合,并在 connect.log 文件中显示以下错误.它没有从源表同步到主题,请帮助我缺少任何配置. 错误日志 [2020-10-14 07:24:35,874] ERROR WorkerSourceT
..
我在 AWS RDS 上有一个 Postgres Db 和一个在桌子上监听的 kafka 连接连接器 (Debezium Postgres).连接器的配置: {"name": "我的连接器",“配置":{"connector.class": "io.debezium.connector.postgresql.PostgresConnector","database.dbname": "my_db
..
这个问题跟在这个问题之后. 主要任务是在 KSQL 端进行连接.下面的例子将说明它.事件消息到达 Kafka 主题.该消息的结构: [{"name": "from_ts",“类型":“bigint"},{"name": "to_ts",“类型":“bigint"},{"name": "rulenode_id",“类型":“整数"}] 还有一个 Postgres 表 rulenode:
..
我们目前正在尝试使用 Debezium 来捕获 Postgres 数据库中 4 个表的更改.我们目前知道,对于这个用例,我们可以使用 kafka-streams 应用程序来加入/聚合 KTable 的表,但是我们希望保持 kafka-stream 拓扑简单,因此我们的想法是使用来自 Postgres 的物化视图和捕捉它的变化. 是否可以这样做,如果可以,我们应该如何配置它? Kafka
..
我正在尝试使用带有 rabbitMQ 连接器的 kafka-connect 执行概念验证.基本上,我有两个简单的 Spring Boot 应用程序;一个 RabbitMQ 生产者和一个 Kafka 消费者.消费者无法处理来自连接器的消息,因为它以某种方式转换了我的 JSON 消息;RabbitMQ 发送 {"transaction": "PAYMENT", "amount": "$125.0"}
..
我想通过 jdbc sink 批量读取 5000 条记录,为此我使用了 jdbc sink 配置文件中的 batch.size: name=jdbc-sinkconnector.class=io.confluent.connect.jdbc.JdbcSinkConnector任务.max=1批量大小=5000主题=postgres_usersconnection.url=jdbc:postgre
..
我正在为 Kafka 生态系统中的 Kafka 连接设置监控.我已经为 kafka 代理启用了 JMX 导出器并且工作正常.现在我正在尝试为 kafka 连接启用 JMX 导出器.但是,从哪里开始有点不清楚. 我只能修改 connect-distributed.sh 以启用更改.任何指针都会是一个很好的补充. kafka-run-class.sh 已修改,使 jmx_exporter
..
我正在 Spring Kafka 中的 Listener 上进行反序列化.但这假设类型信息是由 Spring Kafka 生产者包含或发送的.在我的例子中,Json 是由 Debezium MySQLConnector 发送的,它没有添加这个元数据.所以我想把它添加到请求中.我了解它放置在 JsonSerializer 中某个位置的请求中,并且我查看了源代码,但无法弄清楚如何在序列化期间将元数据类
..
我有一个最终会有很多不同模式的主题.现在它只有一个.我已经通过 REST 创建了一个连接作业,如下所示: {"name":"com.mycompany.sinks.GcsSinkConnector-auth2",“配置":{"connector.class": "com.mycompany.sinks.GcsSinkConnector","topics": "auth.events","flus
..
我想在 Apache Kafka 中制作实时数据管道.我有位于远程位置的数据库,并且该数据库不断更新.我应该使用哪个 Kafka 连接 API 来从数据库中提取数据并实时摄取到 Kafka 代理中?稍后我将使用 kafka 流和 KSQL 运行临时查询来执行指标. 任何帮助将不胜感激! 解决方案 如果您想创建实时数据管道,您需要使用能够从 MySQL 流式传输更改的变更数据捕获 (C
..
我已经安装了 confluent_3.3.0 并启动了 zookeper、schema-registry 和 kafka broker.我还从 此链接下载了 mongodb 连接器. 说明:我正在使用以下命令运行接收器连接器: ./bin/connect-standalone etc/kafka/connect-standalone.properties/home/username/m
..
我创建了一个将数据转换为其他存储的接收器 kafka 连接;当使用 kafka connect rest api 创建新连接器时,我想将 auto.offset.reset 设置为 latest;我在配置中设置了 consumer.auto.offset.reset: latest; json{"name": "test_v14",“配置":{"name": "test_v14","cons
..
我想将一个主题从 avro 中的 kafka 索引到 elasticsearch 格式,但是我的时间戳字段有问题无法识别elasticsearch 作为日期格式字段. 我对连接器使用了以下配置. {"name": "es-sink-barchart-10",“配置":{"connector.class": "io.confluent.connect.elasticsearch.Elast
..
connect-standalone.properties connector.class=io.confluent.connect.jdbc.JdbcSourceConnectorbootstrap.servers=10.33.62.20:9092,10.33.62.110:9092,10.33.62.200:9092key.converter=org.apache.kafka.connect
..
我正在尝试为 Google Cloud PubSub 服务配置 Sink Kafka Connect. 使用以下命令配置Kafka Connect: 卷曲-X POST-H '内容类型:应用程序/json'-H '接受:应用程序/json' -d '{ "name": "pubsub_test","config": { "connector.class": "com.google.pubs
..