Problem in Lambda for Java consumer listening to MSK topic

Problem Description

I am observing a peculiar issue. I have built a Lambda function in Java 8 that polls an MSK topic using the Kafka consumer API, i.e. consumer.poll(5000) (I have tried various timeouts). A second Lambda function is a producer that sends messages to the same topic. Both functions are attached to the VPC that the MSK cluster runs in.
The producer works well: I can see the messages by running the Kafka console consumer from an EC2 instance. But the consumer Lambda does not work; it just times out.

Only when I run the Lambda producer, the Kafka console consumer on the EC2 instance, and the Lambda consumer simultaneously does the consumer get any messages! To be precise, the producer sends 5 messages in a loop; the EC2 console consumer shows all 5, but the Lambda consumer shows only the 3rd or 4th message.

Why is this happening? What might the issue be here, and how can I get the messages consistently in the Lambda consumer?

If anybody has a working code sample, I would be very grateful.

Thanks.

Further update: I have scheduled the consumer function, and then it gets all the events. I still have the following questions: 1) Why does it not get the messages when I trigger the function manually? 2) I tested a consumer function written in Python, and it does not get any messages either. Here is the Python code:

    from kafka import KafkaConsumer

    def lambda_handler(event, context):
        bootstrap_servers = ["<msk bootstrap>"]
        topicName = '<mp-topic-name>'

        # consumer_timeout_ms=5000 ends the for-loop after 5 s without a new message
        consumer = KafkaConsumer(topicName, group_id='test', bootstrap_servers=bootstrap_servers,
                                 auto_offset_reset='earliest', consumer_timeout_ms=5000)

        for message in consumer:
            consumer.commit()
            print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                                  message.offset, message.key, message.value))

        consumer.close()  # close this consumer instance, not the KafkaConsumer class
        return "Processed"


Here is the code of the Java consumer; it is a plain Kafka client. In Lambda it reads messages when run on a schedule, but not when triggered manually for a test.

    // try-with-resources closes the consumer, so it leaves the consumer group
    // before the Lambda invocation ends instead of lingering until session timeout
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Arrays.asList(topicName));
        context.getLogger().log("Subscribed to topic " + topicName);

        // Single poll with a 5 s timeout (kafka-clients 2.0+ prefers poll(Duration.ofMillis(5000)))
        ConsumerRecords<String, String> records = consumer.poll(5000);

        for (ConsumerRecord<String, String> record : records) {
            context.getLogger().log("Message:::  offset = " + record.offset()
                    + ", key = " + record.key() + ", value = " + record.value() + "\n");
        }
        context.getLogger().log("After messages");
    } catch (Exception e) {
        e.printStackTrace();
        context.getLogger().log("Exception: " + e.getMessage());
    }

Recommended Answer

If only you'd held on for another month or so! AWS now supports Kafka event sources for Lambda, from both MSK and self-managed Kafka clusters (even outside of AWS):

https://aws.amazon.com/blogs/compute/using-amazon-msk-as-an-event-source-for-aws-lambda/

https://aws.amazon.com/about-aws/whats-new/2020/12/aws-lambda-now-supports-self-managed-apache-kafka-as-an-event-source/
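With an MSK event source mapping, Lambda polls the topic itself and invokes the function with a batch of records, so the handler no longer creates a KafkaConsumer. The following is a minimal sketch of such a handler, assuming the event shape described in the AWS documentation (records grouped under "<topic>-<partition>" keys, with key and value base64-encoded) and assuming the messages are UTF-8 text. The helper decode_field and the topic name "mytopic" are hypothetical, and the event source mapping itself is created separately (console or CLI), which is not shown here.

```python
import base64


def decode_field(field):
    """Base64-decode an optional key/value field (assumes UTF-8 text payloads)."""
    if field is None:
        return None
    return base64.b64decode(field).decode("utf-8")


def lambda_handler(event, context):
    # Lambda delivers records grouped per partition under "<topic>-<partition>"
    # keys; we only need the record lists, not the grouping keys.
    count = 0
    for records in event.get("records", {}).values():
        for record in records:
            key = decode_field(record.get("key"))
            value = decode_field(record.get("value"))
            print(f"{record['topic']}:{record['partition']}:{record['offset']}: "
                  f"key={key} value={value}")
            count += 1
    return {"processed": count}


if __name__ == "__main__":
    # Hand-made event in the assumed shape, for a local smoke test only.
    sample = {
        "eventSource": "aws:kafka",
        "records": {
            "mytopic-0": [
                {"topic": "mytopic", "partition": 0, "offset": 15,
                 "key": base64.b64encode(b"k1").decode(),
                 "value": base64.b64encode(b"hello").decode()},
            ]
        },
    }
    print(lambda_handler(sample, None))
```

Because Lambda tracks the consumer position for the mapping, messages produced while the function is idle are delivered on the next invocation instead of depending on a short poll window inside a manually triggered run.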
