kafka-consumer-api相关内容
我正在使用 FluentD(v.12 最后一个稳定版本)向 Kafka 发送消息.但是 FluentD 使用的是旧的 KafkaProducer,因此记录时间戳始终设置为 -1.因此,我必须使用 WallclockTimestampExtractor 将记录的时间戳设置为消息到达 kafka 的时间点. 我真正感兴趣的时间戳是由 fluentd 在消息中发送的: "timestamp"
..
对于Kafka Streams,如果我们使用较低级别的处理器API,我们可以控制提交与否.因此,如果我们的代码中出现问题,并且我们不想提交此消息.在这种情况下,Kafka 会多次重新传递此消息,直到问题得到解决. 但是在使用其更高级别的流 DSL API 时如何控制是否提交消息? 资源: http://docs.confluent.io/2.1.0-alpha1/streams/
..
我正在研究 kafka 流.我想使用选择性非常低的过滤器(几千分之一)过滤我的流.我在看这个方法:https://kafka.apache.org/0100/javadoc/org/apache/kafka/streams/kstream/KStream.html#filter(org.apache.kafka.streams.kstream.Predicate) 但是我找不到任何证据,如果
..
我已经实现了一个简单的 Kafka 死信记录处理器. 当使用控制台生产者产生的记录时,它工作得很好. 但是,我发现我们的 Kafka Streams 应用程序并不能保证向接收器主题生成记录,即每生成一条记录,偏移量就会增加 1. 死信处理器背景: 我有一个场景,在发布处理记录所需的所有数据之前,可能会收到记录.当流应用程序处理的记录不匹配时,它们将移动到死信主题,而不是继续
..
我知道无法在 Kafka 中对多个分区进行排序,并且分区排序仅对组内的单个使用者(对于单个分区)有保证.但是,使用 Kafka Streams 0.10 现在可以实现这一目标吗?如果我们使用时间戳功能,以便每个分区中的每条消息都保持顺序,那么在消费者方面,可以说使用 Kafka Streams 0.10 现在有可能吗?假设我们收到所有消息,我们是否可以不根据消费的时间戳对所有分区进行排序,然后将它
..
在某些情况下,我使用 Kafka-stream 对主题的小型内存(哈希图)投影进行建模.K,V 缓存确实需要一些操作,因此它不是 GlobalKTable 的好例子.在这样的“缓存"场景中,我希望我所有的兄弟实例都拥有相同的缓存,所以我需要绕过消费者组机制. 要启用此功能,我通常只需使用随机生成的应用程序 ID 启动我的应用程序,因此每个应用程序每次重新启动时都会重新加载主题.唯一需要注意的
..
我最近开始学习 Kafka 并最终解决了这些问题. 消费者和流之间有什么区别?对我来说,如果任何工具/应用程序消费来自 Kafka 的消息,那么它就是 Kafka 世界中的消费者. Stream 有什么不同,因为它也从 Kafka 消费或向 Kafka 生成消息?为什么需要它,因为我们可以编写自己的消费者应用程序使用消费者 API 并根据需要处理它们,还是从消费者应用程序将它们发送到
..
我使用以下形式开发生产 kafka 环境:3 个 ZK 服务器、3 个 Kafka 代理和两个 kafka 连接.我将我的 tmp 文件与我的 kafka 主文件夹并排放置.我在远程 ubuntu 环境中运行它,但不在 docker 中运行. 当我运行我的 kafka 操作时,我遇到错误,通知我的磁盘消耗过多.我检查了我的 kafka tmp 文件夹,发现它的大小几乎是我磁盘大小的 2/3,
..
如何将 Confluent Control Center Interceptor 添加到现有的 S3(Sink) Connector?监视接收器.我正在寻找文档.任何帮助表示赞赏. 解决方案 要绝对清楚,您的接收器上需要拦截器和源.如果不这样做,您将无法使用 Confluent Control Center 监控您的管道,就像现在一样. 要在 Kafka Connect 中启用拦截器
..
我需要找到或编写一个将向请求添加标头字段的 SMT.请求缺少一些类型字段,我想添加它们. 您究竟如何在 SMT 中添加标题,我所看到的只是如下所示的记录转换,但如果我想更改标题或向其中添加字段怎么办? 私有 R applySchemaless(R record) {最终映射value = requireMap(operatingValue(record), 目的);//record.he
..
我在此链接中遵循第 7 步(使用 Kafka Connect 导入/导出数据): http://kafka.apache.org/documentation.html#quickstart 它运行良好,直到我删除了“test.txt"文件.主要是因为这就是 log4j 文件的工作方式.一段时间后,文件将被旋转 - 我的意思是 - 它将被重命名 &将开始写入具有相同名称的新文件.
..
我是Kafka新手,我用kafka通过logstash收集netflow(可以),我想从kafka发送数据到elasticsearch,但是有一些问题. 我的问题是如何将 Kafka 与 Elasticsearch 连接起来?netflow 到 kafka logstash 配置: 输入{UDP{主机 =>“120.127.XXX.XX"端口 =>5556编解码器 =>网络流量}}筛选{}输出
..
我正在尝试使用 Kafka Connect JDBC sink 连接器将数据插入 Oracle,但它抛出了错误.我已经尝试了所有可能的架构配置.下面是例子. 如果我遗漏了以下内容,请指出我的配置文件和错误. 案例 1 - 第一次配置 internal.value.converter.schemas.enable=false . 所以我得到了 [2017-08-28 16:16:2
..
我有一个长期运行的 Spark 结构化流作业,它正在摄取 kafka 数据.我有一个问题如下.如果作业由于某种原因失败并稍后重新启动,如何确保从断点处摄取kafka数据,而不是在作业重新启动时始终摄取当前和以后的数据.我是否需要明确指定诸如消费者组和 auto.offet.reset 等内容?它们是否支持 spark kafka 摄取?谢谢! 解决方案 根据 Spark 结构化集成指南,S
..
我使用以下形式开发生产 kafka 环境:3 个 ZK 服务器、3 个 Kafka 代理和两个 kafka 连接.我将我的 tmp 文件与我的 kafka 主文件夹并排放置.我在远程 ubuntu 环境中运行它,但不在 docker 中运行. 当我运行我的 kafka 操作时,我遇到错误,通知我的磁盘消耗过多.我检查了我的 kafka tmp 文件夹,发现它的大小几乎是我磁盘大小的 2/3,
..
我们有一个使用 org.apache.kafka.clients.consumer.KafkaConsumer 消费 Kafka 消息的 Java 应用程序 我们创建了一个具有 Spring-Kafka 依赖项的 Spring Boot 应用程序,但无法读取新项目中的消息.检查了明显的参数,包括引导服务器的主机名和端口(日志显示可以识别)、组、主题以及 Spring Boot 与原始使用者一样
..
我有一个 kafka 消费者,我正在从该消费者中消费来自特定主题的数据,但我看到以下异常.我使用的是 0.10.0.0 kafka 版本. LoggingCommitCallback.onComplete: Commit failed for offsets= {....}, eventType= some_type, timetaken= 19ms, error= org.apache.kaf
..
当我使用Kafka的Java API时,如果我让我的主线程睡眠少于2000ns,它无法产生任何消息.我真的很想知道为什么会发生这种情况? 这是我的制作人: 公共类生产者{私人最终 KafkaProducer生产者;私人最终字符串主题;公共生产者(字符串主题,字符串 [] args){//......//......生产者 = 新的 KafkaProducer(props);this.t
..
我已经看到一个类似的问题 clickhere 但我仍然想知道是否不可能从特定分区流式传输数据?我在 Spark Streaming 订阅方法中使用了 Kafka Consumer Strategies. ConsumerStrategies.Subscribe[String, String](topics, kafkaParams,偏移量) 这是我尝试订阅主题和分区的代码片段,
..
我是 kafka 的新手,我使用 Apache kafka 消费者来读取来自生产者的消息.但是当我停止并开始一段时间时.之间产生的所有消息都丢失了.如何处理这种情况.我正在使用这些属性“auto.offset.reset"、“latest"和“enable.auto.commit"、“false". 这是我正在使用的代码.感谢任何帮助. Properties props = new Pro
..