AWS API网关WebSocket接收消息不一致 [英] AWS API gateway websocket receives messages inconsistently

查看:28
本文介绍了AWS API网关WebSocket接收消息不一致的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在API网关中有一个连接到lambda的WebSocket,如下所示:

const AWS = require('aws-sdk');
const amqp = require('amqplib');

const api = new AWS.ApiGatewayManagementApi({
    endpoint: 'MY_ENDPOINT',
});

async function sendMsgToApp(response, connectionId) {
    console.log('=========== posting reply');
    const params = {
        ConnectionId: connectionId,
        Data: Buffer.from(response),
    };
    return api.postToConnection(params).promise();
}

let rmqServerUrl =
    'MY_RMQ_SERVER_URL';
let rmqServerConn = null;

exports.handler = async event => {
    console.log('websocket event:', event);
    const { routeKey: route, connectionId } = event.requestContext;

    switch (route) {
        case '$connect':
            console.log('user connected');
            const creds = event.queryStringParameters.x;
            console.log('============ x.length:', creds.length);
            const decodedCreds = Buffer.from(creds, 'base64').toString('utf-8');
            try {
                const conn = await amqp.connect(
                    `amqps://${decodedCreds}@${rmqServerUrl}`
                );
                const channel = await conn.createChannel();
                console.log('============ created channel successfully:');
                rmqServerConn = conn;
                const [userId] = decodedCreds.split(':');
                const { queue } = await channel.assertQueue(userId, {
                    durable: true,
                    autoDelete: false,
                });
                console.log('============ userId:', userId, 'queue:', queue);
                channel.consume(queue, msg => {
                    console.log('========== msg:', msg);
                    const { content } = msg;
                    const msgString = content.toString('utf-8');
                    console.log('========== msgString:', msgString);
                    sendMsgToApp(msgString, connectionId)
                        .then(res => {
                            console.log(
                                '================= sent queued message to the app, will ack, outcome:',
                                res
                            );
                            try {
                                channel.ack(msg);
                            } catch (e) {
                                console.log(
                                    '================= error acking message:',
                                    e
                                );
                            }
                        })
                        .catch(e => {
                            console.log(
                                '================= error sending queued message to the app, will not ack, error:',
                                e
                            );
                        });
                });
            } catch (e) {
                console.log(
                    '=========== error initializing amqp connection',
                    e
                );
                if (rmqServerConn) {
                    await rmqServerConn.close();
                }
                const response = {
                    statusCode: 401,
                    body: JSON.stringify('failed auth!'),
                };
                return response;
            }
            break;
        case '$disconnect':
            console.log('user disconnected');
            if (rmqServerConn) {
                await rmqServerConn.close();
            }
            break;
        case 'message':
            console.log('message route');
            await sendMsgToApp('test', connectionId);
            break;
        default:
            console.log('unknown route', route);
            break;
    }
    const response = {
        statusCode: 200,
        body: JSON.stringify('Hello from websocket Lambda!'),
    };
    return response;
};

AMQP连接用于由amazonmq提供的rabbitmq服务器。我遇到的问题是,发布到队列的消息要么根本不会在.consume回调中显示,要么只在断开并重新连接WebSocket之后才显示。从本质上讲,他们一直在失踪,直到很久以后,他们才出人意料地出现。那是在网络插座里。即使它们出现了,它们也不会被发送到连接到网络插座的客户端(在本例中为app)。这里可能有什么问题?

推荐答案

这里的问题是,我对API Gateway的WebSocket的工作方式有错误的理解。API网关维护WebSocket连接,但不维护lambda本身。我将.consume订阅逻辑放在lambda中,这不起作用,因为lambda运行并终止,而不是保持活动状态。更好的方法是make the queue an event source for the lambda。然而,这对我也不起作用,因为它要求您在设置lambda时了解您的队列。我的队列是动态创建的,因此违反了要求。我最终在一台VP上安装了一台RMQ服务器。

这篇关于AWS API网关WebSocket接收消息不一致的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆