kafka-consumer-api相关内容

Kafka只订阅最新消息吗?

有时(似乎是非常随机的),Kafka发送旧消息.我只需要最新消息,因此它将使用相同的密钥覆盖消息.目前看来,我有多个具有相同密钥的消息,没有被压缩. 我在主题中使用此设置: cleanup.policy=compact 我正在使用Java/Kotlin和Apache Kafka 1.1.1客户端. Properties(8).apply { val jaasTempl ..
发布时间:2020-04-25 08:26:31 Java开发

Kafka Consumer自动提交如何工作?

我正在阅读这: 自动提交提交偏移量的最简单方法是允许 消费者为您做.如果您配置enable.auto.commit = true, 然后,消费者每五秒钟将做出最大的抵销 您的客户来自poll().五秒钟的间隔是 默认值,并通过设置auto.commit.interval.ms进行控制.只是 像使用者中的所有其他东西一样,自动提交被驱动 通过轮询循环.每当您轮询时,消费者都会检查是否到了时间 ..
发布时间:2020-04-25 08:26:27 Java开发

阅读Spark批处理作业中的Kafka主题

我正在编写一个从Kafka主题读取的Spark(v1.6.0)批处理作业. 为此,我可以使用org.apache.spark.streaming.kafka.KafkaUtils#createRDD 我需要为所有分区设置偏移量,还需要将它们存储在某个位置(ZK,HDFS?),以了解从哪里开始下一个批处理作业. 在批处理作业中从Kafka读取的正确方法是什么? 我也在考虑编写一个流作业 ..

kafka-python-如何提交分区?

使用kafka-python-1.0.2. 如果我有一个包含10个分区的主题,那么如何在循环遍历各个分区和消息的同时提交特定的分区.我只是似乎在任何地方(无论是在文档中还是在其他地方)都找不到此示例 从文档中,我想使用: consumer.commit(offset=offsets) 具体来说,如何创建偏移量所需的分区和OffsetAndMetadata字典(字典,可选)– ..
发布时间:2020-04-25 08:26:20 Python

Kafka Connect JDBC接收器连接器不起作用

我正在尝试使用Kafka Connect JDBC接收器连接器将数据插入Oracle,但是它抛出错误.我尝试了该模式的所有可能配置.下面是示例. 如果下面缺少任何内容,请提出建议,这些是我的配置文件和错误. 情况1-首先配置 internal.value.converter.schemas.enable=false . 所以我得到了 [2017-08-28 16:16: ..

Kafka使用者(0.8.2.2)可以批量读取消息吗

据我了解,Kafka使用者顺序读取指定分区中的消息... 我们计划有多个Kafka使用者(Java),该使用者具有与我相同的组.因此,如果它从分配的分区中顺序读取,那么我们如何实现高吞吐量.例如,Producer每秒发布40条消息,例如... 使用者进程每秒味精1.虽然我们可以有多个使用者,但不能有40 rt?如果我错了请纠正我... 在我们的情况下,使用者必须在成功处理消息后才提交偏 ..
发布时间:2020-04-25 08:26:15 Java开发

使用Spark Streaming时限制Kafka批次大小

是否可以限制Kafka使用者返回的Spark Streaming批次的大小? 我之所以问是因为我获得的第一批记录有亿万条记录,并且需要很长时间才能处理并检查它们. 解决方案 我认为您可以通过 Spark Streaming Backpressure (火花流反压)解决您的问题. 检查spark.streaming.backpressure.enabled和spark.strea ..

消费者重新平衡在卡夫卡如何运作?

添加或关闭新的消费者/经纪人后,Kafka会触发重新平衡操作. Kafka正在重新平衡阻止操作吗?重新平衡操作进行期间,Kafka消费者是否受阻? 解决方案 取决于您所说的“受阻".如果您的意思是“触发重新平衡时现有连接已关闭",那么答案是肯定的.不幸的是,当前的Kafka的重新平衡算法并不完善. 这是消费者重新平衡过程中发生的事情. 假设我们有一个包含10个分区(0-9)的主 ..

什么是“重新平衡"?在Apache Kafka上下文中意味着什么?

我是Kafka的新用户,并且已经试用了2-3周.我相信目前我对Kafka的工作原理有一个很好的了解,但是在尝试为我自己的Kafka消费者使用该API之后(这虽然默默无闻,但我正在遵循新的KafkaConsumer的指导原则,适用于v 0.9(已在“主干"存储库中列出)v)如果我有多个使用相同groupID的使用者,则我遇到了某个主题消耗的延迟问题. 在此设置中,我的控制台始终记录有关“重新平 ..
发布时间:2020-04-25 08:25:55 其他开发

卡夫卡消费者未返回任何事件

下面的Scala kafka使用者未从poll调用返回任何事件. 但是,该主题是正确的,我可以看到使用控制台使用者发送到该主题的事件: /opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic my_topic --from-beginning 当我使用调 ..
发布时间:2020-04-25 08:25:53 其他开发

如何制作可重启的生产者?

kafka的最新版本支持完全一次语义(EoS).为了支持此概念,将额外的详细信息添加到每条消息.这意味着对您的消费者而言;如果您打印邮件的偏移量,它们不一定是顺序的.这使得轮询一个主题以阅读最后提交的消息变得更加困难. 在我的情况下,消费者打印出这样的内容 Offset-0 0 Offset-2 1 Offset-4 2 问题:为了编写可重启的骄傲者;我轮询主题并阅读最后一条消息的 ..
发布时间:2020-04-25 08:25:49 其他开发

如何从一开始就使用Kafka Consumer API读取数据?

请问有谁能告诉我如何从每次运行使用者jar时开始就使用Kafka Consumer API阅读消息. 解决方案 这适用于0.9.x使用者.基本上,在创建使用者时,需要使用属性ConsumerConfig.GROUP_ID_CONFIG将此使用者组标识分配给该使用者.每次启动使用者执行类似properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID. ..
发布时间:2020-04-25 08:25:41 其他开发

Kafka:消费者API与流API

我最近开始学习Kafka,并遇到了这些问题. Consumer和Stream之间有什么区别?对我而言,如果有任何工具/应用程序消费来自Kafka的消息,则是Kafka世界中的消费者. Stream有何不同之处,因为它也会从Kafka中消费或产生消息到Kafka?以及为什么需要它,因为我们可以编写自己的消费者 使用消费者API的应用程序并根据需要对其进行处理,还是将其从消费者应用程序发送 ..

了解Kafka主题和分区

出于企业解决方案的目的,我开始学习Kafka. 在阅读期间,我想到了一些问题: 生产者在生成消息时-会指定要向其发送消息的 topic ,对吗?它关心分区吗? 订户正在运行时-是否指定其组ID,以便它可以成为同一主题或该组消费者感兴趣的多个主题的消费者集群的一部分? 每个消费者组在经纪人上都有对应的分区吗?还是每个消费者都有一个分区? 是由代理创建的分区,因此对于消费者而 ..
发布时间:2020-04-25 08:25:35 其他开发

在KAFKA中使用后删除消息

我正在使用apache kafka制作和使用5GB大小的文件.我想知道是否有一种方法可以在使用完主题消息后自动将其删除.我有什么办法来跟踪已消耗的消息吗?我不想手动删除它. 解决方案 在Kafka中,所消费物品的责任是消费者的责任,这也是Kafka具有如此出色的水平可伸缩性的主要原因之一. 使用高级使用者API将通过在Zookeeper中提交消耗的偏移量来自动为您执行此操作(或者特殊 ..
发布时间:2020-04-25 08:25:33 其他开发