Problem with a Java Lambda consumer listening to an MSK topic

Question

I am observing a peculiar issue. I have built a Lambda function in Java 8 that polls an MSK topic using the Kafka consumer API, i.e. consumer.poll(5000) (I have tried various timeouts). A second Lambda function is a producer that sends messages to the same topic. Both functions are attached to the VPC that MSK is in.
The producer is working well: I can see the messages by running the Kafka console consumer on an EC2 instance. But the consumer Lambda does not work; it just times out.

Only when I run the Lambda producer, the Kafka console consumer on the EC2 instance, and the Lambda consumer simultaneously does the consumer get some messages! To be precise, the producer sends 5 messages in a loop; the EC2 console consumer shows all 5, but the Lambda consumer shows only the 3rd or 4th message.
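
One plausible explanation for this pattern, assuming the Java consumer's props (not shown in the question) leave auto.offset.reset at its default of latest: a consumer group with no committed offsets starts reading at the end of each partition, so it only sees records produced after it has joined the group and been assigned partitions. That would fit seeing only the 3rd or 4th message when everything runs at the same time. A minimal sketch of consumer settings that start from the earliest available offset instead; the class name, group id, and bootstrap value are placeholders, not values from the question:

```java
import java.util.Properties;

public class ConsumerConfigSketch {
    // Builds settings for a consumer created inside a Lambda invocation.
    // bootstrapServers and groupId are placeholders supplied by the caller.
    public static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // The default ("latest") makes a group without committed offsets start at the
        // log end, so it only sees records produced after it has joined the group.
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties p = consumerProps("<msk bootstrap>", "lambda-consumer");
        System.out.println(p.getProperty("auto.offset.reset")); // prints earliest
    }
}
```

Note that auto.offset.reset only applies while the group has no committed offset for a partition; once the group has committed, later runs resume from the committed position.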

Why is this happening? What might the issue be, and how can I get the messages consistently in the Lambda consumer?

If anybody has a working code sample, I would be very grateful.

Thanks.

Further update: I have scheduled the consumer function, and then it gets all the events. I still have the following questions: 1> Why is it not getting the messages when I trigger the function manually? 2> I tested a consumer function written in Python; it is not getting any messages either. Here is the Python code:

    from kafka import KafkaConsumer

    def lambda_handler(event, context):
        bootstrap_servers = ["<msk bootstrap>"]
        topicName = '<mp-topic-name>'

        # Iteration stops after consumer_timeout_ms (5 s) without a new message
        consumer = KafkaConsumer(topicName, group_id='test',
                                 bootstrap_servers=bootstrap_servers,
                                 auto_offset_reset='earliest',
                                 consumer_timeout_ms=5000)

        for message in consumer:
            consumer.commit()
            print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                                 message.offset, message.key, message.value))

        # Close the consumer instance created above
        consumer.close()
        return "Processed"


Here is the Java consumer code; it is a plain Kafka client. In Lambda, it reads messages when run on a schedule, but not when tested manually.

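    // Needs: java.time.Duration, java.util.Arrays,
    // org.apache.kafka.clients.consumer.{ConsumerRecord, ConsumerRecords, KafkaConsumer}
    // try-with-resources closes the consumer so it leaves the group cleanly
    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {

        consumer.subscribe(Arrays.asList(topicName));

        context.getLogger().log("Subscribed to topic " + topicName);

        // A single poll may return nothing while the consumer is still joining the group.
        // poll(Duration) needs kafka-clients 2.0+; older clients use the deprecated poll(long).
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(5000));

        for (ConsumerRecord<String, String> record : records) {
            context.getLogger().log("Message:::  offset = " + record.offset()
                    + ", key = " + record.key() + ", value = " + record.value() + "\n");
        }
        context.getLogger().log("After messages");
    } catch (Exception e) {
        e.printStackTrace();
        context.getLogger().log("Exception: " + e.getMessage());
    }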
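
The invocation timing out is also consistent with Lambda's default function timeout of 3 seconds, assuming the function still uses it: a single poll(5000) on its own can exceed that. The timeout can be raised in the function configuration (up to 15 minutes). A minimal sketch of a time-budgeted poll loop, assuming the poll call is wrapped behind a hypothetical BatchSource interface (so the loop can run without a broker) and that the remaining time comes from something like the Lambda Context's getRemainingTimeInMillis(). It keeps polling until several consecutive polls come back empty or the remaining time drops below a safety margin, which also tolerates a first poll that returns nothing while the consumer is still joining the group:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.LongSupplier;

public class BudgetedPoller {
    // Hypothetical abstraction over KafkaConsumer.poll, not part of the Kafka API.
    public interface BatchSource<T> {
        List<T> poll(long timeoutMs);
    }

    // Polls repeatedly while the remaining invocation time stays above safetyMarginMs,
    // and stops after maxEmptyPolls consecutive empty batches.
    public static <T> List<T> drain(BatchSource<T> source, LongSupplier remainingMs,
                                    long pollTimeoutMs, long safetyMarginMs, int maxEmptyPolls) {
        List<T> out = new ArrayList<>();
        int emptyPolls = 0;
        while (emptyPolls < maxEmptyPolls) {
            long budget = remainingMs.getAsLong() - safetyMarginMs;
            if (budget <= 0) {
                break; // leave time to close the consumer before the invocation is killed
            }
            List<T> batch = source.poll(Math.min(pollTimeoutMs, budget));
            if (batch.isEmpty()) {
                emptyPolls++;
            } else {
                emptyPolls = 0;
                out.addAll(batch);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Fake source: one empty batch first (as during a group join), then two batches.
        List<List<String>> batches = new ArrayList<>(Arrays.asList(
                Collections.<String>emptyList(),
                Arrays.asList("m1", "m2"),
                Collections.singletonList("m3")));
        BatchSource<String> fake =
                t -> batches.isEmpty() ? Collections.<String>emptyList() : batches.remove(0);
        List<String> got = drain(fake, () -> 10_000L, 1_000L, 500L, 2);
        System.out.println(got); // prints [m1, m2, m3]
    }
}
```

In a real handler, the source would wrap consumer.poll(...) and copy the returned ConsumerRecords into a list, and remainingMs could be () -> context.getRemainingTimeInMillis(). The margin is there so the consumer can still be closed inside the invocation.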

Recommended answer

If only you had held on for another month or so! AWS Lambda now supports Kafka event sources, both from MSK and from self-managed Kafka clusters (even outside of AWS):

https://aws.amazon.com/blogs/compute/using-amazon-msk-as-an-event-source-for-aws-lambda/

https://aws.amazon.com/about-aws/whats-new/2020/12/aws-lambda-now-supports-self-managed-apache-kafka-as-an-event-source/
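
With an MSK event source, Lambda polls the topic itself and invokes the function with batches of records, so the handler no longer creates a KafkaConsumer. A minimal sketch of reading such a batch, assuming the handler receives the event deserialized as a Map<String, Object> and that the payload has the shape shown in AWS's examples: a "records" object keyed by "topic-partition" (for example mytopic-0), each entry a list of records whose "value" is base64-encoded. The class and method names are illustrative:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MskEventSketch {
    // Decodes the base64 "value" of every record in an MSK event deserialized into a Map.
    // Assumes "records" maps "topic-partition" keys to lists of record maps.
    @SuppressWarnings("unchecked")
    public static List<String> decodeValues(Map<String, Object> event) {
        List<String> values = new ArrayList<>();
        Map<String, Object> records = (Map<String, Object>) event.get("records");
        if (records == null) {
            return values; // not an MSK batch, or no records
        }
        for (Object batch : records.values()) {
            for (Object r : (List<Object>) batch) {
                Map<String, Object> record = (Map<String, Object>) r;
                String encoded = (String) record.get("value");
                if (encoded != null) {
                    byte[] raw = Base64.getDecoder().decode(encoded);
                    values.add(new String(raw, StandardCharsets.UTF_8));
                }
            }
        }
        return values;
    }

    public static void main(String[] args) {
        // Hand-built event with one record on partition 0 of a placeholder topic.
        Map<String, Object> record = new HashMap<>();
        record.put("topic", "mp-topic");
        record.put("value", Base64.getEncoder()
                .encodeToString("hello".getBytes(StandardCharsets.UTF_8)));
        Map<String, Object> records = new HashMap<>();
        records.put("mp-topic-0", Collections.singletonList(record));
        Map<String, Object> event = new HashMap<>();
        event.put("eventSource", "aws:kafka");
        event.put("records", records);
        System.out.println(decodeValues(event)); // prints [hello]
    }
}
```

In a deployed function, decodeValues would be called from the handleRequest method of a class implementing RequestHandler<Map<String, Object>, String> from aws-lambda-java-core. When creating the event source mapping, the starting position (TRIM_HORIZON or LATEST) plays roughly the role that auto.offset.reset plays for a hand-written consumer.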
