Lost message from the Kafka Topic
Problem description
While trying out the timestamp in ProducerRecord, I found something weird. After sending a few messages from the producer, I ran kafka-console-consumer.sh and verified that those messages were in the topic. I stopped the producer and waited for a minute. When I reran kafka-console-consumer.sh, it did not show the messages that I had generated previously. I also added producer.flush() and producer.close(), but the outcome was still the same.
Now, when I stopped using the timestamp field, everything worked fine, which makes me believe that there is something finicky about messages with a timestamp.
I am using Kafka_2.11-2.0.0 (released on July 30, 2018).
Below is the sample code.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

import static java.lang.Thread.sleep;

public class KafkaProducerSample {
    public static void main(String[] args) throws InterruptedException {
        String kafkaHost = "sample:port";
        String notificationTopic = "test";
        String sampleKey = "sampleKey";
        String sampleValue = "sampleValue";

        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost);
        props.put(ProducerConfig.ACKS_CONFIG, "1");
        props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);

        Producer<String, String> producer =
                new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());

        RecordHeaders recordHeaders = new RecordHeaders(); // unused in this sample

        // Partition is null (broker assigns one); the record timestamp is set explicitly.
        ProducerRecord<String, String> record =
                new ProducerRecord<>(notificationTopic, null, 1574443515L, sampleKey, sampleValue);
        producer.send(record);
        sleep(1000);
    }
}
I run the console consumer as follows:
$KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server KAFKA_HOST:PORT --topic test --from-beginning
# output after running producer
test
# output 5 mins after shutting down producer (no messages shown)
Answer
You are asynchronously sending only one record, but not ack-ing the send or flushing the buffer, so the record can still be sitting in the producer's batch buffer when the JVM exits.
You will need to send more records, or
producer.send(record).get();
or
producer.send(record);
producer.flush();
or (preferred), register a shutdown hook via Runtime.getRuntime().addShutdownHook() in your main method to flush and close the producer.
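The shutdown-hook option can be sketched as follows. To keep the sketch self-contained and runnable without a broker, `FakeProducer` is a hypothetical stand-in for `KafkaProducer` that buffers records until flushed; the hook-registration pattern is the part that carries over unchanged:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for KafkaProducer: send() only buffers, flush() delivers.
class FakeProducer {
    final List<String> buffer = new ArrayList<>();
    final List<String> delivered = new ArrayList<>();
    boolean closed = false;

    void send(String record) { buffer.add(record); }           // async send: record is only buffered
    void flush() { delivered.addAll(buffer); buffer.clear(); } // force delivery of buffered records
    void close() { closed = true; }
}

public class ShutdownHookDemo {
    static final FakeProducer producer = new FakeProducer();

    // The hook flushes any buffered records and closes the producer at JVM exit.
    static final Thread hook = new Thread(() -> {
        producer.flush();
        producer.close();
    });

    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(hook);
        producer.send("sampleValue");
        // No explicit flush here: the shutdown hook handles it when the JVM exits,
        // so records are not lost even if main() returns right after send().
    }
}
```

With the real KafkaProducer, `close()` already blocks until previously sent records complete, so the hook body can simply call `producer.close()` (optionally with a timeout).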