Possibility of using circular buffer in a single thread


Problem description

I have a single UDP thread which reads multiple datagrams through the recvmmsg system call from different multiplexed streams and pushes them into different circular/ring buffers. These ring buffers are part of Stream structures. Each stream sends a speech frame every 20 ms. So the UDP packets might look like this: F1S1 F1S2 F1S3 F2S1 and so on, or, in case of a burst, like this: F1S1 F2S1 F3S1 F1S2 and so on. After reception, these packets will be processed in parallel by a library which works on the principle of ITP (independent task parallelism). The UDP thread has to dispatch these parallel tasks along with the list of the packets to process. The limitation here is that tasks must not process two frames from the SAME stream in parallel, and tasks MUST have their own independent memory for frame handling. So I need to ensure FIFO execution order for these frames, and this has to be done within the UDP thread before I spawn these tasks. Currently, when I receive these packets, I look up the streamId and place the frame in a circular buffer which is part of the Stream structures, inside a for loop.

Here is the code which shows what's going on in the UDP thread.

while (!(*thread_stop))
    {
        int nr_datagrams = recvmmsg(socket_handle->fd_udp, datagramS, VLEN, 0,
                NULL);
        .....
        for (int i = 0; i < nr_datagrams; i++)
        {
        ....
            hash_table_search(codec_buffers, _stream_id, (void **) &codecPtr))
        .....
            // Circular Buffer for speech frames
            // store the incoming sequence number for the newest packet
            codecPtr->circBuff.seqNum[codecPtr->circBuff.newestIdx] = _seq_num;

            // Update the entry pointer to point to the newest frame
            codecPtr->circBuff.entries = codecPtr->circBuff.entries
                    + codecPtr->circBuff.newestIdx * codecPtr->frameLength;
            // Copy the contents of the frame in entry buffer
            memcpy(codecPtr->circBuff.entries,
                    2 * sizeof(uint16_t) + datagramBuff[i],
                    codecPtr->frameLength);
            // Update the newest Index
            codecPtr->circBuff.newestIdx =
                    (codecPtr->circBuff.newestIdx + 1) &
                    codecPtr->circBuffSize;
          }

My program should now pop frames from the ring buffers of those streams that have recently received data, but not simply the most recently received packets, since in case of a burst all recently received packets might belong to the same stream. The dilemma I am facing is how to go forward from here.

Solution

(This is a complete rewrite after OP clarified the question with additional details.)

I would suggest using an unordered buffer for the received datagrams, with a stream identifier, stream counter, and receive counter for each; and a latest dispatched counter for each stream:

#define _GNU_SOURCE
#include <stdlib.h>
#include <unistd.h>
#include <limits.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>

/* Maximum number of buffered datagrams */
#define  MAX_DATAGRAMS  16

/* Maximum size of each datagram */
#define  MAX_DATAGRAM_SIZE  4096

/* Maximum number of streams */
#define  MAX_STREAMS  4

typedef struct {
    int             stream;                     /* -1 for none */
    unsigned int    counter;                    /* Per stream counter */
    unsigned int    order;                      /* Global counter */
    unsigned int    size;                       /* Bytes of data in data[] */
    char            data[MAX_DATAGRAM_SIZE];
} datagram;


void process(const int socketfd)
{
    /* Per-stream counters for latest dispatched message */
    unsigned int newest_dispatched[MAX_STREAMS] = { 0U };

    /* Packet buffer */
    datagram     buffer[MAX_DATAGRAMS];

    /* Sender IPv4 addresses */
    struct sockaddr_in  from[MAX_DATAGRAMS];

    /* Vectors to refer to packet buffer .data member */
    struct iovec        iov[MAX_DATAGRAMS];

    /* Message headers */
    struct mmsghdr      hdr[MAX_DATAGRAMS];

    /* Buffer index; hdr[i], iov[i], from[i] all refer
     * to buffer[buf[i]]. */
    unsigned int        buf[MAX_DATAGRAMS];

    /* Temporary array indicating which buffer contains
     * the next datagram to be dispatched for each stream */
    int                 next[MAX_STREAMS];

    /* Receive counter (not stream specific) */
    unsigned int        order = 0U;

    int                 i, n;

    /* Mark all buffers unused. */
    for (i = 0; i < MAX_DATAGRAMS; i++) {
        buffer[i].stream = -1;
        buffer[i].size = 0U;
    }

    /* Clear stream dispatch counters. */
    for (i = 0; i < MAX_STREAMS; i++)
        newest_dispatched[i] = 0U;

    while (1) {

        /* Discard datagrams received too much out of order. */
        for (i = 0; i < MAX_DATAGRAMS; i++)
            if (buffer[i].stream >= 0)
                if (buffer[i].counter - newest_dispatched[buffer[i].stream] >= UINT_MAX/2) {
                    /* Either in the past, or too far into the future */
                    buffer[i].stream = -1;
                    buffer[i].size = 0U;
                }

        /* Prepare for receiving new messages.
         * Stream -1 indicates unused/processed message. */
        for (n = 0, i = 0; i < MAX_DATAGRAMS; i++)
            if (buffer[i].stream == -1) {

                /* Prep the buffer. */
                buffer[i].stream = -1;
                buffer[i].counter = 0U;
                buffer[i].order = order + n;
                buffer[i].size = 0U;

                /* Local index n refers to buffer i. */
                buf[n] = i;

                /* Local index n refers to buffer i data. */
                iov[n].iov_base = buffer[i].data;
                iov[n].iov_len  = sizeof buffer[i].data;

                /* Clear received bytes counter. */
                hdr[n].msg_len = 0U;

                /* Source address to from[] array. */
                hdr[n].msg_hdr.msg_name = from + i;
                hdr[n].msg_hdr.msg_namelen = sizeof from[i];

                /* Payload per iov[n]. */
                hdr[n].msg_hdr.msg_iov = iov + n;
                hdr[n].msg_hdr.msg_iovlen = 1;

                /* No ancillary data. */
                hdr[n].msg_hdr.msg_control = NULL;
                hdr[n].msg_hdr.msg_controllen = 0;

                /* Clear received message flags */
                hdr[n].msg_hdr.msg_flags = 0;

                /* Prepared one more hdr[], from[], iov[], buf[]. */
                n++;
            }

        if (n < 1) {
            /* Buffer is full. Find oldest received datagram. */
            unsigned int max_age = 0U;
            int          oldest = -1;

            for (i = 0; i < MAX_DATAGRAMS; i++) {
                const unsigned int age = order - buffer[i].order;
                if (age >= max_age) {
                    max_age = age;
                    oldest = i;
                }
            }

            /* TODO: Dispatch the oldest received datagram:
             * Stream  buffer[oldest].stream
             * Data    buffer[oldest].data, buffer[oldest].size bytes
            */

            /* Update stream counters. */
            newest_dispatched[buffer[oldest].stream] = buffer[oldest].counter;             

            /* Remove buffer. */
            buffer[oldest].stream = -1;
            buffer[oldest].size   =  0;

            /* Need more datagrams. */
            continue;
        }

        n = recvmmsg(socketfd, hdr, n, 0, NULL);
        if (n < 1) {
            /* TODO: Check for errors. */
            continue;
        }

        /* Update buffer description for each received message. */
        for (i = 0; i < n; i++) {
            const int b = buf[i];

            buffer[b].order = order;          /* Already set, actually */
            buffer[b].size  = hdr[i].msg_len;

            /* TODO: determine stream and counter,
             *       based on from[i] and buffer[b].data.
             *       This assigns them in round-robin fashion. */
            buffer[b].stream  = order % MAX_STREAMS;
            buffer[b].counter = order / MAX_STREAMS;

            /* Account for the message received. */
            order++;
        }

        while (1) {

            /* Clear next-to-be-dispatched index list. */
            for (i = 0; i < MAX_STREAMS; i++)
                next[i] = -1;

            /* Find next messages to be dispatched. */
            for (i = 0; i < MAX_DATAGRAMS; i++)
                if (buffer[i].stream >= 0 && buffer[i].counter == newest_dispatched[buffer[i].stream] + 1U)
                    next[buffer[i].stream] = i;

            /* Note: This is one point where you will wish to 
             *       ensure all pending dispatches are complete,
             *       before issuing new ones. */

            /* Count (n) and dispatch the messages. */
            for (n = 0, i = 0; i < MAX_STREAMS; i++)
                if (next[i] != -1) {
                    const int b = next[i];
                    const int s = buffer[b].stream;

                    /* TODO: Dispatch buffer b, stream s. */

                    /* Update dispatch counters. */
                    newest_dispatched[s]++;
                    n++;
                }

            /* Nothing dispatched? */
            if (n < 1)
                break;

            /* Remove dispatched messages from the buffer. Also remove duplicates. */
            for (i = 0; i < MAX_DATAGRAMS; i++)
                if (buffer[i].stream >= 0 && buffer[i].counter == newest_dispatched[buffer[i].stream]) {
                    buffer[i].stream = -1;
                    buffer[i].size = 0U;
                }

        }
    }
}

Note that I omitted the points where you should wait for dispatched messages to complete (as there are multiple options, depending on how you dispatch and whether you wish to do "work" at the same time). Also, this code is only compile-tested, so it might contain logic bugs.

The loop structure is as follows:

  1. Discard buffered messages that are either in the past or too far in the future to be useful.

    The counters are cyclic. I added a description of the counter wrapping logic here; a small helper illustrating the wrap test follows this list.

  2. Construct headers for recvmmsg() for each free buffer slot.

  3. If there are no buffer slots free, find and dispatch or discard the oldest one, and repeat from step 1.

  4. Receive one or more messages.

  5. Based on the received messages, update the buffer slots.

    The main point is to determine the stream, the stream counter, and the number of bytes in the received message.

  6. Dispatch loop.

    This is a loop because if we receive messages out of order, but complete them later, we will need to dispatch more than one set of messages at a time.

    Within the loop, the stream index array (next[]) is cleared first.

    Then, we check the buffer for messages that are to be dispatched next. For this, we need the per-stream counters. This is done in a separate step, in case we ever receive duplicate UDP datagrams.

    If none of the streams have their next message buffered already, we exit this loop, and wait for new datagrams to arrive.

    The messages are dispatched next. The loop dispatches at most one message for each stream.

    After the dispatch, we remove the dispatched messages. Instead of looping over each stream and removing the buffer corresponding to that one, we loop over the entire buffer, so that we catch duplicated UDP messages, too.
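
As a concrete illustration of the discard rule in step 1, here is the same cyclic-counter test the code above applies inline, pulled out into a small helper. This is only a restatement of the existing logic, assuming unsigned int counters that wrap at UINT_MAX:

    #include <limits.h>

    /* Nonzero if 'counter' is in the past relative to 'dispatched',
     * or so far in the future that it cannot be trusted: in both
     * cases the unsigned difference lands in the upper half of the
     * value range. */
    static int counter_is_stale(unsigned int counter, unsigned int dispatched)
    {
        return (counter - dispatched) >= UINT_MAX / 2;
    }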

Note that buffers are not copied in the above sequence at all.

If the messages are compressed or uncompressed audio, you do need additional (cyclic) buffers for the uncompressed audio streams. Having a shared unordered buffer for all UDP messages has the benefit that you can always pick which audio stream to advance next (if you have received that datagram), and not accidentally spend so much time advancing one stream that other streams might run out of data, causing an audio glitch.

The size of the cyclic buffer for each audio stream should be at least three times the maximum size of a datagram. This lets you use wrapping logic with (((later % LIMIT) + LIMIT - (earlier % LIMIT)) % LIMIT, with result > LIMIT/2 indicating inverse order) for each sample, and append new data even during playback/decompression. (Dispatcher updates one index, audio playback the other. Just make sure they're accessed atomically.) Larger audio stream buffers may cause larger latencies.
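To make that wrapping logic concrete, here is a minimal sketch. The LIMIT value is only an example; in practice it would be the cyclic buffer size chosen as described above:

    #define LIMIT 8192U  /* Example size only */

    /* Distance from 'earlier' to 'later' along the cyclic index space. */
    static unsigned int cyclic_distance(unsigned int earlier, unsigned int later)
    {
        return ((later % LIMIT) + LIMIT - (earlier % LIMIT)) % LIMIT;
    }

    /* Nonzero if 'later' is actually behind 'earlier' (inverse order). */
    static int is_inverse_order(unsigned int earlier, unsigned int later)
    {
        return cyclic_distance(earlier, later) > LIMIT / 2;
    }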

In summary, assuming audio stream demuxing and dispatching is at hand, there are two completely separate buffer structures to be used. For UDP datagrams, an unordered set of buffer slots is used. The buffer slots need a bit of bookkeeping (as shown in above code), but it is pretty simple to dispatch them in order for a number of different streams. Each audio stream does require a cyclic buffer (at least three times the maximum size of a (decompressed) datagram), though.

Unfortunately, I don't see any benefit of using independent task parallelism here (e.g. the wool C library).

In fact, it might be even simpler to add a structure per stream to describe the decompressor state, and prioritize them according to which cyclic audio buffer has the least buffered data left. Typical decompressors report if they need more data, so adding a temporary work area per stream (two compressed datagrams), would allow the decompressor to consume whole packets, but copy memory only when absolutely necessary.
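
A hedged sketch of what that per-stream structure and the prioritization could look like follows. The stream_state type, its members, and next_stream_to_advance() are illustrative assumptions (reusing MAX_DATAGRAM_SIZE from the code above), not any specific decompressor's API:

    #include <limits.h>

    typedef struct {
        unsigned int buffered;  /* Data currently in this stream's cyclic audio buffer */
        unsigned int pending;   /* Bytes waiting in the temporary work area */
        char         work[2 * MAX_DATAGRAM_SIZE]; /* Room for two compressed datagrams */
        /* ... decompressor library state would live here ... */
    } stream_state;

    /* Pick the stream whose audio buffer is closest to running dry. */
    static int next_stream_to_advance(const stream_state *streams, int count)
    {
        unsigned int least = UINT_MAX;
        int          best = -1, i;

        for (i = 0; i < count; i++)
            if (streams[i].buffered < least) {
                least = streams[i].buffered;
                best = i;
            }
        return best;
    }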


Edited to add details about the circular buffers:

There are two main methods of tracking the state of a circular buffer, plus a third, derivative method I suspect might be useful here:

  1. Using separate indexes for adding (head) and removing (tail) data

    If there is one producer and one consumer, the circular buffer can be maintained locklessly, as the producer only increments head, and the consumer increments tail.

    The buffer is empty when head == tail. If the buffer has SIZE entries, head = head % SIZE, and tail = tail % SIZE, then there are (head + SIZE - tail) % SIZE buffered entries.

    The downside is that a simple implementation will always have at least one free entry in the buffer, because the above simple modular arithmetic cannot distinguish between all and no entries used. There are workarounds, at the cost of slightly more complex code.

    In the simple case, the buffer has SIZE - 1 - (head + SIZE - tail) % SIZE free entries. (A minimal sketch of this head/tail method follows the end of this list.)

  2. The buffered data begins at index start, and with length entries buffered.

    The buffer contents are always either consecutive in memory, or split into two parts in memory (with first part ending at the end of buffer space, and the second part starting at the start of the buffer space). Producers and consumers need to modify both start and length, so lockless usage requires a compare-and-swap atomic operation (and usually packing both into one integer).

    At any point, there are length entries used, and size - length entries free in the circular buffer.

    When a producer appends n data entries, it copies the data starting at index (start + length) % SIZE, with the final entry at index (start + length + n - 1) % SIZE, and increments length by n. As mentioned, the data to be copied might be consecutive, or split in two parts.

    When a consumer consumes n data entries, it copies the data starting at index start, with the final entry at index (start + n - 1) % SIZE, and updates start = (start + n) % SIZE; and length = length - n;. Again, the consumed data might be split into two parts in memory (if it would otherwise span the end of the buffer).

  3. The derivatives.

    If there is only one producer thread/task and one consumer, we can double the buffer state variables to allow data to be added to or consumed from the buffer asynchronously, via DMA or async I/O.

    1. Using head, tail, head_pending, and tail_pending indexes

      When head != head_pending, more data is being added at index head to head_pending-1, inclusive. When the transfer completes, the producer sets head = head_pending % SIZE.

      When tail != tail_pending, the data from tail to tail_pending-1, inclusive, is being consumed. At completion, the consumer sets tail = tail_pending % SIZE.

      Note that when using DMA, it is usually best to work with consecutive chunks in memory. In microcontrollers, it is common to use an interrupt to load the next DMA-able chunk into the DMA registers, in which case you actually have head, head_pending, and head_next, or tail, tail_pending, and tail_next, with the size of each DMA'd chunk chosen so that you do not end up DMA'ing very short chunks near the split point (at the physical end of the buffer), while keeping the interrupt rate acceptable.

      At any point, there are (head + SIZE - tail) % SIZE entries present in the buffer that can be consumed. With simple modular arithmetic, at least one entry in the buffer is always unused, so the maximum number of entries that can be added is SIZE - 1 - (head + SIZE - tail) % SIZE.

    2. Using start, length, incoming, and outgoing

      Here, start and length must be modified atomically, so that the other party will not be able to observe old start with new length or vice versa. This can be done locklessly as mentioned above, but care must be taken, as this is a common source of problems.

      At any point, the buffer contains length entries, with incoming entries being added (at (start + length) % SIZE to (start + length + incoming - 1) % SIZE, inclusive, if incoming > 0), and outgoing entries being consumed (at start to (start + outgoing - 1) % SIZE, inclusive, if outgoing > 0).

      When an incoming transfer completes, the producer increments length by incoming.

      When an outgoing transfer completes, the consumer updates start = (start + outgoing) % SIZE and length = length - outgoing.
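
Here is the minimal sketch of method 1 promised above: a lockless single-producer, single-consumer ring using C11 atomics. The type and function names are illustrative assumptions; the indexes follow the conventions of method 1 (producer advances head, consumer advances tail, one entry always left free):

    #include <stdatomic.h>

    #define RING_SIZE 1024

    typedef struct {
        atomic_uint head;             /* Advanced by the producer only */
        atomic_uint tail;             /* Advanced by the consumer only */
        char        data[RING_SIZE];
    } spsc_ring;

    /* Producer: append one entry; returns 0 if the ring is full. */
    static int ring_put(spsc_ring *r, char c)
    {
        unsigned int head = atomic_load_explicit(&r->head, memory_order_relaxed);
        unsigned int tail = atomic_load_explicit(&r->tail, memory_order_acquire);

        if ((head + RING_SIZE - tail) % RING_SIZE == RING_SIZE - 1)
            return 0;  /* Full: one entry always stays unused */

        r->data[head] = c;
        atomic_store_explicit(&r->head, (head + 1) % RING_SIZE, memory_order_release);
        return 1;
    }

    /* Consumer: remove one entry; returns 0 if the ring is empty. */
    static int ring_get(spsc_ring *r, char *c)
    {
        unsigned int tail = atomic_load_explicit(&r->tail, memory_order_relaxed);
        unsigned int head = atomic_load_explicit(&r->head, memory_order_acquire);

        if (head == tail)
            return 0;  /* Empty */

        *c = r->data[tail];
        atomic_store_explicit(&r->tail, (tail + 1) % RING_SIZE, memory_order_release);
        return 1;
    }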

As to the atomic handling:

C compilers that support C11 provide a family of atomic functions that can be used to update the above variables atomically. Using the weak version allows maximum compatibility across different types of hardware. For start and length:

    _Atomic uint64_t buffer_state; /* Actual atomic variable */
    uint64_t old_state;            /* Temporary variable */
    uint64_t new_state;

    old_state = atomic_load(&buffer_state);
    do {
        uint32_t start  = old_state >> 32;
        uint32_t length = (uint32_t)old_state;

        /* Update start and length as necessary */

        new_state = (uint64_t)length | ((uint64_t)start << 32);
    } while (!atomic_compare_exchange_weak(&buffer_state, &old_state, new_state));

For incrementing some buffer state variable state by amount, with buffer size being size, assuming all are of type size_t:

    size_t old; /* Temporary variable */
    size_t new;

    /* 'state' is assumed to be declared as an _Atomic size_t. */
    old = atomic_load(&state);
    do {
        new = (old + amount) % size;
    } while (!atomic_compare_exchange_weak(&state, &old, new));

Note that if the atomic_compare_exchange_weak() fails, it will copy the current value of state to old. That is why only one initial atomic load is needed.

Many C compilers also provide pre-C11 atomic built-ins; these are not standard, just common extensions. For example, start and length can be modified atomically using

    uint64_t buffer_state;         /* Actual atomic variable */
    uint64_t old_state, new_state; /* Temporary variables */

    do {
        uint32_t start, length;

        old_state = buffer_state; /* Non-atomic access */

        start = old_state >> 32;
        length = (uint32_t)old_state;

        /* Update start and/or length */

        new_state = (uint64_t)length | ((uint64_t)start << 32);
    } while (!__sync_bool_compare_and_swap(&buffer_state, old_state, new_state));

To increment some buffer state variable state by amount on many pre-C11 compilers, with buffer size being size, assuming all are of type size_t, you can use:

    size_t old_state, new_state; /* Temporary variables */

    do {
        old_state = state;
        new_state = (old_state + amount) % size;
    } while (!__sync_bool_compare_and_swap(&state, old_state, new_state));

All these atomics essentially spin until a modification succeeds atomically. While it would seem like two or more concurrent cores could fight endlessly, current cache architectures are such that one core will always win (first). So, in practice, as long as each core has some other work to do between executing one of such atomic update loops, these will work just fine. (And are, indeed, ubiquitous in lockless C code.)

The last part I'd like to mention is allowing partial dispatch of datagrams. This basically means that each datagram buffer slot has not just size (indicating the number of bytes in that slot), but also start. When a new datagram is received, start is set to zero. If a datagram cannot be dispatched (copied to per-stream buffer) completely, the buffer slot start and size are updated, but the stream dispatch counter is not incremented. That way, on the next round, the rest of this datagram is buffered.
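
A hedged sketch of that bookkeeping follows, extending the datagram slot from the code above with a start offset. copy_to_stream() is a hypothetical per-stream append that returns how many bytes it actually accepted:

    typedef struct {
        int          stream;   /* -1 for none */
        unsigned int counter;  /* Per-stream counter */
        unsigned int start;    /* Bytes of data[] already dispatched */
        unsigned int size;     /* Bytes of data in data[] */
        char         data[MAX_DATAGRAM_SIZE];
    } datagram_slot;

    /* Hypothetical: appends up to 'len' bytes to the stream's cyclic
     * buffer, returning the number of bytes actually accepted. */
    unsigned int copy_to_stream(int stream, const char *data, unsigned int len);

    static void dispatch_partial(datagram_slot *d, unsigned int *newest_dispatched)
    {
        d->start += copy_to_stream(d->stream, d->data + d->start,
                                   d->size - d->start);
        if (d->start >= d->size) {
            /* Fully dispatched: advance the stream counter, free the slot. */
            newest_dispatched[d->stream] = d->counter;
            d->stream = -1;
            d->start = d->size = 0U;
        }
        /* Otherwise leave the slot; the rest goes out on a later round. */
    }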

I could write a complete example showing how I'd decompress incoming datagrams from an unordered datagram buffer into several streams, using the partial buffered datagram scheme I mentioned briefly in the previous paragraph, but the exact implementation depends heavily on the programming interface the decompressor library has.

In particular, I personally prefer the interface used in e.g. the POSIX iconv() function, but perhaps returning a status code instead of the number of characters converted. Various audio and speech libraries have different interfaces, and it may even be impossible to convert them to such an interface. (As an example from a different arena, most SSL/TLS libraries for protecting socket communications do not have such an interface, as they always expect direct access to the socket descriptor; that makes a single-threaded multiple-socket asynchronous SSL/TLS implementation "difficult". Well, more like "write it from scratch if you want it".) For audio analysis of the decompressed data, say using the FFTW library for the Fast Fourier transform (or the DCT, the Hartley, or one of the other transforms that excellent library performs, especially when optimized wisdom for that transform at that window size is available), the decompressed data is typically needed in fixed-size chunks. That too would affect the exact implementation.
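
To illustrate, a hypothetical iconv()-style decompressor interface could look like the sketch below. This is an assumed signature to show the calling convention, not any real library's API:

    #include <stddef.h>

    typedef enum {
        DECODE_OK,          /* Made progress */
        DECODE_NEED_INPUT,  /* Consumed all input; wants more compressed data */
        DECODE_NEED_OUTPUT, /* Output buffer full; call again with more room */
        DECODE_ERROR        /* Corrupt stream */
    } decode_status;

    /* Like iconv(): advances *inbuf/*outbuf and decrements the byte
     * counts for whatever it managed to consume and produce, so a
     * caller can feed it partial datagrams and fixed-size output
     * chunks (e.g. FFT windows) alike. */
    decode_status decode(void *state,
                         const char **inbuf, size_t *inbytesleft,
                         char **outbuf, size_t *outbytesleft);

Such an interface composes naturally with both the partial-dispatch scheme and fixed-size analysis windows, since neither side has to own the other's buffers.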
