offset相关内容
示例表:https://docs.google.com/spreadsheets/d/14ma-y3esh1S_EkzHpFBvLb0GzDZZiDsSVXFktH3Rr_E/edit?usp=sharing 在ItemData表的B列中,我通过将公式复制到列中的每个单元格中获得了我想要的结果,但我想改用ArrayFormula来解决这个问题. 在 C 列中,我使用 ArrayForm
..
我更改了用于 Google 地图上marker 的图像.新图像比旧图像宽得多,我注意到标记与 lat 和 lng 对齐,因此标记位于 lat 和 lng 的水平中点上方code>lat 和 lng.这不是我想要的,我想让 lat 和 lng 与标记左侧对齐 - 我想偏移 marker从默认位置向右约 80 像素. 解决方案 试试这个尺寸. var markerImage = new go
..
我正在使用 zookeeper 从 kafka 获取数据.在这里我总是从最后一个偏移点获取数据.有什么办法可以指定偏移时间来获取旧数据吗? 有一个选项 autooffset.reset.它接受最小或最大.有人可以解释什么是最小和最大.autooffset.reset 可以帮助从旧偏移点而不是最新偏移点获取数据吗? 解决方案 消费者始终属于一个组,对于每个分区,Zookeeper 会跟
..
Spark 2.2 引入了 Kafka 的结构化流媒体源.据我了解,它依赖于 HDFS 检查点目录来存储偏移量并保证“恰好一次"消息传递. 但是旧码头(例如 https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/) 说 Spark Stream
..
我正在使用 Spark 2.2.0 和 kafka 0.10 spark-streaming 库来读取充满 Kafka-Streams scala 应用程序的主题.Kafka Broker 版本为 0.11,Kafka-streams 版本为 0.11.0.2. 当我在 Kafka-Stream 应用中设置 EXACTLY_ONCE 保证时: p.put(StreamsConfig.PR
..
我正在使用 Spark 2.2.0 和 kafka 0.10 spark-streaming 库来读取充满 Kafka-Streams scala 应用程序的主题.Kafka Broker 版本为 0.11,Kafka-streams 版本为 0.11.0.2. 当我在 Kafka-Stream 应用中设置 EXACTLY_ONCE 保证时: p.put(StreamsConfig.PR
..
我在 kafka 中做数据复制.但是,kafka 日志文件的大小增加得非常快.大小在一天内达到 5 GB.作为这个问题的解决方案,我想立即删除处理过的数据.我在 AdminClient 中使用删除记录方法来删除偏移量.但是当我查看日志文件时,并没有删除与该偏移量对应的数据. RecordsToDelete recordsToDelete = RedcordsToDelete.beforeOffs
..
我们使用kafka作为我们的消息队列,我们的业务要求消息时间戳必须与偏移量具有相同的顺序,这意味着:如果有消息m1和消息m2,并且(m1.timestamp 解决方案 取决于使用的时间戳类型,有两种类型: CreateTime - 创建生产者记录时分配时间戳,因此在发送之前.可能会重试,因此无法保证保留顺序. LogAppendTime - 将记录附加到代理上的日志时分配时间戳.在
..
kafka 将如何处理调用 KafkaConsumer.commitAsync(Map offsets, OffsetCommitCallback callback) 当某个主题的偏移值被指定为小于先前调用的值时? 解决方案 它会简单地将分区的偏移量设置为您指定的值,因此下次您将使用来自 commitedOffset+1 的消息. commitAsync() 的 javado
..
kafka 将如何处理调用 KafkaConsumer.commitAsync(Map offsets, OffsetCommitCallback callback) 当某个主题的偏移值被指定为小于先前调用的值时? 解决方案 它会简单地将分区的偏移量设置为您指定的值,因此下次您将使用来自 commitedOffset+1 的消息. commitAsync() 的 javado
..
我正在使用 Kafka 的结构化流源(集成指南),如前所述,它没有提交任何偏移量. 我的目标之一是监控它(检查它是否落后等).即使它没有提交偏移量,它也会通过不时查询 kafka 并检查下一个要处理的偏移量来处理它们.根据文档,偏移量被写入 HDFS,因此在发生故障时可以恢复,但问题是: 它们存放在哪里?如果不提交偏移量,是否有任何方法可以监视火花流(结构化)的 kafka 消耗(从程
..
我一直在做火花流作业,通过 kafka 消费和生产数据.我用的是directDstream,所以必须自己管理offset,我们采用redis来写和读offset.现在有一个问题,当我启动我的客户端时,我的客户端需要从redis中获取offset,而不是kafka中存在的offset它自己.如何显示我编写的代码?现在我已经在下面编写了代码: kafka_stream = KafkaUtils.c
..
我正在尝试使用 kafka-python 构建一个应用程序,其中消费者从一系列主题中读取数据.消费者绝不会两次读取同一条消息,而且绝不会错过任何一条消息,这一点极为重要. 似乎一切正常,除非我关闭消费者(例如失败)并尝试从偏移量开始读取.我只能读取主题中的所有消息(这会造成双重读取)或仅收听新消息(并错过在故障期间发出的消息).我在暂停消费者时没有遇到这个问题. 我创建了一个孤立的模拟
..
我需要一天一小时地获取 Kafka 中产生的消息.每隔一小时,我将启动一项工作来消费 1 小时前生成的消息.例如,如果当前时间是 20:12,我将在 19:00:00 和 19:59:59 之间消费消息.这意味着我需要在 19:00:00 时间开始偏移,在 19:59:59 时间结束偏移.我使用 SimpleConsumer.getOffsetsBefore 如「0.8.0 SimpleConsu
..
我在谷歌上搜索并阅读 Kafka 文档,但我找不到消费者偏移量的最大值以及最大值后是否有偏移量环绕.我知道偏移量是一个 Int64 值,所以最大值是 0xFFFFFFFFFFFFFFFF.如果有环绕,Kafka 如何处理这种情况? 解决方案 根据这个 post,偏移量未重置: 我们目前不回滚偏移量.由于偏移量很长,它可以持续很长时间.一天写1TB就可以继续大约 400 万天.
..
我正在使用 Kafka 流,并希望将一些消费者偏移量从 Java 重置到开头.KafkaConsumer.seekToBeginning(...) 听起来是正确的做法,但我使用 Kafka Streams: KafkaStreams 流 = new KafkaStreams(builder, props);...流开始(); 我想根据我定义的具体流管道,这会在幕后创建几个消费者.我可以访问那些
..
引自 https:/www.safaribooksonline.com/library/view/kafka-the-definitive/9781491936153/ch04.html#callout_kafka_consumers__reading_data_from_kafka_CO2-1 缺点是虽然 commitSync() 会重试提交,直到它要么成功,要么遇到不可重试的失败,com
..
我正在使用 zookeeper 从 kafka 获取数据.在这里,我总是从最后一个偏移点获取数据.有什么办法可以指定偏移时间来获取旧数据吗? 有一个选项 autooffset.reset.它接受最小或最大.有人可以解释什么是最小和最大.autooffset.reset 可以帮助从旧偏移点而不是最新偏移点获取数据吗? 解决方案 消费者始终属于一个组,对于每个分区,Zookeeper 会
..
我正在阅读这个: 自动提交提交偏移量的最简单方法是允许消费者为你做.如果配置 enable.auto.commit=true,然后每五秒消费者将提交最大的偏移量您的客户从 poll() 收到.五秒间隔是默认并通过设置 auto.commit.interval.ms 来控制.只是像消费者中的其他一切一样,自动提交是由驱动的通过轮询循环.每当您进行轮询时,消费者都会检查是否到了时间提交,如果是,
..
Spark 2.2 引入了 Kafka 的结构化流媒体源.据我了解,它依赖于 HDFS 检查点目录来存储偏移量并保证“恰好一次"消息传递. 但是旧码头(例如 https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/) 说 Spark Stream
..