Node.js:一次接收太多UDP消息,导致丢失 [英] Node.js: Receiving too many UDP messages at a time, losing them

查看:250
本文介绍了Node.js:一次接收太多UDP消息,导致丢失的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我的节点服务器在一秒钟内接收到大约 400条UDP消息,并且所有正常工作,我能够处理所有400条UDP消息.

但是,当我在一秒钟内开始收到大约 700条UDP消息时,我丢失了2-20 条消息,并且它们从未得到解析:((

我在这里考虑过一些选项:

  1. 创建所有套接字消息的队列,然后一个一个地消耗, 虽然我不确定如何实现
    • 不知道如何实现
  2. Node /Express/dgram套接字中找到一个设置,我可以在其中增加内存大小/缓冲区大小,类似这样
    • 尽管如此,我找不到任何这样的设置:(
  3. 使用其他UDP接收器,停止使用节点的内置套接字UDP接收器
    • 找不到其他接收者

这是我的UDP发送者的样子:

var dgram = require("dgram");
var udpserver = dgram.createSocket("udp4");
var seatStateStore = require("./SeatStateStore");

udpserver.on("message",
        function (msg, rinfo)
        {
        seatStateStore.parseMessage(msg.toString());
    });

有人有什么想法吗?我不知道这三个选项中的任何一个:/有人可以帮我吗?

节点v0.10.29

Express v3.14.0

==============================

更新/解决方案

这是我最终使用的代码(略微修改了@RoyHB的解决方案):

var dgram = require("dgram");
var udpserver = dgram.createSocket("udp4");
var seatStateStore = require("./SeatStateStore");
var Dequeue = require('dequeue');
var FIFO = new Dequeue();

fetcher();

udpserver.on("message",
        function (msg, rinfo)
        {
           FIFO.push(msg.toString());
        });

udpserver.bind(43278);

function fetcher () {
    while (FIFO.length > 0) 
    {
        var msg = FIFO.shift();
        seatStateStore.parseMessage(msg);
    }
    setImmediate(fetcher); //make this function continuously run
}

解决方案

有一个名为 node的NPM模块-出队.在与您类似的情况下,我经常使用它.

基本上

  1. 您的程序将收到的消息推送到队列的末尾.
  2. 间隔计时器会定期激活另一种方法或功能(队列提取器),该方法或函数检查队列中是否有消息,如果有,则提取一个或多个消息并进行处理.
  3. 或者(可能更好),不使用任何计时器来调度队列获取.而是使用节点process.nextTick方法.

或者,可能更可取的是,您可以使用节点process.nextTick连续检查队列中是否有消息.

理想情况下,seatStateStore.parseMessage将创建一个新对象以异步处理一条消息,以便parseMessage可以立即返回,而实际的消息处理在后台继续进行. (请参见示例代码的底部)

我没有测试下面的代码,这只是为了说明而不是运行

var FIFO = require ('dequeue');
var seatStateStore = require("./SeatStateStore");
var dgram = require("dgram");

setInterval(fetcher, 1);

var udpserver = dgram.createSocket("udp4");

udpserver.on("message",
    function (msg, rinfo) {
        FIFO.push(msg);
    }
);

function fetcher () {
    while (FIFO.length > 0) {
        var msg = FIFO.shift();
        seatStateStore.parseMessage(msg);
    }
}

**或(可能更好)**

var FIFO = require ('dequeue');
var seatStateStore = require("./SeatStateStore");
var dgram = require("dgram");

fetcher();

var udpserver = dgram.createSocket("udp4");

udpserver.on("message",
    function (msg, rinfo) {
        FIFO.push(msg);
    }
);

function fetcher () {
    while (FIFO.length > 0) {
        var msg = FIFO.shift();
        seatStateStore.parseMessage(msg);
        process.nextTick(fetcher);
    }
}

seatStateProcessor.parseMessage的概述:

seatStateProcessor.parseMessage = function (msg) {
    proc = new asyncProcHandler(msg, function (err) {
        if (err) {
            //handle the error
        }
    });
}

My node server receives about 400 UDP messages in one second, and it all works, and I am able to process all 400 of them.

However, when I start to receive about 700 UDP messages in one second, I lose 2-20 of the messages, and they never get parsed :(

I have thought about some options here:

  1. Create a queue of all the socket messages, then consume one-by-one, although I'm not sure how to implement this
    • Can't figure out how to implement
  2. Find a setting in Node / Express / dgram socket where i can increase the memory size / buffer size, something like that
    • I couldn't find any settings like this, though :(
  3. Use a different UDP receiver, stop using node's build in socket UDP receiver
    • Didn't find other receivers

Here's what my UDP sender looks like:

var dgram = require("dgram");
var udpserver = dgram.createSocket("udp4");
var seatStateStore = require("./SeatStateStore");

udpserver.on("message",
        function (msg, rinfo)
        {
        seatStateStore.parseMessage(msg.toString());
    });

Anyone have any ideas? I couldn't figure out any of the 3 options :/ Can someone help me out?

Node v0.10.29

Express v3.14.0

===============================

UPDATE / SOLUTION

Here's the code I ended up using (slightly modified @RoyHB 's solution):

var dgram = require("dgram");
var udpserver = dgram.createSocket("udp4");
var seatStateStore = require("./SeatStateStore");
var Dequeue = require('dequeue');
var FIFO = new Dequeue();

fetcher();

udpserver.on("message",
        function (msg, rinfo)
        {
           FIFO.push(msg.toString());
        });

udpserver.bind(43278);

function fetcher () {
    while (FIFO.length > 0) 
    {
        var msg = FIFO.shift();
        seatStateStore.parseMessage(msg);
    }
    setImmediate(fetcher); //make this function continuously run
}

解决方案

There is a NPM module called node-dequeue. I use it a lot for similar situations to yours.

basically,

  1. your program pushes received messages onto the end of the queue.
  2. an interval timer periodically activates another method or function ( a queue-fetcher) which checks to see if there are messages on the queue and if so, fetches one or more and processes it.
  3. Alternatively (maybe better) no timer is used to schedule queue fetches. Instead the node process.nextTick method is used.

Alternatively, maybe preferably, you can use node process.nextTick to continuously check the queue for messages.

Ideally, seatStateStore.parseMessage would create a new object to asynchronously process one message so that parseMessage returns without delay while the actual message processing continues in the background. (see bottom of example code )

I haven't tested the code below, it's meant to illustrate, not to run

var FIFO = require ('dequeue');
var seatStateStore = require("./SeatStateStore");
var dgram = require("dgram");

setInterval(fetcher, 1);

var udpserver = dgram.createSocket("udp4");

udpserver.on("message",
    function (msg, rinfo) {
        FIFO.push(msg);
    }
);

function fetcher () {
    while (FIFO.length > 0) {
        var msg = FIFO.shift();
        seatStateStore.parseMessage(msg);
    }
}

** OR (maybe better) **

var FIFO = require ('dequeue');
var seatStateStore = require("./SeatStateStore");
var dgram = require("dgram");

fetcher();

var udpserver = dgram.createSocket("udp4");

udpserver.on("message",
    function (msg, rinfo) {
        FIFO.push(msg);
    }
);

function fetcher () {
    while (FIFO.length > 0) {
        var msg = FIFO.shift();
        seatStateStore.parseMessage(msg);
        process.nextTick(fetcher);
    }
}

Outline of seatStateProcessor.parseMessage:

seatStateProcessor.parseMessage = function (msg) {
    proc = new asyncProcHandler(msg, function (err) {
        if (err) {
            //handle the error
        }
    });
}

这篇关于Node.js:一次接收太多UDP消息,导致丢失的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆