kafka-consumer-api相关内容

Kafka Streams:如何更改记录时间戳(0.11.0)?

我正在使用 FluentD(v.12 最后一个稳定版本)向 Kafka 发送消息.但是 FluentD 使用的是旧的 KafkaProducer,因此记录时间戳始终设置为 -1.因此,我必须使用 WallclockTimestampExtractor 将记录的时间戳设置为消息到达 kafka 的时间点. 我真正感兴趣的时间戳是由 fluentd 在消息中发送的: "timestamp" ..

使用 Kafka Streams DSL 时如何处理错误和不提交

对于Kafka Streams,如果我们使用较低级别的处理器API,我们可以控制提交与否.因此,如果我们的代码中出现问题,并且我们不想提交此消息.在这种情况下,Kafka 会多次重新传递此消息,直到问题得到解决. 但是在使用其更高级别的流 DSL API 时如何控制是否提交消息? 资源: http://docs.confluent.io/2.1.0-alpha1/streams/ ..

Kafka Streams 在生成主题时不会将偏移量增加 1

我已经实现了一个简单的 Kafka 死信记录处理器. 当使用控制台生产者产生的记录时,它工作得很好. 但是,我发现我们的 Kafka Streams 应用程序并不能保证向接收器主题生成记录,即每生成一条记录,偏移量就会增加 1. 死信处理器背景: 我有一个场景,在发布处理记录所需的所有数据之前,可能会收到记录.当流应用程序处理的记录不匹配时,它们将移动到死信主题,而不是继续 ..

Kafka多分区排序

我知道无法在 Kafka 中对多个分区进行排序,并且分区排序仅对组内的单个使用者(对于单个分区)有保证.但是,使用 Kafka Streams 0.10 现在可以实现这一目标吗?如果我们使用时间戳功能,以便每个分区中的每条消息都保持顺序,那么在消费者方面,可以说使用 Kafka Streams 0.10 现在有可能吗?假设我们收到所有消息,我们是否可以不根据消费的时间戳对所有分区进行排序,然后将它 ..

Kafka:删除空闲的消费者组ID

在某些情况下,我使用 Kafka-stream 对主题的小型内存(哈希图)投影进行建模.K,V 缓存确实需要一些操作,因此它不是 GlobalKTable 的好例子.在这样的“缓存"场景中,我希望我所有的兄弟实例都拥有相同的缓存,所以我需要绕过消费者组机制. 要启用此功能,我通常只需使用随机生成的应用程序 ID 启动我的应用程序,因此每个应用程序每次重新启动时都会重新加载主题.唯一需要注意的 ..

Kafka:消费者 API 与 Streams API

我最近开始学习 Kafka 并最终解决了这些问题. 消费者和流之间有什么区别?对我来说,如果任何工具/应用程序消费来自 Kafka 的消息,那么它就是 Kafka 世界中的消费者. Stream 有什么不同,因为它也从 Kafka 消费或向 Kafka 生成消息?为什么需要它,因为我们可以编写自己的消费者应用程序使用消费者 API 并根据需要处理它们,还是从消费者应用程序将它们发送到 ..

为什么我的 kafka tmp 文件夹的大小几乎与磁盘大小相同?

我使用以下形式开发生产 kafka 环境:3 个 ZK 服务器、3 个 Kafka 代理和两个 kafka 连接.我将我的 tmp 文件与我的 kafka 主文件夹并排放置.我在远程 ubuntu 环境中运行它,但不在 docker 中运行. 当我运行我的 kafka 操作时,我遇到错误,通知我的磁盘消耗过多.我检查了我的 kafka tmp 文件夹,发现它的大小几乎是我磁盘大小的 2/3, ..

Confluent 控制中心拦截器

如何将 Confluent Control Center Interceptor 添加到现有的 S3(Sink) Connector?监视接收器.我正在寻找文档.任何帮助表示赞赏. 解决方案 要绝对清楚,您的接收器上需要拦截器和源.如果不这样做,您将无法使用 Confluent Control Center 监控您的管道,就像现在一样. 要在 Kafka Connect 中启用拦截器 ..

Kafka Connect SMT 添加 Kafka 标头字段

我需要找到或编写一个将向请求添加标头字段的 SMT.请求缺少一些类型字段,我想添加它们. 您究竟如何在 SMT 中添加标题,我所看到的只是如下所示的记录转换,但如果我想更改标题或向其中添加字段怎么办? 私有 R applySchemaless(R record) {最终映射value = requireMap(operatingValue(record), 目的);//record.he ..

Kafka 连接教程停止工作

我在此链接中遵循第 7 步(使用 Kafka Connect 导入/导出数据): http://kafka.apache.org/documentation.html#quickstart 它运行良好,直到我删除了“test.txt"文件.主要是因为这就是 log4j 文件的工作方式.一段时间后,文件将被旋转 - 我的意思是 - 它将被重命名 &将开始写入具有相同名称的新文件. ..

Kafka Connect JDBC 接收器连接器不起作用

我正在尝试使用 Kafka Connect JDBC sink 连接器将数据插入 Oracle,但它抛出了错误.我已经尝试了所有可能的架构配置.下面是例子. 如果我遗漏了以下内容,请指出我的配置文件和错误. 案例 1 - 第一次配置 internal.value.converter.schemas.enable=false . 所以我得到了 [2017-08-28 16:16:2 ..

如何通过 Spark Structured Streaming 确保 kafka 数据摄取不会丢失数据?

我有一个长期运行的 Spark 结构化流作业,它正在摄取 kafka 数据.我有一个问题如下.如果作业由于某种原因失败并稍后重新启动,如何确保从断点处摄取kafka数据,而不是在作业重新启动时始终摄取当前和以后的数据.我是否需要明确指定诸如消费者组和 auto.offet.reset 等内容?它们是否支持 spark kafka 摄取?谢谢! 解决方案 根据 Spark 结构化集成指南,S ..

为什么我的 kafka tmp 文件夹的大小几乎与磁盘大小相同?

我使用以下形式开发生产 kafka 环境:3 个 ZK 服务器、3 个 Kafka 代理和两个 kafka 连接.我将我的 tmp 文件与我的 kafka 主文件夹并排放置.我在远程 ubuntu 环境中运行它,但不在 docker 中运行. 当我运行我的 kafka 操作时,我遇到错误,通知我的磁盘消耗过多.我检查了我的 kafka tmp 文件夹,发现它的大小几乎是我磁盘大小的 2/3, ..

无法在 Spring Boot 中使用 Kafka 消息

我们有一个使用 org.apache.kafka.clients.consumer.KafkaConsumer 消费 Kafka 消息的 Java 应用程序 我们创建了一个具有 Spring-Kafka 依赖项的 Spring Boot 应用程序,但无法读取新项目中的消息.检查了明显的参数,包括引导服务器的主机名和端口(日志显示可以识别)、组、主题以及 Spring Boot 与原始使用者一样 ..

主线程睡眠少于 1000 时无法产生消息

当我使用Kafka的Java API时,如果我让我的主线程睡眠少于2000ns,它无法产生任何消息.我真的很想知道为什么会发生这种情况? 这是我的制作人: 公共类生产者{私人最终 KafkaProducer生产者;私人最终字符串主题;公共生产者(字符串主题,字符串 [] args){//......//......生产者 = 新的 KafkaProducer(props);this.t ..
发布时间:2021-11-12 03:21:31 其他开发

使用 Spark 从 Kafka 主题中的特定分区流式传输数据

我已经看到一个类似的问题 clickhere 但我仍然想知道是否不可能从特定分区流式传输数据?我在 Spark Streaming 订阅方法中使用了 Kafka Consumer Strategies. ConsumerStrategies.Subscribe[String, String](topics, kafkaParams,偏移量) 这是我尝试订阅主题和分区的代码片段, ..

Apache kafka 消费者停止和启动之间丢失的消息

我是 kafka 的新手,我使用 Apache kafka 消费者来读取来自生产者的消息.但是当我停止并开始一段时间时.之间产生的所有消息都丢失了.如何处理这种情况.我正在使用这些属性“auto.offset.reset"、“latest"和“enable.auto.commit"、“false". 这是我正在使用的代码.感谢任何帮助. Properties props = new Pro ..
发布时间:2021-11-12 03:21:06 其他开发