kafka-producer-api相关内容
我有一个经典的微服务架构.所以,有不同的应用.每个应用程序可能有 1..N 个实例.系统部署到Kubernetes. 所以,我们有很多不同的PODs,可以随时启动和停止. 我想实现 read-process-write 模式,所以我需要卡夫卡交易. 要配置事务,我需要为每个 Kafka 生产者设置一些 transaction id.(实际上,我需要 transaction-id-pre
..
我正在编写一个 REST 代理,比如融合的休息代理.它接受一个 JSON 负载、模式主题和 id,然后将 JSON 负载作为 Avro 对象写入流中.当我使用 kafka-avro-console-consumer 读取消息时,出现“未知幻字节"错误. 这是我的 kafka 生产者配置: properties.put("client.id", LocalHostUtils.getLoca
..
我使用的是 Kafka v0.9.0.1 (Scala v2.11) 和 com.101tec:zkclient v0.7.我正在尝试使用 AdminUtils 创建一个 kafka 主题.我的代码如下. String zkServers = "node1:2181,node2:2181,node3:2181,node4:2181";整数 sessionTimeout = (int)TimeUn
..
我的接收器属性: {"name": "jdbc-oracle",“配置":{"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max": "1","topics": "订单","connection.url": "jdbc:oracle:thin:@10.1.2.3:1071/orac","connec
..
我需要在异步发送到 Kafka 的情况下捕获异常.Kafka 生产者 Api 带有一个函数 send(ProducerRecord record, Callback callback).但是当我针对以下两种情况对此进行测试时: Kafka Broker 关闭 主题未预先创建回调没有被调用.相反,我在代码中收到了发送失败的警告(如下所示). 问题: 那么回调是否只针对特定异常调用
..
我正在运行 rdkafka_simple_producer.c 来向 Kafka 集群生成消息.我有一个主题和 30 个分区.使用默认的循环分区器.当生产者正在工作并向 Kafka 生成消息时,我向 Kafka 添加了更多分区 kafka/bin/kafka-topics.sh --alter --zookeeper server2:2181 --topic demotest --partiti
..
我正在使用 Spring Kafka 模板来生成消息.而且它产生消息的速度太慢了.生成 15000 条消息大约需要 8 分钟. 以下是我如何创建 Kafka 模板: @Bean公共 ProducerFactory高速AvroProducerFactory(@Qualifier("highSpeedProducerProperties") KafkaProperties 属性) {最终映射
..
我有一个由 3 个 Kafka 代理组成的集群,所有主题的复制因子为 3.由于最近几天我面临这个问题,突然(一天中几次)消费者和生产者在获得响应时卡住了,即使 Kafka 正在所有 3 个服务器上运行,直到我检查代理日志(“连接到 0 已断开连接)在响应被读取之前")并找出罪魁祸首节点为 0(在本例中为第一个节点)并在该节点上重新启动 zookeeper 和 broker. 根据日志,这是由
..
我应该如何知道何时必须扩展消费者组中的消费者.有快速生产者时,消费者扩大规模的触发因素是什么? 解决方案 一个直接的方法是获取消费者滞后(这可以计算为提交偏移量和开始偏移量之间的差异),如果滞后在最后一个计算n 倍增加,您可以扩大规模,反之亦然.您可能需要考虑一些边缘情况,例如,如果消费者数量下降,延迟会增加,并且自动缩放功能可能会产生更多线程/机器).
..
我是 kafka 的新手.当我运行此命令时 javac -cp "C:\kafka\kafka_2.11-0.10.2.0\libs\kafka-clients-0.10.2.0.jar" *.java 我收到一条错误消息 错误:包 org.apache.kafka.clients.producer 不存在 解决方案 以下命令效果很好 javac -classpath ".;C
..
我使用具有幂等生产者配置的 spring-kafka: 这些是我的配置道具: Properties props = new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Joiner.on(",").join(appProps.getBrokers())); //配置SSL加密的以下三个设置props.p
..
我的主题有 10 个分区,1 个消费者组有 4 个消费者,工作线程大小为 3. 我可以看到分区中的消息分布不均匀,一个分区有很多数据,另一个是空闲的. 如何让我的生产者将负载平均分配到所有分区,从而使所有分区都得到正确利用? 解决方案 根据 DefaultPartitioner 类本身的 JavaDoc 注释,默认分区策略为: 如果记录中指定了分区,则使用它. 如果未指
..
所以我正在尝试使用 Kafka 流进行交互式查询.我有 Zookeeper 和 Kafka 在本地运行(在 Windows 上).我使用 C:\temp 作为存储文件夹,用于 Zookeeper 和 Kafka. 我已经设置了这样的主题 kafka-topics.bat --zookeeper localhost:2181 --create --replication-factor 1 -
..
我有几个关于 Kafka 的问题. 1) Kafka 是否有默认的 Web UI? 2) 我们如何优雅地关闭独立的 kafka 服务器,kafka 控制台-消费者/控制台生产者. 任何解决方案将不胜感激. 谢谢. 解决方案 1) 没有 Kafka 没有默认 UI. 然而,有许多第三方工具可以以图形方式显示 Kafka 资源.只需在 Google 上搜索 kaf
..
我有一个带有 1 个分区的 kafka 主题.如果其中有 100 条消息,则偏移量将从 0.99 开始. 根据 kafka 保留政策,所有消息将在指定时间段后被清除. 一旦所有消息都被清除(保留期后),我将向该主题发送 100 条新消息.现在,消息的新偏移量从哪里开始?是从 100 开始还是从 0 开始?? 我想了解新的偏移量是 100-199 还是 0-99? 解决方案
..
当我尝试发送超过 1 Mb 的消息时,我收到 Message size too large 异常.当我尝试生成消息时,错误出现在我的客户端应用程序中.经过一番谷歌搜索后,我发现应该更改设置以增加最大消息大小.好吧,我在 /kafka/config/server.properties 文件中做到了.我添加了接下来的 2 个设置: message.max.bytes=15728640replica.
..
我正在发布到远程 kafka 服务器并尝试使用来自该远程服务器的消息.(卡夫卡 v 0.90.1)发布工作正常,但消费也不行. 出版商 package org.test;导入 java.io.IOException;导入 java.util.Properties;导入 org.apache.kafka.clients.producer.KafkaProducer;导入 org.apache
..
我正在向 Kafka 写入一个 msg 并在另一端使用它.在其中做一些处理并将其写回另一个 Kafka 主题. 我想知道哪个消息响应针对哪个请求.. 当前决定从消费者端捕获偏移 id,然后写入响应并读取响应负载并决定相同. 对于这种方法,我们需要读取每条消息,还有其他方法可以根据消费者配置条件消费吗? 解决方案 消费者只能阅读整个主题.您只能通过 seek() 跳过消息,
..
我最近开始使用 Kafka,并针对少数用例评估了 Kafka. 如果我们想根据消息内容为消费者(订阅者)提供过滤消息的功能,那么最好的方法是什么? 假设生产商公开了一个名为“交易"的主题,该主题具有不同的交易详细信息,例如市场名称、创建日期、价格等. 一些消费者对特定市场的交易感兴趣,而另一些消费者则对特定日期后的交易感兴趣(基于内容的过滤) 由于代理端无法进行过滤,因此实
..
我在单节点中运行 kafka,我想在关闭我的 kafka 代理时看到 kafka Producer 的行为,然后我在几秒钟内重新启动我的代理,所以我创建了 Spring Boot 项目,我可能会在其中发送 1000 个客户 JSON 对象和打印每次发送的 JSON 对象的偏移量.我的应用程序工作正常,但是当我关闭我的 kafka 代理时,几秒钟后我重新启动我的代理,我的生产者返回以正常发送来自最新
..