kafka-producer-api相关内容
您如何控制Kafka Producer或Consumer的控制台日志记录级别?我正在Scala中使用Kafka 0.9 API. 每次调用KafkaProducer上的send时,控制台都会提供如下输出.这是否表明我没有正确设置KafkaProducer,而不仅仅是一个日志过多的问题? 17:52:21.236 [pool-10-thread-7] INFO o.a.k.c.prod
..
我正在运行rdkafka_simple_producer.c来向Kafka集群生成消息.我有一个主题和30个分区.使用默认的循环分区程序. 当生产者正在工作并向Kafka生成消息时,我向Kafka添加了更多分区 kafka/bin/kafka-topics.sh --alter --zookeeper server2:2181 --topic demotest --partitions 40
..
我正在运行一个Samza流作业,该作业正在将数据写入Kafka主题. Kafka正在运行一个3节点群集. Samza作业部署在纱线上.我们在容器日志中看到了许多此类异常: INFO [2018-10-16 11:14:19,410] [U:2,151,F:455,T:2,606,M:2,658] samza.container.ContainerHeartbeatMonitor:[Cont
..
我是kafka的新手.当我运行此命令 javac -cp "C:\kafka\kafka_2.11-0.10.2.0\libs\kafka-clients-0.10.2.0.jar" *.java 我收到一条错误消息 错误:包org.apache.kafka.clients.producer不存在 解决方案 以下命令效果很好 javac -classpath ".;C
..
我正在异步模式下使用kafka生产者,但是当所有代理都关闭时,它的行为就像同步,并且它等待直到meta.fetch.timeout.ms到期,这对我来说是60秒.我的第一个问题是这是正常现象还是我做错了什么? 由于我的逻辑中的事务最多应在100毫秒内完成,因此该超时值对我来说确实是一个很大的延迟.也许将metadata.fetch.timeout.ms设置为10 ms可能会解决我的问题,但是
..
我正在使用非阻塞(异步)方式向Kafka发送消息: ListenableFuture> future = template.send(record); future.addCallback(new ListenableFutureCallback>() {
..
更新主题的TTL,使记录在主题中保留10天.我只需要通过保留所有其他主题TTL相同的当前配置来对特定主题进行此操作,就必须使用java进行此操作,因为我是通过Java将主题推到kafka的.我正在设置以下属性以将主题推送到kafka Properties props = new Properties(); props.put("bootstrap.servers", KAFKA_SERVER
..
在生产或使用数据时我看不到任何故障,但是生产中有一堆重复的消息.对于一个大约有10万条消息的小话题,大约有4000条重复项,尽管就像我说的那样,没有失败,而且没有实现重试逻辑或未设置配置值. 我还检查那些重复消息的偏移值,每个偏移值都有不同的值,告诉我问题出在生产者中. 任何帮助将不胜感激 解决方案 了解有关在Kafka中传递消息的更多信息: https://kafka.
..
我只是在探索Kafka,目前我正在使用一个producer和一个主题来生成消息,并且该消息被一个Consumer使用.非常简单. 我正在阅读Kafka页面,new Producer API is thread-safe和共享单个实例将提高性能. 这是否意味着我可以使用单个生产者将消息发布到多个主题? 解决方案 我从来没有尝试过,但是我想可以.由于生产者和发送记录的代码是(从这里
..
我拥有低于kafka的制作人Api程序,并且对kafka本身还是陌生的.下面的代码从API之一获取数据并将消息发送到kafka主题. package kafka_Demo; import java.util.Properties; import java.io.BufferedReader; import java.io.InputStream; import java.io.InputS
..
如果我在Producer上将Kafka配置参数设置为: 1. retries = 3 2. max.in.flight.requests.per.connection = 5 那么一个分区中的邮件可能不在send_order中. Kafka是否采取任何其他措施来确保分区中的消息仅保持发送顺序 或者 通过以上配置,是否可能在分区内出现乱序消息? 解决方案 不幸的是,没有.
..
我已经在Windows PC上构建了一个小型测试环境,并写下了以下代码来测试kafka(使用org.apache.kafka的kafka_2.10:0.9.0.1). package iii.functiontesting; import java.text.ParseException; import java.util.Properties; import org.apache.ka
..
卡夫卡高级制作人何时,多久选举一位领导者? 它是在发送每条消息之前执行还是在建立连接时仅执行一次? 解决方案 每个经纪人都拥有有关主题(和分区)及其首领列表的信息,只要新的首领被任命,动物园饲养员就会将其保持最新.当选或者当分区改变的次数. 因此,当生产者对其中一个经纪人进行呼叫时,它将以该信息列表进行响应.生产者收到此信息后,将对其进行缓存并使用它来连接至领导者.因此,下次当它想要
..
EDIT2 :最后,我使用Java制作了自己的制作器,并且效果很好,所以问题出在Kafka-console-producer 中. kafka-console-consumer效果很好. 编辑:我已经尝试过0.9.0.1版,并且具有相同的行为. 我正在完成学士学位的最终项目,即Spark Streaming和Flink之间的比较.在这两个框架之前,我都使用Kafka和一个脚本来生成数据
..
在Kafka 0.11.0.0的最新版本中,Apache团队正在引入幂等的生产者和交易. 是否可以保证要记录的整套消息(例如一百万条)仅在最后提交? 我希望这样,例如,如果生产者失去与经纪人的联系并且无法重新建立联系,那么消费者就不会看到任何消息.是否有可能? 解决方案 是的,可以使用生产者中的“交易"来实现.您开始一个事务,发布所有消息,然后提交该事务.所有消息一次都被写入Kafka,但
..
我正在使用Intellij IDE&在本地计算机上运行Kafka生产者.制作人将制作一百万条记录.这样做时,我想通过以下方式捕获生产者指标: 我知道kafka&的JMX端口我确实尝试将Kafka JMX端口设置为9999. 但是我不确定是否可以按照我期望的上述方式使用JConsole或JVisualVM获得指标. 任何人都可以提出有关如何实现此目标的任何想法吗? 解决方案 除J
..
我正在创建流分析应用程序,其中的每个分析/功能都将实现为微服务,以便该分析以后可以在其他项目中使用. 我正在使用Lagom创建微服务.我是Lagom的新手,这就是为什么我遇到一些疑问的原因. 我不知道将数据流(来自多个传感器)发送到微服务,然后该微服务将数据发布到kafka主题的最佳方法是什么. 服务描述中的流消息是否具有Lagom功能 ServiceCall [Source [
..
我正在衡量卡夫卡生产者生产者的表现. 目前,我遇到了两个配置和用法略有不同的客户端: 常用: def buildKafkaConfig(hosts: String, port: Int): Properties = { val props = new Properties() props.put("metadata.broker.list", brokers) pr
..
当前,我正在评估不同的消息传递系统. 有一个与Apache Kafka有关的问题,我无法回答自己. Kafka制作人是否可以动态创建主题和分区(以及现有主题)? 如果是,那么它是否有任何不利之处? 预先感谢 解决方案 已更新: kafka经纪人具有以下财产: auto.create.topics.enable 如果生产者将消息发布到具有新主题名称的主题,则将其设置为
..
以下在kafka中启用压缩的方式有何区别? 方法1:使用以下命令创建主题: bin/kafka-topics.sh --create --zookeeper localhost:2181 --config compression.type=gzip --topic test 方法2:在Kafka Producer客户端API中设置属性compression.type = gzip.
..