PostgreSQL - implementing a reliable queue

Question

I am trying to implement a reliable queue with multiple writers and multiple readers using a Postgres database. How can I avoid missing rows when a queue reader scans a table and in-progress transactions commit after it has read?

We have a reader selecting rows in batches using a "checkpoint" time: each batch fetches the rows after the last timestamp in the previous batch, and we are missing rows. (Reason: the timestamp value is taken when the INSERT happens (00.00.00). Under heavy load, a slow transaction may only commit, and make the row visible, say, 10 seconds later (00.00.10). If the reader reads during those 10 seconds and finds a row whose INSERT time (00.00.05) is later than row1's, its checkpoint moves past row1 and row1 is never read. The complete description of the problem is similar to the one in this blog post: http://blog.thefourthparty.com/stopping-time-in-postgresql/)
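
To make the race concrete, here is a small Python simulation. It is a sketch, not the poster's code: the `Row` type, the `read_batch` helper and the times are hypothetical. Each row carries the timestamp taken at INSERT time but only becomes visible to readers at commit time, and the reader advances a timestamp checkpoint after every batch.

```python
from dataclasses import dataclass


@dataclass
class Row:
    name: str
    inserted_at: int   # timestamp assigned when the INSERT runs (seconds)
    committed_at: int  # moment the row becomes visible to other sessions


def read_batch(rows, checkpoint, now):
    """Return rows visible at `now` whose INSERT timestamp is after `checkpoint`."""
    visible = [r for r in rows if r.committed_at <= now]
    return sorted(
        (r for r in visible if r.inserted_at > checkpoint),
        key=lambda r: r.inserted_at,
    )


rows = [
    Row("row1", inserted_at=0, committed_at=10),  # slow transaction
    Row("row2", inserted_at=5, committed_at=5),   # fast transaction
]

checkpoint = -1
seen = []
for now in (6, 12):  # the reader polls at t=6 and again at t=12
    batch = read_batch(rows, checkpoint, now)
    seen.extend(r.name for r in batch)
    if batch:
        checkpoint = batch[-1].inserted_at  # advance to the last timestamp read

print(seen)  # ['row2']: row1 became visible at t=10 with timestamp 0 < checkpoint 5
```

At t=6 only row2 is visible, so the checkpoint jumps to 5; by t=12 row1 is visible, but its timestamp (0) is already behind the checkpoint.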

Related prior question for context: Postgres LISTEN/NOTIFY - low latency, realtime?

Update: I have updated the question from a single reader to multiple readers. The order in which the readers read the rows does matter.

Answer

With multiple readers, we need to keep track of which records each reader has already received.

Also, as stated in the question, order matters when sending records to a reader. So if a later transaction commits before an earlier one, we have to "stop" and only resume sending records once the earlier one has committed, so that records reach the reader in order.
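
The "stop at the first gap" rule can be sketched as a pure Python function. The name `next_in_order` is hypothetical, and the sketch assumes ids come from a single increasing sequence, as the `cod serial` column does in the implementation. Given a reader's last checkpoint and the ids currently committed, it returns only the unbroken run that follows the checkpoint, plus the new checkpoint.

```python
def next_in_order(last_checkpoint, committed_ids):
    """Deliver ids strictly in sequence after last_checkpoint, stopping at the first gap.

    A gap means some earlier transaction has not committed yet, so later ids
    are held back until a future call.
    """
    delivered = []
    expected = last_checkpoint + 1
    for cod in sorted(i for i in committed_ids if i > last_checkpoint):
        if cod != expected:
            break  # id `expected` is still in flight: stop here
        delivered.append(cod)
        expected += 1
    return delivered, expected - 1  # new checkpoint = last id delivered


# Ids 1, 2, 4 are committed; id 3 belongs to a transaction still in progress.
print(next_in_order(0, {1, 2, 4}))     # ([1, 2], 2)
# Once 3 commits, the next call resumes exactly where the previous one stopped.
print(next_in_order(2, {1, 2, 3, 4}))  # ([3, 4], 4)
```

Each reader keeps its own `last_checkpoint`, which is what the `queue_reader_checkpoint` table stores per `reader_id`.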

With that in mind, here is an implementation:

-- let's create our queue table
drop table if exists queue_records cascade;
create table if not exists queue_records
(
  cod serial primary key,
  -- clock_timestamp() is the current wall-clock time (modern form of timeofday())
  date_posted timestamp default clock_timestamp(),
  message text
);


-- let's create a table to save the "checkpoint" (last cod delivered) per reader_id
drop table if exists queue_reader_checkpoint cascade;
create table if not exists queue_reader_checkpoint 
(
  reader_id text primary key,
  last_checkpoint numeric
);



CREATE OR REPLACE FUNCTION get_queue_records(pREADER_ID text)
RETURNS SETOF queue_records AS
$BODY$
DECLARE
    vLAST_CHECKPOINT    numeric;
    vCHECKPOINT_EXISTS  integer;
    vRECORD         queue_records%rowtype;
BEGIN

    -- let's get the last record sent to this reader
    SELECT  last_checkpoint
    INTO    vLAST_CHECKPOINT
    FROM    queue_reader_checkpoint
    WHERE   reader_id = pREADER_ID;

    -- if no checkpoint row was found (first call for this reader_id),
    -- start from the last cod in the queue: the reader gets records from now on.
    if not found then
        -- flag: the reader does not have any checkpoint recorded yet
        vCHECKPOINT_EXISTS = 0;
        -- highest committed cod; coalesce to 0 so an empty queue never stores a NULL checkpoint
        -- (a NULL checkpoint would make "COD > vLAST_CHECKPOINT" match nothing forever)
        SELECT  coalesce(MAX(cod), 0)
        INTO    vLAST_CHECKPOINT
        FROM    queue_records;
    else
        -- flag: the reader already has a checkpoint recorded
        vCHECKPOINT_EXISTS = 1;
    end if;

    -- now let's get the records from the queue one-by-one 
    FOR vRECORD IN 
            SELECT  *
            FROM    queue_records
            WHERE   COD > vLAST_CHECKPOINT 
            ORDER   BY COD
    LOOP

        -- if the next record equals (vLAST_CHECKPOINT + 1), it is the expected next record
        if (vRECORD.COD = (vLAST_CHECKPOINT + 1)) then

            -- remember the last record read
            vLAST_CHECKPOINT = vRECORD.COD;

            -- and return it
            RETURN NEXT vRECORD;

        -- otherwise there is a gap, so this record is out of order
        else
            -- the missing cod belongs to a transaction that has not committed yet, while a later one already has.
            -- so we stop sending records to the reader; most likely it will have committed by the next call
            exit;
        end if;
    END LOOP;


    -- now we have to persist the last record read to be retrieved on next call
    if (vCHECKPOINT_EXISTS = 0) then
        INSERT INTO queue_reader_checkpoint (reader_id, last_checkpoint) values (pREADER_ID, vLAST_CHECKPOINT);
    else        
        UPDATE queue_reader_checkpoint SET last_checkpoint = vLAST_CHECKPOINT where reader_id = pREADER_ID;
    end if; 
end;
$BODY$ LANGUAGE plpgsql VOLATILE;
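
To experiment with the function's control flow without a database, here is an in-memory Python model of `get_queue_records`. It is a sketch: the `QueueModel` class and its methods are hypothetical and ignore locking and MVCC details. It mirrors the function: a reader with no checkpoint starts at the highest visible `cod` (0 if the queue is empty), records are returned in `cod` order until the first gap, and the checkpoint is persisted per reader.

```python
class QueueModel:
    """In-memory stand-in for queue_records plus queue_reader_checkpoint (hypothetical)."""

    def __init__(self):
        self.next_cod = 1      # plays the role of the sequence behind `cod serial`
        self.committed = {}    # cod -> message, for rows visible to readers
        self.checkpoints = {}  # reader_id -> last_checkpoint

    def begin_insert(self, message):
        """Draw a cod from the sequence; the row stays invisible until commit()."""
        cod = self.next_cod
        self.next_cod += 1     # a drawn value is never reused, even after a rollback
        return cod, message

    def commit(self, pending):
        cod, message = pending
        self.committed[cod] = message

    def get_queue_records(self, reader_id):
        if reader_id in self.checkpoints:
            last = self.checkpoints[reader_id]
        else:
            # first call for this reader: start after the newest visible cod (0 if empty)
            last = max(self.committed, default=0)
        out = []
        for cod in sorted(c for c in self.committed if c > last):
            if cod != last + 1:
                break          # gap: an earlier cod is not visible, so stop here
            last = cod
            out.append((cod, self.committed[cod]))
        self.checkpoints[reader_id] = last  # persist the checkpoint for the next call
        return out


q = QueueModel()
print(q.get_queue_records("a"))  # [] and reader "a" is registered at checkpoint 0
slow = q.begin_insert("m1")      # cod 1, transaction still in progress
q.commit(q.begin_insert("m2"))   # cod 2 commits first
print(q.get_queue_records("a"))  # [] because cod 1 is not visible yet
q.commit(slow)
print(q.get_queue_records("a"))  # [(1, 'm1'), (2, 'm2')]

q.begin_insert("m3")             # cod 3 is drawn, then the transaction rolls back
q.commit(q.begin_insert("m4"))   # cod 4 commits
print(q.get_queue_records("a"))  # [] now and on every later call: cod 3 never appears
print(q.get_queue_records("b"))  # [] since new reader "b" starts after cod 4
```

The last lines illustrate one property worth knowing before adopting this design: PostgreSQL sequences (and therefore `serial` columns) are not rolled back when a transaction aborts, so an INSERT that rolls back leaves a permanent hole in `cod`. Under the gap rule, a reader would stop at that hole on every call, so a production version would need some way to decide that a gap will never be filled.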
