Apache Kafka 0.9.0.0显示所有带有分区的主题 [英] Apache Kafka 0.9.0.0 Show all Topics with Partitions
问题描述
我目前正在评估Apache Kafka,我有一个简单的使用者,应该从特定主题分区中读取消息.这是我的客户:
I'm currently evaluating Apache Kafka and I have a simple consumer that is supposed to read messages from a specific topic partition. Here is my client:
public static void main(String args[]) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
TopicPartition partition0 = new TopicPartition("test_topic", Integer.parseInt(args[0]));
ArrayList topicAssignment = new ArrayList();
topicAssignment.add(partition0);
consumer.assign(topicAssignment);
//consumer.subscribe(Arrays.asList("test_topic"));
int commitInterval = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<ConsumerRecord<String, String>>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
if (buffer.size() >= commitInterval) {
process(buffer);
consumer.commitSync();
buffer.clear();
}
}
}
}
static void process(List<ConsumerRecord<String, String>> buffers) {
for (ConsumerRecord<String, String> buffer : buffers) {
System.out.println(buffer);
}
}
这是我用来启动Apache Kafka的命令:
Here is the command that I use to start Apache Kafka:
bin/kafka-server-start.sh config/server.properties & bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic test_topic
正如您在此处看到的,我正在创建具有2个分区(p0和p1)的主题!
As you can see here, I'm creating the topic with 2 partitions (p0 and p1)!
然后我使用以下命令启动我的使用者的两个实例:
I'm then starting two instances of my consumer with the following commands:
对于消费者1:
java -cp target/scala-2.11/kafka-consumer-0.1.0-SNAPAHOT.jar com.test.api.consumer.KafkaConsumer09Java 0
对于消费者2:
java -cp target/scala-2.11/kafka-consumer-0.1.0-SNAPAHOT.jar com.test.api.consumer.KafkaConsumer09Java 1
其中0和1代表我希望消费者从中读取消息的实际分区.
Where 0 and 1 represent the actual partition from which I want my consumer's to read the messages from.
但是发生的是,只有我的使用者1正在收到所有消息.我的印象是,来自生产者的消息最终平均分配到分区上.
But what happens is that only my Consumer 1 is getting all the messages. I was under the impression that the messages from the producer end up equally on the partitions.
我使用以下命令查看主题test_topic的分区数量:
I used the following command to see how many partitions that I have for my topic test_topic:
Joes-MacBook-Pro:kafka_2.11-0.9.0.0 joe$ bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --broker-info --group test --topic test_topic --zookeeper localhost:2181
[2016-01-14 13:36:48,831] WARN WARNING: ConsumerOffsetChecker is deprecated and will be dropped in releases following 0.9.0. Use ConsumerGroupCommand instead. (kafka.tools.ConsumerOffsetChecker$)
Group Topic Pid Offset logSize Lag Owner
test test_topic 0 10000 10000 0 none
BROKER INFO
0 -> 172.22.4.34:9092
即使我对Kafka说要为test_topic创建2个分区,为什么也只有一个分区?
Why is there only one partition even though I said to Kafka to create 2 partitions for the test_topic?
这是我的制片人:
def main(args: Array[String]) {
//val conf = new SparkConf().setAppName("VPP metrics producer")
//val sc = new SparkContext(conf)
val props: Properties = new Properties()
props.put("metadata.broker.list", "localhost:9092,localhost:9093")
props.put("serializer.class", "kafka.serializer.StringEncoder")
val config = new ProducerConfig(props)
val producer = new Producer[String, String](config)
1 to 10000 map {
case i =>
val jsonStr = getRandomTsDataPoint().toJson.toString
println(s"sending message $i to kafka")
producer.send(new KeyedMessage[String, String]("test_topic", jsonStr))
println(s"sent message $i to kafka")
}
}
推荐答案
如果使用2创建主题,我不确定为什么会有1个分区.这是肯定的.
I'm not sure why you would have 1 partition if you created the topic with 2. Never happened to me, that's for sure.
您可以尝试以下方法:bin/kafka-topics.sh-描述--zookeeper本地主机:2181 --topic test_topic
那应该告诉您实际上有多少个分区.
Can you try this:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic
That should show you how many partitions are really there.
然后,如果确实有1个分区,也许您可以通过以下方式创建一个新主题来重新开始:bin/kafka-topics.sh-创建--zookeeper本地主机:2181-复制因子2-分区2-主题test_topic_2
Then, if there's really 1 partition, maybe you could start over by creating a new topic with:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 2 --topic test_topic_2
然后尝试:bin/kafka-topics.sh-描述--zookeeper本地主机:2181 --topic test_topic_2
...并报告调查结果.
And then try:
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic_2
... and report back the findings.
这篇关于Apache Kafka 0.9.0.0显示所有带有分区的主题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!