apache-kafka-connect相关内容

如何使用Kafka与JDBC接收器连接,并使用Python进行源连接

我要从一个系统实时传输到另一个系统。 我正在使用kafka-python,可以在本地直播。 计算出连接器将处理多个设备。有没有人能给我提个建议,让我用连接符来实现它? 推荐答案 Kafka Connect是Java框架,不是Python. Kafka Connect运行REST API,您可以使用urllib3或requests与其交互,而不是kafka-python ..
发布时间:2022-05-26 09:07:16 其他开发

Kafka 接收器连接器:即使重启后也未分配任务

我在一组 Docker 容器中使用 Confluent 3.2,其中一个正在运行 kafka-connect worker. 由于我不清楚的原因,我的四个连接器中的两个 - 具体来说,hpgraphsl 的 MongoDB接收器连接器 - 停止工作.我能够确定主要问题:连接器没有分配任何任务,这可以通过调用 GET/connectors/{my_connector}/status 看到.其他 ..

Kafka 流 - 第一个示例 WordCount 未正确计算第一圈

我正在研究 Kafka Streams,但我对 Java 8 中 WordCount 的第一个示例有疑问,该示例取自文档. 使用最新版本的 kafka 流、Kafka Connect 和 WordCount lambda 表达式示例. 我遵循以下步骤:我在 Kafka 中创建了一个输入主题和一个输出主题.启动应用程序流,然后通过从 .txt 文件中插入一些单词来上传输入主题 在第 ..

Kafka Streams 表转换

我在 SQL Server 中有一个表,我想将其流式传输到 Kafka 主题,结构如下: (用户 ID,报告 ID) 此表将不断更改(添加、插入、不更新记录) 我想把它转换成这种结构并放到 Elasticsearch 中: {“用户ID":1,“报告":[1, 2, 3, 4, 5, 6]} 到目前为止我看到的例子是日志或点击流,但在我的情况下不起作用. 这种用例可能吗?我总是 ..

在 HDFS 上查找数据的 Kafka Streams

我正在使用 Kafka Streams (v0.10.0.1) 编写一个应用程序,并希望使用查找数据来丰富我正在处理的记录.这些数据(带时间戳的文件)每天(或每天 2-3 次)写入 HDFS 目录. 如何在 Kafka Streams 应用程序中加载它并加入实际的 KStream? 当新文件到达那里时,从 HDFS 重新读取数据的最佳做法是什么? 还是切换到 Kafka Connec ..

如何将记录拆分为不同的流,从一个主题到不同的流?

我有一个包含不同大小记录的单一源 CSV 文件,这些记录将每条记录推送到一个源主题中.我想从该源主题将记录拆分为不同的 KStreams/KTables.我有一个用于一个表加载的管道,我将记录从源主题以分隔格式推送到 stream1,然后将记录推送到另一个 AVRO 格式的流,然后推送到 JDBC 接收器连接器,将记录推送到 MySQL 数据库.管道需要相同.但是我想将不同表的记录推送到一个源主题 ..

如何处理kafka KStream并直接写入数据库而不是向其发送另一个主题

我不想把经过处理的KStream写到另一个主题,我想直接把丰富的KStream写到数据库中.我应该如何进行? 解决方案 您可以实现一个自定义 Processor 来打开一个数据库连接并通过 KStream#process().参见https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#applying-p ..

为什么我的 kafka tmp 文件夹的大小几乎与磁盘大小相同?

我使用以下形式开发生产 kafka 环境:3 个 ZK 服务器、3 个 Kafka 代理和两个 kafka 连接.我将我的 tmp 文件与我的 kafka 主文件夹并排放置.我在远程 ubuntu 环境中运行它,但不在 docker 中运行. 当我运行我的 kafka 操作时,我遇到错误,通知我的磁盘消耗过多.我检查了我的 kafka tmp 文件夹,发现它的大小几乎是我磁盘大小的 2/3, ..

Confluent 控制中心拦截器

如何将 Confluent Control Center Interceptor 添加到现有的 S3(Sink) Connector?监视接收器.我正在寻找文档.任何帮助表示赞赏. 解决方案 要绝对清楚,您的接收器上需要拦截器和源.如果不这样做,您将无法使用 Confluent Control Center 监控您的管道,就像现在一样. 要在 Kafka Connect 中启用拦截器 ..

kafka-connect-jdbc:SQLException:仅在使用分布式模式时没有合适的驱动程序

我们已经成功地使用了 mySQL - 使用 jdbc 独立连接器的 kafka 数据摄取,但现在面临在分布式模式下使用相同的问题(作为 kafka 连接服务). connect-distributed.properties 文件- bootstrap.servers=IP1:9092,IP2:9092group.id=连接集群key.converter.schemas.enable=tru ..

Kafka Connect FileStreamSource 忽略附加行

我正在开发一个使用 Spark 处理日志的应用程序,我想使用 Kafka 作为从日志文件流式传输数据的一种方式.基本上我有一个日志文件(在本地文件系统上),它会不断更新新日志,而 Kafka Connect 似乎是从文件中获取数据以及新附加行的完美解决方案. 我使用以下命令以默认配置启动服务器: Zookeeper 服务器: zookeeper-server-start.sh c ..
发布时间:2021-11-12 03:32:22 其他开发