Kafka consumer.poll hangs with bitnami container

Problem Description

I have the latest bitnami kafka container installed on a remote server.

[2021-04-07 18:05:38,263] INFO Client environment:zookeeper.version=3.5.8-f439ca583e70862c3068a1f2a7d4d068eec33315, built on 05/04/2020 15:53 GMT (org.apache.zookeeper.ZooKeeper)
[2021-04-07 18:05:40,137] INFO Kafka version: 2.7.0 (org.apache.kafka.common.utils.AppInfoParser)

My Kafka is configured so that external connections are possible.

kafka:
  image: 'bitnami/kafka:latest'
  container_name: kafka
  ports:
    - '9092:9092'
    - '9093:9093'
  environment:
    - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
    - ALLOW_PLAINTEXT_LISTENER=yes
    - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
    - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
    - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
    - KAFKA_INTER_BROKER_LISTENER_NAME=CLIENT

Ping and telnet to the IP address both work.

I am able to run a producer and send data in Python.

from json import dumps
from kafka import KafkaProducer

# Producer ---------------------------------------------------------------
producer = KafkaProducer(
    bootstrap_servers=['192.xxx.xx.xx:9093'],
    value_serializer=lambda x: dumps(x).encode('utf-8'))

producer.send('TestTopic1', value='MyTest')

But I am unable to consume the data. The script hangs at consumer.poll and never gets past that line.

from json import loads
from kafka import KafkaConsumer

# Consumer ---------------------------------------------------------------
consumer = KafkaConsumer(
    'TestTopic1',
    bootstrap_servers=['192.xxx.xx.xx:9093'],
    auto_offset_reset='earliest',
    enable_auto_commit=False,
    group_id='testgroup',
    value_deserializer=lambda x: loads(x.decode('utf-8')))

# I've tried both with group_id=None and with a group_id set.

print('BEFORE subscribe: ')
consumer.subscribe(['TestTopic1'])

print('BEFORE poll: ')
# HANGS HERE!! Never gets to the print after
consumer.poll(timeout_ms=500)

print('AFTER POLL: ')
consumer.seek_to_beginning()

print('partitions of the topic: ', consumer.partitions_for_topic('TestTopic1'))

for msg in consumer:
    print(type(msg))

In the Kafka logs, I see the topic getting created, as well as other lines that I'm not quite sure how to interpret.

[2021-04-07 18:05:40,234] INFO [broker-1001-to-controller-send-thread]: Recorded new controller, from now on will use broker 1001 (kafka.server.BrokerToControllerRequestThread)
[2021-04-07 18:06:37,509] INFO Creating topic __consumer_offsets with configuration {compression.type=producer, cleanup.policy=compact, segment.bytes=104857600} and initial partition assignment Map(23 -> ArrayBuffer(1001), 32 -> ArrayBuffer(1001), 41 -> ArrayBuffer(1001), 17 -> ArrayBuffer(1001), 8 -> ArrayBuffer(1001), 35 -> ArrayBuffer(1001), 44 -> ArrayBuffer(1001), 26 -> ArrayBuffer(1001), 11 -> ArrayBuffer(1001), 29 -> ArrayBuffer(1001), 38 -> ArrayBuffer(1001), 47 -> ArrayBuffer(1001), 20 -> ArrayBuffer(1001), 2 -> ArrayBuffer(1001), 5 -> ArrayBuffer(1001), 14 -> ArrayBuffer(1001), 46 -> ArrayBuffer(1001), 49 -> ArrayBuffer(1001), 40 -> ArrayBuffer(1001), 13 -> ArrayBuffer(1001), 4 -> ArrayBuffer(1001), 22 -> ArrayBuffer(1001), 31 -> ArrayBuffer(1001), 16 -> ArrayBuffer(1001), 7 -> ArrayBuffer(1001), 43 -> ArrayBuffer(1001), 25 -> ArrayBuffer(1001), 34 -> ArrayBuffer(1001), 10 -> ArrayBuffer(1001), 37 -> ArrayBuffer(1001), 1 -> ArrayBuffer(1001), 19 -> ArrayBuffer(1001), 28 -> ArrayBuffer(1001), 45 -> ArrayBuffer(1001), 27 -> ArrayBuffer(1001), 36 -> ArrayBuffer(1001), 18 -> ArrayBuffer(1001), 9 -> ArrayBuffer(1001), 21 -> ArrayBuffer(1001), 48 -> ArrayBuffer(1001), 3 -> ArrayBuffer(1001), 12 -> ArrayBuffer(1001), 30 -> ArrayBuffer(1001), 39 -> ArrayBuffer(1001), 15 -> ArrayBuffer(1001), 42 -> ArrayBuffer(1001), 24 -> ArrayBuffer(1001), 6 -> ArrayBuffer(1001), 33 -> ArrayBuffer(1001), 0 -> ArrayBuffer(1001)) (kafka.zk.AdminZkClient)
[2021-04-07 18:06:37,534] INFO [KafkaApi-1001] Auto creation of topic __consumer_offsets with 50 partitions and replication factor 1 is successful (kafka.server.KafkaApis)
[2021-04-07 18:06:37,547] INFO Creating topic TestTopic1 with configuration {} and initial partition assignment Map(0 -> ArrayBuffer(1001)) (kafka.zk.AdminZkClient)
[2021-04-07 18:06:37,557] INFO [KafkaApi-1001] Auto creation of topic TestTopic1 with 1 partitions and replication factor 1 is successful (kafka.server.KafkaApis)
[2021-04-07 18:06:37,906] INFO [ReplicaFetcherManager on broker 1001] Removed fetcher for partitions Set(__consumer_offsets-22, __consumer_offsets-30, __consumer_offsets-8, __consumer_offsets-21, __consumer_offsets-4, __consumer_offsets-27, __consumer_offsets-7, __consumer_offsets-9, __consumer_offsets-46, __consumer_offsets-25, __consumer_offsets-35, __consumer_offsets-41, __consumer_offsets-33, __consumer_offsets-23, __consumer_offsets-49, __consumer_offsets-47, __consumer_offsets-16, __consumer_offsets-28, __consumer_offsets-31, __consumer_offsets-36, __consumer_offsets-42, __consumer_offsets-3, __consumer_offsets-18, __consumer_offsets-37, __consumer_offsets-15, __consumer_offsets-24, __consumer_offsets-38, __consumer_offsets-17, __consumer_offsets-48, __consumer_offsets-19, __consumer_offsets-11, __consumer_offsets-13, __consumer_offsets-2, __consumer_offsets-43, __consumer_offsets-6, __consumer_offsets-14, __consumer_offsets-20, __consumer_offsets-0, __consumer_offsets-44, __consumer_offsets-39, __consumer_offsets-12, __consumer_offsets-45, __consumer_offsets-1, __consumer_offsets-5, __consumer_offsets-26, __consumer_offsets-29, __consumer_offsets-34, __consumer_offsets-10, __consumer_offsets-32, __consumer_offsets-40) (kafka.server.ReplicaFetcherManager)
[2021-04-07 18:06:37,979] INFO [Log partition=__consumer_offsets-0, dir=/bitnami/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-04-07 18:06:37,991] INFO Created log for partition __consumer_offsets-0 in /bitnami/kafka/data/__consumer_offsets-0 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> compact, flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 104857600, flush.messages -> 9223372036854775807, message.format.version -> 2.7-IV2, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-04-07 18:06:37,992] INFO [Partition __consumer_offsets-0 broker=1001] No checkpointed highwatermark is found for partition __consumer_offsets-0 (kafka.cluster.Partition)
[2021-04-07 18:06:37,994] INFO [Partition __consumer_offsets-0 broker=1001] Log loaded for partition __consumer_offsets-0 with initial high watermark 0 (kafka.cluster.Partition)
[2021-04-07 18:06:38,011] INFO [Log partition=__consumer_offsets-29, dir=/bitnami/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)

Then I have another series of messages of this type in my log.

[2021-04-07 18:06:38,563] INFO Created log for partition __consumer_offsets-13 in /bitnami/kafka/data/__consumer_offsets-13 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> compact, flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 104857600, flush.messages -> 9223372036854775807, message.format.version -> 2.7-IV2, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-04-07 18:06:38,563] INFO [Partition __consumer_offsets-13 broker=1001] No checkpointed highwatermark is found for partition __consumer_offsets-13 (kafka.cluster.Partition)
[2021-04-07 18:06:38,563] INFO [Partition __consumer_offsets-13 broker=1001] Log loaded for partition __consumer_offsets-13 with initial high watermark 0 (kafka.cluster.Partition)
[2021-04-07 18:06:38,577] INFO [GroupMetadataManager brokerId=1001] Scheduling loading of offsets and group metadata from __consumer_offsets-22 (kafka.coordinator.group.GroupMetadataManager)
[2021-04-07 18:06:38,579] INFO [GroupMetadataManager brokerId=1001] Scheduling loading of offsets and group metadata from __consumer_offsets-25 (kafka.coordinator.group.GroupMetadataManager)
[2021-04-07 18:06:38,579] INFO [GroupMetadataManager brokerId=1001] Scheduling loading of offsets and group metadata from __consumer_offsets-28 (kafka.coordinator.group.GroupMetadataManager)
....
[2021-04-07 18:06:38,589] INFO [GroupMetadataManager brokerId=1001] Finished loading offsets and group metadata from __consumer_offsets-22 in 12 milliseconds, of which 2 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2021-04-07 18:06:38,596] INFO [GroupMetadataManager brokerId=1001] Finished loading offsets and group metadata from __consumer_offsets-25 in 17 milliseconds, of which 17 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
[2021-04-07 18:06:38,597] INFO [GroupMetadataManager brokerId=1001] Finished loading offsets and group metadata from __consumer_offsets-28 in 18 milliseconds, of which 17 milliseconds was spent in the scheduler. (kafka.coordinator.group.GroupMetadataManager)
....
[2021-04-07 18:06:38,638] INFO [ReplicaFetcherManager on broker 1001] Removed fetcher for partitions Set(TestTopic1-0) (kafka.server.ReplicaFetcherManager)
[2021-04-07 18:06:38,643] INFO [Log partition=TestTopic1-0, dir=/bitnami/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-04-07 18:06:38,644] INFO Created log for partition TestTopic1-0 in /bitnami/kafka/data/TestTopic1-0 with properties {compression.type -> producer, min.insync.replicas -> 1, message.downconversion.enable -> true, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, retention.ms -> 604800000, segment.bytes -> 1073741824, flush.messages -> 9223372036854775807, message.format.version -> 2.7-IV2, max.compaction.lag.ms -> 9223372036854775807, file.delete.delay.ms -> 60000, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, index.interval.bytes -> 4096, min.cleanable.dirty.ratio -> 0.5, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-04-07 18:06:38,647] INFO [Partition TestTopic1-0 broker=1001] No checkpointed highwatermark is found for partition TestTopic1-0 (kafka.cluster.Partition)
[2021-04-07 18:06:38,647] INFO [Partition TestTopic1-0 broker=1001] Log loaded for partition TestTopic1-0 with initial high watermark 0 (kafka.cluster.Partition)

I don't see anything related to the consumer in here.

Note that this is only a dev server. We're using it as a proof of concept to see whether Kafka works for us and whether we'll use it in prod.

Any help would be appreciated as we'd really like to be able to make it work and use it in production.

Answer

"installed on a remote server."

Then you need to advertise that server's address in KAFKA_CFG_ADVERTISED_LISTENERS; a port mapping alone isn't sufficient.
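
For example, a sketch of the changed compose line, reusing the same placeholder address the question's clients use (substitute the server's real hostname or IP):

environment:
  # ... other settings unchanged ...
  # Advertise the remote host's address instead of localhost
  # (192.xxx.xx.xx is the placeholder IP from the question)
  - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://192.xxx.xx.xx:9093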

It's timing out because the bootstrap protocol returns the advertised addresses, so your remote consumer ends up trying to read from localhost:9093, i.e. from its own machine, where no broker is listening.
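
One way to confirm this from the client side (a sketch, not part of the original answer; it just turns on kafka-python's own logging so the connection attempts become visible):

import logging

from kafka import KafkaConsumer

# kafka-python logs its connection attempts, so the address the client
# actually dials shows up in the output.
logging.basicConfig(level=logging.DEBUG)

consumer = KafkaConsumer(
    'TestTopic1',
    bootstrap_servers=['192.xxx.xx.xx:9093'])

# With the misconfigured advertised listener, the log should show the
# client trying to connect to localhost:9093 after the initial bootstrap.
consumer.poll(timeout_ms=5000)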

Your producer would also have a similar issue, but on top of that you aren't flushing the producer, so the data may never actually be sent.
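
A minimal producer-side sketch (KafkaProducer.send is asynchronous, so the script can exit before anything is transmitted unless you wait on the result or flush):

from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['192.xxx.xx.xx:9093'],
    value_serializer=lambda x: dumps(x).encode('utf-8'))

# send() only enqueues the record in a background buffer.
future = producer.send('TestTopic1', value='MyTest')

# Block until the record is acknowledged (raises on delivery failure).
future.get(timeout=10)

# Drain any remaining buffered records before the script exits.
producer.flush()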

If you productionize this on a Docker orchestration platform, you'll need to deal with additional networking configuration.
