Kafka consumer.poll hangs with bitnami container

Problem Description

I have the latest bitnami kafka container installed on a remote server.

[2021-04-07 18:05:38,263] INFO Client environment:zookeeper.version=3.5.8-f439ca583e70862c3068a1f2a7d4d068eec33315, built on 05/04/2020 15:53 GMT (org.apache.zookeeper.ZooKeeper)
[2021-04-07 18:05:40,137] INFO Kafka version: 2.7.0 (org.apache.kafka.common.utils.AppInfoParser)

My Kafka is configured to allow external connections.

kafka:
  image: 'bitnami/kafka:latest'
  container_name: kafka
  ports:
    - '9092:9092'
    - '9093:9093'
  environment:
    - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
    - ALLOW_PLAINTEXT_LISTENER=yes
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
    - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
    - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
    - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT

ping and telnet to the IP address both work.

I am able to run a producer and send data in Python.

from json import dumps
from kafka import KafkaProducer

# Producer ---------------------------------------------------------------
producer = KafkaProducer(
    bootstrap_servers=['192.xxx.xx.xx:9093'],
    value_serializer=lambda x: dumps(x).encode('utf-8'))

producer.send('TestTopic1', value='MyTest')

But I am unable to consume the data. The script hangs at consumer.poll and never gets past that line.

from json import loads
from kafka import KafkaConsumer

# Consumer ---------------------------------------------------------------
consumer = KafkaConsumer(
    'TestTopic1',
    bootstrap_servers=['192.xxx.xx.xx:9093'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    group_id='testgroup',
    value_deserializer=lambda x: loads(x.decode('utf-8')))

# I've tried both with group_id set to None and with a group_id.

print('BEFORE subscribe: ')
# Redundant: the constructor above already subscribed to the topic.
consumer.subscribe(['TestTopic1'])

print('BEFORE poll: ')
# HANGS HERE!! Never gets to the print after
consumer.poll(timeout_ms=500)

print('AFTER POLL: ')
consumer.seek_to_beginning()

print('partitions of the topic: ', consumer.partitions_for_topic('TestTopic1'))

for msg in consumer:
    print(type(msg))

In the Kafka logs, I see the topic getting created, along with other lines I'm not quite sure how to interpret.

[2021-04-07 18:05:40,234] INFO [broker-1001-to-controller-send-thread]: Recorded new controller, from now on will use broker 1001 (kafka.server.BrokerToControllerRequestThread)
[2021-04-07 18:06:37,509] INFO Creating topic __consumer_offsets with configuration {compression.type=producer, cleanup.policy=compact, segment.bytes=104857600} and initial partition assignment Map(23 -> ArrayBuffer(1001), 32 -> ArrayBuffer(1001), 41 -> ArrayBuffer(1001), 17 -> ArrayBuffer(1001), 8 -> ArrayBuffer(1001), 35 -> ArrayBuffer(1001), 44 -> ArrayBuffer(1001), 26 -> ArrayBuffer(1001), 11 -> ArrayBuffer(1001), 29 -> ArrayBuffer(1001), 38 -> ArrayBuffer(1001), 47 -> ArrayBuffer(1001), 20 -> ArrayBuffer(1001), 2 -> ArrayBuffer(1001), 5 -> ArrayBuffer(1001), 14 -> ArrayBuffer(1001), 46 -> ArrayBuffer(1001), 49 -> ArrayBuffer(1001), 40 -> ArrayBuffer(1001), 13 -> ArrayBuffer(1001), 4 -> ArrayBuffer(1001), 22 -> ArrayBuffer(1001), 31 -> ArrayBuffer(1001), 16 -> ArrayBuffer(1001), 7 -> ArrayBuffer(1001), 43 -> ArrayBuffer(1001), 25 -> ArrayBuffer(1001), 34 -> ArrayBuffer(1001), 10 -> ArrayBuffer(1001), 37 -> ArrayBuffer(1001), 1 -> ArrayBuffer(1001), 19 -> ArrayBuffer(1001), 28 -> ArrayBuffer(1001), 45 -> ArrayBuffer(1001), 27 -> ArrayBuffer(1001), 36 -> ArrayBuffer(1001), 18 -> ArrayBuffer(1001), 9 -> ArrayBuffer(1001), 21 -> ArrayBuffer(1001), 48 -> ArrayBuffer(1001), 3 -> ArrayBuffer(1001), 12 -> ArrayBuffer(1001), 30 -> ArrayBuffer(1001), 39 -> ArrayBuffer(1001), 15 -> ArrayBuffer(1001), 42 -> ArrayBuffer(1001), 24 -> ArrayBuffer(1001), 6 -> ArrayBuffer(1001), 33 -> ArrayBuffer(1001), 0 -> ArrayBuffer(1001)) (kafka.zk.AdminZkClient)
[2021-04-07 18:06:37,534] INFO [KafkaApi-1001] Auto creation of topic __consumer_offsets with 50 partitions and replication factor 1 is successful (kafka.server.KafkaApis)
[2021-04-07 18:06:37,547] INFO Creating topic TestTopic1 with configuration {} and initial partition assignment Map(0 -> ArrayBuffer(1001)) (kafka.zk.AdminZkClient)
[2021-04-07 18:06:37,557] INFO [KafkaApi-1001] Auto creation of topic TestTopic1 with 1 partitions and replication factor 1 is successful (kafka.server.KafkaApis)
[2021-04-07 18:06:37,906] INFO [ReplicaFetcherManager on broker 1001] Removed fetcher for partitions Set(__consumer_offsets-22, __consumer_offsets-30, __consumer_offsets-8, __consumer_offsets-21, __consumer_offsets-4, __consumer_offsets-27, __consumer_offsets-7, __consumer_offsets-9, __consumer_offsets-46, __consumer_offsets-25, __consumer_offsets-35, __consumer_offsets-41, __consumer_offsets-33, __consumer_offsets-23, __consumer_offsets-49, __consumer_offsets-47, __consumer_offsets-16, __consumer_offsets-28, __consumer_offsets-31, __consumer_offsets-36, __consumer_offsets-42, __consumer_offsets-3, __consumer_offsets-18, __consumer_offsets-37, __consumer_offsets-15, __consumer_offsets-24, __consumer_offsets-38, __consumer_offsets-17, __consumer_offsets-48, __consumer_offsets-19, __consumer_offsets-11, __consumer_offsets-13, __consumer_offsets-2, __consumer_offsets-43, __consumer_offsets-6, __consumer_offsets-14, __consumer_offsets-20, __consumer_offsets-0, __consumer_offsets-44, __consumer_offsets-39, __consumer_offsets-12, __consumer_offsets-45, __consumer_offsets-1, __consumer_offsets-5, __consumer_offsets-26, __consumer_offsets-29, __consumer_offsets-34, __consumer_offsets-10, __consumer_offsets-32, __consumer_offsets-40) (kafka.server.ReplicaFetcherManager)
[2021-04-07 18:06:37,979] INFO [Log partition=__consumer_offsets-0, dir=/bitnami/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-04-07 18:06:37,991] INFO Created log for partition __consumer_offsets-0 in /bitnami/kafka/data/__consumer_offsets-0 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> compact, flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 104857600, flush.messages -> 9223372036854775807, message.format.version -> 2.7-IV2, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-04-07 18:06:37,992] INFO [Partition __consumer_offsets-0 broker=1001] No checkpointed highwatermark is found for partition __consumer_offsets-0 (kafka.cluster.Partition)
[2021-04-07 18:06:37,994] INFO [Partition __consumer_offsets-0 broker=1001] Log loaded for partition __consumer_offsets-0 with initial high watermark 0 (kafka.cluster.Partition)
[2021-04-07 18:06:38,011] INFO [Log partition=__consumer_offsets-29, dir=/bitnami/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)

Then there is another series of messages of this type in my log.

[2021-04-07 18:06:38,563] INFO Created log for partition __consumer_offsets-13 in /bitnami/kafka/data/__consumer_offsets-13 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> compact, flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 104857600, flush.messages -> 9223372036854775807, message.format.version -> 2.7-IV2, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-04-07 18:06:38,563] INFO [Partition __consumer_offsets-13 broker=1001] No checkpointed highwatermark is found for partition __consumer_offsets-13 (kafka.cluster.Partition)
[2021-04-07 18:06:38,563] INFO [Partition __consumer_offsets-13 broker=1001] Log loaded for partition __consumer_offsets-13 with initial high watermark 0 (kafka.cluster.Partition)
[2021-04-07 18:06:38,577] INFO [GroupMetadataManager brokerId=1001] Scheduling loading of offsets and group metadata from __consumer_offsets-22 (kafka.coordinator.group.GroupMetadataManager)
[2021-04-07 18:06:38,579] INFO [GroupMetadataManager brokerId=1001] Scheduling loading of offsets and group metadata from __consumer_offsets-25 (kafka.coordinator.group.GroupMetadataManager)
[2021-04-07 18:06:38,579] INFO [GroupMetadataManager brokerId=1001] Scheduling loading of offsets and group metadata from __consumer_offsets-28 (kafka.coordinator.group.GroupMetadataManager)
....
[2021-04-07 18:06:38,589] INFO [GroupMetadataManager brokerId=1001] Finished loading offsets and group metadata from __consumer_offsets-22 in 12 milliseconds, of which 2 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2021-04-07 18:06:38,596] INFO [GroupMetadataManager brokerId=1001] Finished loading offsets and group metadata from __consumer_offsets-25 in 17 milliseconds, of which 17 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2021-04-07 18:06:38,597] INFO [GroupMetadataManager brokerId=1001] Finished loading offsets and group metadata from __consumer_offsets-28 in 18 milliseconds, of which 17 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
....
[2021-04-07 18:06:38,638] INFO [ReplicaFetcherManager on broker 1001] Removed fetcher for partitions Set(TestTopic1-0) (kafka.server.ReplicaFetcherManager)
[2021-04-07 18:06:38,643] INFO [Log partition=TestTopic1-0, dir=/bitnami/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-04-07 18:06:38,644] INFO Created log for partition TestTopic1-0 in /bitnami/kafka/data/TestTopic1-0 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 1073741824, flush.messages -> 9223372036854775807, message.format.version -> 2.7-IV2, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-04-07 18:06:38,647] INFO [Partition TestTopic1-0 broker=1001] No checkpointed highwatermark is found for partition TestTopic1-0 (kafka.cluster.Partition)
[2021-04-07 18:06:38,647] INFO [Partition TestTopic1-0 broker=1001] Log loaded for partition TestTopic1-0 with initial high watermark 0 (kafka.cluster.Partition)

I don't see anything related to the consumer in here.

Note that this is only a dev server. We're using it as a proof of concept to see whether Kafka works for us and whether we'll adopt it in production.

Any help would be appreciated, as we'd really like to get this working and use it in production.

Recommended Answer

"installed on a remote server."

Then you need to advertise that server's address in KAFKA_CFG_ADVERTISED_LISTENERS; a port mapping alone isn't sufficient. See the sketch below.
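A minimal sketch of the change, assuming the server is reachable at 192.xxx.xx.xx (the same address your clients already use); only the EXTERNAL advertised listener changes, the rest of the compose file stays as-is:

  - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://192.xxx.xx.xx:9093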

It's timing out because the bootstrap protocol returns the advertised address, so your remote consumer is trying to read from localhost:9093.
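One way to see this instead of hanging indefinitely: skip the consumer group and put timeouts on everything. A sketch, assuming kafka-python 2.x (bootstrap_connected() and consumer_timeout_ms are features of that client version, not of the broker):

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'TestTopic1',
    bootstrap_servers=['192.xxx.xx.xx:9093'],
    auto_offset_reset='earliest',
    group_id=None,               # no group -> no blocking wait on the group coordinator
    consumer_timeout_ms=5000)    # give up iterating after 5 s of silence

# True means a bootstrap broker answered; it says nothing about whether the
# advertised addresses returned in the metadata are reachable.
print('bootstrap connected:', consumer.bootstrap_connected())

# With a misadvertised listener this tends to return an empty dict after the
# timeout, because fetches go to localhost:9093 and fail.
print('poll result:', consumer.poll(timeout_ms=5000))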

Your producer has a similar issue, but on top of that you aren't flushing the producer, so the data is never actually sent.
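The producer side can surface the problem too. A sketch of flushing and checking the send result against the producer code above (in kafka-python, send() returns a future that raises a KafkaError on failure):

producer.send('TestTopic1', value='MyTest')
producer.flush()  # block until buffered records are actually sent (or fail)

# Or check each send explicitly instead of letting errors pass silently:
future = producer.send('TestTopic1', value='MyTest')
metadata = future.get(timeout=10)  # raises on delivery failure
print(metadata.topic, metadata.partition, metadata.offset)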

If you productionize this on a Docker orchestration platform, you'll need to work through additional networking configuration.
