apache-kafka相关内容

基于APACHE-KAFKA-BINDER的春云流功能模型

这是question的续集。我可以把“普通”的阿帕奇卡夫卡活页夹和功能模型一起使用吗?到目前为止,使用基于注释的配置,我在一个应用程序中混合了spring-cloud-stream-binder-kafka和spring-cloud-stream-binder-kafka-streams两者,spring-cloud-stream-binder-kafka用于简单的使用/生产和spring-clo ..

如何使用Spring Cloud Kafka Stream 3.1创建制片人

我知道如何使用命令式编程方法定义生产者,但我找不到如何使用函数式编程方法定义生产者。 我读了关于这个的Spring Cloud Stream Binder文档,但只找到了如何定义消费者,或者消费者和生产者(例如,从主题中获取信息,转换数据并发送到另一个主题)。 所以,我不知道是否可以继续使用像@Input、@Ouptut这样的批注来定义单个处理器,在这一点上我非常困惑,因为库表明这些批 ..
发布时间:2022-09-24 20:20:18 Java开发

Debezium连接器任务处于未分配状态

今天,%3中的一个节点不同步,已重新启动。 现在,当我检查连接器任务的状态时,它显示为未分配,即使连接器处于运行状态。 工作进程正在分布式模式下运行。 我尝试重新启动连接器,但它仍未分配,并且指向被带回群集中的同一工作节点。 以下是我的其中一个工作进程的属性文件,该文件在所有工作进程中都是相同的: bootstrap.servers=something:9092 group.id ..
发布时间:2022-08-16 16:57:56 其他开发

如何配置Kafka-CONNECT-FILE-PULSE以连续读取文本文件?

我已正确配置了FilePulse,以便当我在阅读文件夹中创建文件时,它会在主题中读取并摄取该文件。 现在我需要连续阅读该文件夹中的每个文件,因为它们正在不断更新。 我必须更改属性文件的任何属性吗? 我的文件PulseTxtFile.properties: name=connect-file-pulse-txt connector.class=io.streamthought ..
发布时间:2022-08-16 16:45:50 其他开发

反弹卡夫卡事件

我计划设置MySQL到Kafka的流程,最终目标是计划一个流程,根据更改的数据重新计算MongoDB文档。 这可能涉及直接修补MongoDB文档,或运行将重新创建整个文档的进程。 我的问题是,如果MySQL数据库的一组更改都与一个MongoDB文档相关,那么我不想为每个更改实时重新运行重新计算过程,我想等待更改‘结算’,以便只在需要时运行重新计算过程。 有没有办法“揭穿”卡夫卡之 ..
发布时间:2022-08-14 13:19:17 其他开发

打造消费者动态春天卡夫卡

我正在创建一个与另一个服务对话的服务,以确定要收听的卡夫卡主题。卡夫卡主题可能有不同的键和值类型。因此,我希望为每个配置(主题、键类型、值类型)动态创建不同的Kafka消费者,其中配置仅在运行时才知道。然而,在春季的Kafka中,我看不到动态传递所有这些参数的方法(至少我不知道有)。我应该如何进行这项工作。 推荐答案 只需在运行时创建新的侦听器容器。 https://docs.s ..
发布时间:2022-08-13 15:40:48 其他开发

如何修复不兼容的类型:org.apache.beam.sdk.options.ValueProvider<;java.lang.String>;无法转换为java.lang.String";

我按照this link创建了一个模板,该模板构建了一个从KafkaIO读取的光束管道。但我总是遇到“不兼容的类型:org.apache.beam.sdk.options.ValueProvider无法转换为java.lang.String”。导致错误的是行“.withBootstrapServers(options.getKafkaServer())”。BEAM版本为2.9.0,以下是我的部分代 ..
发布时间:2022-08-12 19:55:34 Java开发

PYSpark没有打印Kafka流中的任何数据,也没有失败

我是Spark和Kafka的新手。使用从免费Kafka服务器提供商(Cloudkarafka)创建的Kafka服务器来使用数据。在运行pyspark代码(在Databricks上)以使用流数据时,流只是保持初始化,并且不获取任何内容。它既不会失败,也不会停止执行,只是将状态保持为流正在初始化。 代码: from pyspark.sql.functions import col kaf ..

SMT将通过连接器配置创建Kafka连接器字符串分区键

我一直在为PostgreSQL实现一个Kafka连接器(我正在使用debezium Kafka连接器并通过docker运行所有组件)。我需要一个定制分区键,所以我一直使用SMT来实现这一点。但是,我使用的方法创建了一个Struct,并且我需要它是一个字符串。这篇article讲述了如何将分区键设置为int,但我无法访问配置文件来设置适当的转换。目前我的Kafka连接器如下所示 数据-lang=“ ..
发布时间:2022-07-20 19:38:07 其他开发

我们可以在春靴中使用多个卡夫卡模板吗?

在我的Spring Boot Kafka发布应用程序中,我希望提供对以字符串(Json)或字节格式发布消息的支持,因为我希望同时支持json和avro。但是春装中的卡夫卡模板让我们只能定义其中的一个模板。有没有办法同时使用两个模板或任何其他方式来同时支持JSON和Avro? KafkaTemplate只适用于字符串,但我也想发布Avro,它应该类似于Kafka ..
发布时间:2022-07-18 17:13:45 其他开发

Kafka Stream和KTable一对多关系连接

我有一个Kafka流--比如博客和Kafka表--比如那些博客相关的评论。Kafka流中的key可以映射到Kafka表中的多个值,即一个博客可以有多条评论。我想连接这两个对象,并创建一个带有注释ID数组的新对象。但是当我连接时,流只包含最后一个注释id。有没有任何文档或示例代码可以为我指明如何实现这一点的正确方向?基本上,有没有文档详细说明如何使用Kafka流和Kafka表进行一对多关系连接? ..
发布时间:2022-07-17 20:01:45 其他开发

卡夫卡消费者再平衡需要太长时间

我有一个Kafka Streams应用程序,它从几个主题中获取数据,并连接数据并将其放入另一个主题中。 卡夫卡配置: 5 kafka brokers Kafka Topics - 15 partitions and 3 replication factor. 注意:我正在运行Kafka Broker的同一台计算机上运行Kafka Streams应用程序。 每小时消耗/产生数 ..
发布时间:2022-07-17 19:56:55 其他开发