Memory-efficient message chunk processing using XMLHttpRequest


Question

I have an XMLHttpRequest with a progress event handler that is requesting a chunked page which continuously sends additional message chunks. If I do not set a responseType, I can access the response property of the XMLHttpRequest in each progress event and handle the additional message chunk. The problem with this approach is that the browser must keep the entire response in memory, and eventually the browser will crash due to this memory waste.

So, I tried a responseType of arraybuffer in the hope that I could slice the buffer to prevent the excessive memory waste described above. Unfortunately, the progress event handler is no longer capable of reading the response property of the XMLHttpRequest at that point. The event parameter of the progress event does not contain the buffer, either. Here is a short, self-contained example of my attempt (written for node.js):

var http = require('http');

// -- The server.

http.createServer(function(req, res) {
  if (req.url === '/stream') return serverStream(res);
  serverMain(res);
}).listen(3000);

// -- The server functions to send a HTML page with the client code, or a stream.

function serverMain(res) {
  res.writeHead(200, {'Content-Type': 'text/html'});
  res.write('<html><body>Hello World</body><script>');
  res.end(client.toString() + ';client();</script></html>');
}

function serverStream(res) {
  res.writeHead(200, {'Content-Type': 'text/html'});
  setInterval(function() {
    res.write('Hello World<br />\n');
  }, 1000);
}

// -- The client code which runs in the browser.

function client() {
  var xhr = new XMLHttpRequest();
  xhr.addEventListener('progress', function() {
    // with responseType 'arraybuffer', response stays null until loading has finished
    if (!xhr.response) return console.log('progress without response :-(');
    console.log('progress: ' + xhr.response.byteLength);
  }, false);
  xhr.open('GET', '/stream', true);
  xhr.responseType = 'arraybuffer';
  xhr.send();
}

The progress event handler has no access to the response I wanted. How can I handle the message chunks in the browser in a memory-efficient way? Please do not suggest a WebSocket. I do not wish to use one just to process a read-only stream of message chunks.

Answer

XMLHttpRequest doesn't really seem designed for this kind of usage. The obvious solution is polling, which is a popular use of XMLHttpRequest, but I'm guessing you don't want to miss data from your stream that would slip in between the calls.
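
(As an editorial aside, not part of the original answer: browsers that support streaming response bodies on the Fetch API can consume the stream chunk by chunk without the response accumulating in memory, which sidesteps the problem entirely. A minimal sketch, assuming a modern browser and the same /stream endpoint; the function name is illustrative:)

// Sketch: read the response body as a stream; each result.value is a
// Uint8Array holding only the newest network chunk.
function clientFetchStream() {
  fetch('/stream').then(function(response) {
    var reader  = response.body.getReader();
    var decoder = new TextDecoder();
    return (function pump() {
      return reader.read().then(function(result) {
        if (result.done) return; // stream closed by the server
        console.log('chunk: ' + decoder.decode(result.value, {stream: true}));
        return pump();
      });
    })();
  });
}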

To my question, "Can the 'real' data chunks be identified in some way, or is it basically random data?", you answered, "With some effort, the chunks could be identified by adding an event-id of sorts to the server side".

Based on that premise, my suggestion is the following:


  1. Connect to the stream and set up the progress listener (referred to as listenerA()).
  2. When a chunk arrives, process it and output it. Keep a reference to the ids of both the first and last chunk received by listenerA(). Count how many chunks listenerA() has received.
  3. After listenerA() has received a certain number of chunks, spawn another "thread" (connection + listener, listenerB()) performing steps 1 and 2 in parallel with the first one, but keeping the processed data in a buffer instead of outputting it.
  4. When listenerA() receives the chunk with the same id as the first chunk received by listenerB(), send a signal to listenerB(), drop the first connection and kill listenerA().
  5. When listenerB() receives the termination signal from listenerA(), dump the buffer to the output and keep processing normally.
  6. Have listenerB() spawn listenerC() under the same conditions as before.
  7. Keep repeating with as many connections + listeners as necessary.

By using two overlapping connections, you can prevent the possible loss of chunks that would result from dropping a single connection and then reconnecting.


  • This assumes the data stream is the same for all connections and doesn't introduce some individualized settings.
  • Depending on the output rate of the stream and the connection delay, the buffer dump during the transition from one connection to another might be noticeable.
  • You could also measure the total response size rather than the chunk count to decide when to switch to a new connection.
  • It might be necessary to keep a complete list of chunk ids to compare against, rather than just the first and last one, because we can't guarantee the timing of the overlap.
  • The responseType of XMLHttpRequest must be set to its default value of "" or "text" to return text. Other datatypes will not return a partial response. See https://xhr.spec.whatwg.org/#the-response-attribute (a sketch of this incremental text reading follows this list).
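
To illustrate that last point, here is a minimal sketch of reading a text response incrementally during progress events; the helper name is illustrative and it is not part of the original proof of concept, but it is the same pattern the code below uses:

// Illustrative sketch: extract only the bytes that arrived since the previous
// progress event. Note that the browser still buffers the full responseText
// internally, which is exactly why the answer rotates overlapping connections.
function readIncrementally(url, onChunk) {
  var consumed = 0;
  var xhr = new XMLHttpRequest();
  xhr.addEventListener('progress', function() {
    var chunk = xhr.response.substr(consumed); // new data since the last event
    consumed = xhr.response.length;
    if (chunk) onChunk(chunk);
  }, false);
  xhr.open('GET', url, true);
  xhr.responseType = 'text'; // '' or 'text' is required for partial responses
  xhr.send();
  return xhr;
}

For example, readIncrementally('/stream', function(chunk) { console.log(chunk); }) would log each new piece of the stream as it arrives.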

The following code is a node.js server that outputs a consistent stream of elements for testing purposes. You can open multiple connections to it; the output will be the same across sessions, minus possible server lag.


http://localhost:5500/stream

will return data where id is an incremented number


http://localhost:5500/streamRandom

will return data where id is a random 40-character string. This is meant to test a scenario where the id cannot be relied upon for ordering the data.
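
For reference, the chunk format emitted by the server below looks like this (ids shown are illustrative):

[node id="1"][node id="2"][node id="3"] ...

The /streamRandom variant emits 40-character hexadecimal ids (sha1 digests of the counter) instead of incrementing numbers.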

var crypto = require('crypto');

// init + update nodeId
var nodeId     = 0;
var nodeIdRand = '0000000000000000000000000000000000000000';

setInterval(function() {

    // regular id
    ++nodeId;

    //random id
    nodeIdRand = crypto.createHash('sha1').update(nodeId.toString()).digest('hex');
}, 1000);


// create server  (port 5500)
var http = require('http');
http.createServer(function(req, res) {

  if(req.url === '/stream') {
      return serverStream(res);
  }
  else if(req.url === '/streamRandom') {
      return serverStream(res, true);
  }
}).listen(5500);


// serve nodeId
function serverStream(res, rand) {

    // headers
    res.writeHead(200, {
        'Content-Type'                : 'text/plain',
        'Access-Control-Allow-Origin' : '*',
    });

    // remember last served id
    var last = null;

    // output interval
    setInterval(function() {

        // output on new node
        if(last != nodeId) {
            res.write('[node id="'+(rand ? nodeIdRand : nodeId)+'"]');
            last = nodeId;
        }
    }, 250);
}



Proof of concept, using the aforementioned node.js server code

<!DOCTYPE html>
<html>
    <head>
        <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
    </head>
    <body>
        <button id="stop">stop</button>
        <div id="output"></div>
        <script>

/*
Listening to a never ending page load (http stream) without running out of
memory by using concurrent overlapping connections to prevent loss of data,
using only xmlHttpRequest, under the condition that the data can be identified.

listen arguments
    url         url of the http stream
    chunkMax    number of chunks to receive before switching to new connection

listen properties
    output           a reference to a DOM element with id "output"
    queue            an array filled with non-duplicate received chunks and metadata
    lastFetcherId    an incrementing number used to assign an id to new fetchers
    fetchers         an array listing all active fetchers

listen methods
    fire        internal use    fire an event
    stop        external use    stop all connections
    fetch       internal use    starts a new connection
    fetchRun    internal use    initialize a new fetcher object

Usage

    var myListen = new listen('http://localhost:5500/streamRandom', 20);
        will listen to url "http://localhost:5500/streamRandom"
        will switch connections every 20 chunks

    myListen.stop()
        will stop all connections in myListen
*/
function listen(url, chunkMax) {

    // main ref
    var that = this;

    // output element
    that.output = document.getElementById('output');

    // main queue
    that.queue = [];

    // last fetcher id
    that.lastFetcherId = 0;

    // list of fetchers
    that.fetchers = [];




    //********************************************************* event dispatcher
    that.fire = function(name, data) {
        document.dispatchEvent(new CustomEvent(name, {'detail':data}));
    }




    //******************************************************** kill all fetchers
    that.stop = function() {
        that.fire('fetch-kill', -1);
    }




    //************************************************************** url fetcher
    that.fetch = function(fetchId, url, fetchRef) {

        //console.log('start fetcher #'+fetchId);
        var len = 0;
        var xhr = new XMLHttpRequest();
        var cb_progress;
        var cb_kill;


        // progress listener
        xhr.addEventListener('progress', cb_progress = function(e) {

            // extract chunk data
            var chunkData = xhr.response.substr(len);

            // chunk id
            var chunkId = chunkData.match(/id="([a-z0-9]+)"/)[1];

            // update response end point
            len = xhr.response.length;

            // signal end of chunk processing
            that.fire('chunk-ready', {
                'fetchId'   : fetchId,
                'fetchRef'  : fetchRef,
                'chunkId'   : chunkId,
                'chunkData' : chunkData,
            });
        }, false);


        // kill switch
        document.addEventListener('fetch-kill', cb_kill = function(e) {

            // kill this fetcher or all fetchers (-1)
            if(e.detail == fetchId || e.detail == -1) {

                //console.log('kill fetcher #'+fetchId);

                xhr.removeEventListener('progress', cb_progress);
                document.removeEventListener('fetch-kill', cb_kill);

                xhr.abort();
                that.fetchers.shift(); // remove oldest fetcher from list
                xhr = null; // release the reference
            }
        }, false);


        // go
        xhr.open('GET', url, true);
        xhr.responseType = 'text';
        xhr.send();
    };




    //****************************************************** start a new fetcher
    that.fetchRun = function() {

        // new id
        var id = ++that.lastFetcherId;

        //console.log('create fetcher #'+id);

        // create fetcher with new id
        var fetchRef = {
            'id'           : id,    // self id
            'queue'        : [],    // internal queue
            'chunksIds'    : [],    // retrieved ids, also used to count
            'hasSuccessor' : false, // keep track of next fetcher spawn
            'ignoreId'     : null,  // when set, ignore chunks until this id is received (this id included)
        };
        that.fetchers.push(fetchRef);

        // run fetcher
        that.fetch(id, url, fetchRef);
    };




    //************************************************ a fetcher returns a chunk
    document.addEventListener('chunk-ready', function(e) {

        // shorthand
        var f = e.detail;

        // ignore flag is not set, process chunk
        if(f.fetchRef.ignoreId == null) {

            // store chunk id
            f.fetchRef.chunksIds.push(f.chunkId);

            // create queue item
            var queueItem = {'id':f.chunkId, 'data':f.chunkData};

            // chunk is received from oldest fetcher
            if(f.fetchId == that.fetchers[0].id) {

                // send to main queue
                that.queue.push(queueItem);

                // signal queue insertion
                that.fire('queue-new');
            }
            // not oldest fetcher
            else {

                // use fetcher internal queue
                f.fetchRef.queue.push(queueItem);
            }
        }
        // ignore flag is set, current chunk id is the one to ignore
        else if(f.fetchRef.ignoreId == f.chunkId) {

            // disable ignore flag
            f.fetchRef.ignoreId = null;
        }







        //******************** check chunks count for fetcher, threshold reached
        if(f.fetchRef.chunksIds.length >= chunkMax && !f.fetchRef.hasSuccessor) {

            // remember the spawn
            f.fetchRef.hasSuccessor = true;

            // spawn new fetcher
            that.fetchRun();
        }




        /***********************************************************************
        check if the first chunk of the second oldest fetcher exists in the
        oldest fetcher.
        If true, then they overlap and we can kill the oldest fetcher
        ***********************************************************************/
        if(
            // is this the oldest fetcher ?
            f.fetchId == that.fetchers[0].id
            // is there a successor ?
            && that.fetchers[1]
            // has oldest fetcher received the first chunk of its successor ?
            && that.fetchers[0].chunksIds.indexOf(
                that.fetchers[1].chunksIds[0]
            ) > -1
        ) {

            // get index of last chunk of the oldest fetcher within successor queue
            var lastChunkId    = that.fetchers[0].chunksIds[that.fetchers[0].chunksIds.length-1];
            var lastChunkIndex = that.fetchers[1].chunksIds.indexOf(lastChunkId);

            // successor has not reached its parent last chunk
            if(lastChunkIndex < 0) {

                // discard whole queue
                that.fetchers[1].queue     = [];
                that.fetchers[1].chunksIds = [];

                // set ignore id in successor to discard future duplicates
                that.fetchers[1].ignoreId = lastChunkId;
            }
            // there is overlap
            else {

                /**
                console.log('triming queue start: '+that.fetchers[1].queue.length
                    +"   "+(lastChunkIndex+1)
                    +"   "+(that.fetchers[1].queue.length-1)
                );
                /**/
                var trimStart = lastChunkIndex+1;

                // keep only the chunks that the oldest fetcher has not already output
                that.fetchers[1].queue     = that.fetchers[1].queue.slice(trimStart);
                that.fetchers[1].chunksIds = that.fetchers[1].chunksIds.slice(trimStart);

                //console.log('triming queue end: '+that.fetchers[1].queue.length);
            }

            // kill oldest fetcher
            that.fire('fetch-kill', that.fetchers[0].id);
        }





    }, false);




    //***************************************************** main queue processor
    document.addEventListener('queue-new', function(e) {

        // process chunks in queue
        while(that.queue.length > 0) {

            // get chunk and remove from queue
            var chunk = that.queue.shift();

            // output item to document
            if(that.output) {
                that.output.innerHTML += "<br />"+chunk.data;
            }
        }
    }, false);



    //****************************************************** start first fetcher
    that.fetchRun();
};


// run
var process = new listen('http://localhost:5500/streamRandom', 20);

// bind global kill switch to button
document.getElementById('stop').addEventListener('click', process.stop, false);

        </script>
    </body>
</html>
