apache-kafka相关内容
这是question的续集。我可以把“普通”的阿帕奇卡夫卡活页夹和功能模型一起使用吗?到目前为止,使用基于注释的配置,我在一个应用程序中混合了spring-cloud-stream-binder-kafka和spring-cloud-stream-binder-kafka-streams两者,spring-cloud-stream-binder-kafka用于简单的使用/生产和spring-clo
..
我知道如何使用命令式编程方法定义生产者,但我找不到如何使用函数式编程方法定义生产者。 我读了关于这个的Spring Cloud Stream Binder文档,但只找到了如何定义消费者,或者消费者和生产者(例如,从主题中获取信息,转换数据并发送到另一个主题)。 所以,我不知道是否可以继续使用像@Input、@Ouptut这样的批注来定义单个处理器,在这一点上我非常困惑,因为库表明这些批
..
我的Customer类已使用maven-avro插件创建。当我尝试运行此程序时,收到的错误为Exception in thread "main" java.lang.IllegalStateException: Expecting type to be a PojoTypeInfo [main] INFO org.apache.flink.api.java.typeutils.TypeExt
..
今天,%3中的一个节点不同步,已重新启动。 现在,当我检查连接器任务的状态时,它显示为未分配,即使连接器处于运行状态。 工作进程正在分布式模式下运行。 我尝试重新启动连接器,但它仍未分配,并且指向被带回群集中的同一工作节点。 以下是我的其中一个工作进程的属性文件,该文件在所有工作进程中都是相同的: bootstrap.servers=something:9092 group.id
..
编辑问题: 尝试配置debezium MySQL Kafka连接器,以 https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-example-configuration 我有: 主机名:";ec2-xxx.compute.amazonaws.com";
..
我已正确配置了FilePulse,以便当我在阅读文件夹中创建文件时,它会在主题中读取并摄取该文件。 现在我需要连续阅读该文件夹中的每个文件,因为它们正在不断更新。 我必须更改属性文件的任何属性吗? 我的文件PulseTxtFile.properties: name=connect-file-pulse-txt connector.class=io.streamthought
..
我计划设置MySQL到Kafka的流程,最终目标是计划一个流程,根据更改的数据重新计算MongoDB文档。 这可能涉及直接修补MongoDB文档,或运行将重新创建整个文档的进程。 我的问题是,如果MySQL数据库的一组更改都与一个MongoDB文档相关,那么我不想为每个更改实时重新运行重新计算过程,我想等待更改‘结算’,以便只在需要时运行重新计算过程。 有没有办法“揭穿”卡夫卡之
..
我正在创建一个与另一个服务对话的服务,以确定要收听的卡夫卡主题。卡夫卡主题可能有不同的键和值类型。因此,我希望为每个配置(主题、键类型、值类型)动态创建不同的Kafka消费者,其中配置仅在运行时才知道。然而,在春季的Kafka中,我看不到动态传递所有这些参数的方法(至少我不知道有)。我应该如何进行这项工作。 推荐答案 只需在运行时创建新的侦听器容器。 https://docs.s
..
我按照this link创建了一个模板,该模板构建了一个从KafkaIO读取的光束管道。但我总是遇到“不兼容的类型:org.apache.beam.sdk.options.ValueProvider无法转换为java.lang.String”。导致错误的是行“.withBootstrapServers(options.getKafkaServer())”。BEAM版本为2.9.0,以下是我的部分代
..
我是Spark和Kafka的新手。使用从免费Kafka服务器提供商(Cloudkarafka)创建的Kafka服务器来使用数据。在运行pyspark代码(在Databricks上)以使用流数据时,流只是保持初始化,并且不获取任何内容。它既不会失败,也不会停止执行,只是将状态保持为流正在初始化。 代码: from pyspark.sql.functions import col kaf
..
我正在尝试每隔500 ms创建一个JSON数据集,并希望将其推送到Kafka主题,以便我可以在下游设置一些窗口并执行计算。以下是我的代码: package KafkaAsSource import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.streaming.ap
..
由于下面的错误,我无法启动Kafka服务器。 java.io.IOException: Map failed at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:940) at kafka.log.AbstractIndex.(AbstractIndex.scala:61) at kafka.log
..
我一直在为PostgreSQL实现一个Kafka连接器(我正在使用debezium Kafka连接器并通过docker运行所有组件)。我需要一个定制分区键,所以我一直使用SMT来实现这一点。但是,我使用的方法创建了一个Struct,并且我需要它是一个字符串。这篇article讲述了如何将分区键设置为int,但我无法访问配置文件来设置适当的转换。目前我的Kafka连接器如下所示 数据-lang=“
..
在我的Spring Boot Kafka发布应用程序中,我希望提供对以字符串(Json)或字节格式发布消息的支持,因为我希望同时支持json和avro。但是春装中的卡夫卡模板让我们只能定义其中的一个模板。有没有办法同时使用两个模板或任何其他方式来同时支持JSON和Avro? KafkaTemplate只适用于字符串,但我也想发布Avro,它应该类似于Kafka
..
我希望使用Spring Kafka API实现有状态监听器。 提供以下信息: ConCurrentKafkaListenerContainerFactory,并发设置为“n” Spring@Service类上的@KafkaListener批注方法 然后创建“n”个KafkaMessageListenerContainers。它们中的每一个都将有自己的KafkaConsumer,
..
我正在尝试将StreamsUncaughtExceptionHandler添加到我的Kafka流处理器中。该处理器是用Kafka函数编写的。我查看了suggestion provided by Artem Bilan以将StreamsUncaughtExceptionHandler包括到我的服务中,但我的异常从未被它捕获/处理。 配置Bean: @Autowired UnCaughtE
..
默认情况下,.windowedBy(SessionWindows.with(Duration.ofSeconds(60))为每个传入记录返回一条记录。 结合使用.count()和.filter()可以轻松检索第一条记录。 使用 .suppress(Suppressed.untilWindowCloses(unbounded()))还可以轻松检索最后一条记录。 所以…我做了两次处理
..
我正在尝试Kafka Streams的字数统计问题。我正在使用带有Scala版本2.11.12和SBT版本1.1.4的Kafka 1.1.0。我收到以下错误: Exception in thread "wordcount-application-d81ee069-9307-46f1-8e71-c9f777d2db64-StreamThread-1" java.lang.Unsatisfied
..
我有一个Kafka流--比如博客和Kafka表--比如那些博客相关的评论。Kafka流中的key可以映射到Kafka表中的多个值,即一个博客可以有多条评论。我想连接这两个对象,并创建一个带有注释ID数组的新对象。但是当我连接时,流只包含最后一个注释id。有没有任何文档或示例代码可以为我指明如何实现这一点的正确方向?基本上,有没有文档详细说明如何使用Kafka流和Kafka表进行一对多关系连接?
..
我有一个Kafka Streams应用程序,它从几个主题中获取数据,并连接数据并将其放入另一个主题中。 卡夫卡配置: 5 kafka brokers Kafka Topics - 15 partitions and 3 replication factor. 注意:我正在运行Kafka Broker的同一台计算机上运行Kafka Streams应用程序。 每小时消耗/产生数
..