Lambda中Java使用者收听MSK主题时出现的问题 [英] Problem in Lambda for Java consumer listening to MSK topic
问题描述
我正在观察一个特殊的问题.我已经在Java 8中构建了Lambda函数;它使用Kafka消费者api(即consumer.poll(5000))轮询MSK主题(我已经尝试过各种超时).第二个Lambda函数是生产者并将消息发送到同一主题.这两个功能都附加到装有MSK的VPC.
生产者运作良好.我可以从EC2看到消息b运行Kafka控制台使用者.但是消费者Lambda无效,只是超时.
I am observing a peculiar issue.
I have built a Lambda function in Java 8; which polls a MSK topic using Kafka consumer api i.e. consumer.poll(5000) (I have tried with various timeout). There is a second Lambda function which is producer and sending messages to same topic. Both functions are attached to VPC in which MSK is in.
The producer working well. I can see the messages b running Kafka console consumer from an EC2.
But the consumer Lambda does not work, it just gives timed out.
只有当我同时在EC2上运行lambda生产者,Kafka控制台使用者和Lambda使用者时,该使用者才得到一些信息!确切地说,生产者循环发送5条消息,EC2控制台使用者显示所有5条消息,而lambda使用者显示第三条或第四条消息.
Only when I am running lambda producer, Kafka console consumer on the EC2 and Lambda consumer simultaneously, the consumer is getting some message !! To be precise, the producer sending 5 messages in a loop, the EC2 console consumer shows all 5, but the lambda consumer is showing 3rd or 4th message.
为什么会这样?这可能是什么问题,如何在lambda使用者中始终如一地获取消息?
Why is this happening? What might be issue here, and How can I get the messages consistently in lambda consumer?
如果有人有一个有效的代码示例,我将不胜感激.
If anybody has a working code sample, I would be very grateful.
谢谢.
进一步更新:我已经安排了使用者函数,然后它将获取所有事件.我还有以下问题-1>手动触发该功能时为什么没有收到消息?2>我测试了用Python编写的使用者函数;也没有收到任何消息.这是python代码:
Further update: I have scheduled the consumer function, and then it gets all the events. I still have following questions - 1> Why is it not getting the messages when I manually trigger the function? 2> I tested a consumer function written in Python; that is not getting any messages too. Here is the python code:
def lambda_handler(event, context):
bootstrap_servers = ["<msk bootstrap>"]
topicName = '<mp-topic-name>'
consumer = KafkaConsumer (topicName, group_id = 'test',bootstrap_servers = bootstrap_servers,auto_offset_reset = 'earliest', consumer_timeout_ms=5000)
for message in consumer:
consumer.commit()
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,message.offset, message.key,message.value))
KafkaConsumer.close()
return ("Processed")
Java使用者的代码;这是一个简单的Kafka客户.从Lambda可以按计划的方式读取消息;但手动测试时不会.
Code of the Java consumer; it is a plain Kafka client. From Lambda this is able to read messages when run in scheduled manner; but not when manually tested.
try {
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topicName));
context.getLogger().log("Subscribed to topic " + topicName);
int i = 0;
ConsumerRecords<String, String> records = consumer.poll(5000);
for (ConsumerRecord<String, String> record : records) {
context.getLogger().log("Message::: offset = "+record.offset()+", key = "+record.key()+", value = "+record.value()+"\n");
}
context.getLogger().log("After messages");
} catch (Exception e) {
e.printStackTrace();
context.getLogger().log("Exception: "+e.getMessage());
}
推荐答案
(如果您只待了一个月左右)!AWS现在支持来自MSK和自我管理的kafka集群(甚至在AWS外部)的kafka事件源
if only you'd have held on for another month or so! AWS now supports kafka event sources from both MSK and self-managed kafka clusters (even outside of AWS)
https://aws.amazon.com/blogs/compute/using-amazon-msk-as-an-event-source-for-aws-lambda/
这篇关于Lambda中Java使用者收听MSK主题时出现的问题的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!