Redis 带锁的分布式增量 [英] Redis distributed increment with locking

查看:39
本文介绍了Redis 带锁的分布式增量的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我需要生成一个计数器,该计数器将发送到某些 api 调用.我的应用程序在多个节点上运行,所以我想如何生成唯一的计数器.我试过以下代码

I have a requirement for generating an counter which will be send to some api calls. My application is running on multiple node so some how I wanted to generate unique counter. I have tried following code

public static long GetTransactionCountForUser(int telcoId)
{
    long valreturn = 0;
    string key = "TelcoId:" + telcoId + ":Sequence";
    if (Muxer != null && Muxer.IsConnected && (Muxer.GetDatabase()) != null)
    {
        IDatabase db = Muxer.GetDatabase();
        var val = db.StringGet(key);
        int maxVal = 999;
        if (Convert.ToInt32(val) < maxVal)
        {
            valreturn = db.StringIncrement(key);
        }
        else
        {
            bool isdone = db.StringSet(key, valreturn);
            //db.SetAdd(key,new RedisValue) .StringIncrement(key, Convert.ToDouble(val))
        }
    }
    return valreturn;
}

并通过任务并行库对其进行运行测试.当我有边界值时,我看到的是设置了多次 0 条目

And run tested it via Task Parallel libray. When I have boundary values what i see is that multiple time 0 entry is set

请告诉我我需要做哪些更正

Please let me know what correction i needed to do

更新:我的最终逻辑如下

Update: My final logic is as following

public static long GetSequenceNumberForTelcoApiCallViaLuaScript(int telcoId)
{
    long valreturn = 0;
    long maxIncrement = 9999;//todo via configuration
    if (true)//todo via configuration
    {
        IDatabase db;
        string key = "TelcoId:" + telcoId + ":SequenceNumber";
        if (Muxer != null && Muxer.IsConnected && (db = Muxer.GetDatabase()) != null)
        {
            valreturn = (long)db.ScriptEvaluate(@"
                local result = redis.call('incr', KEYS[1])
                if result > tonumber(ARGV[1]) then
                result = 1
                redis.call('set', KEYS[1], result)
                end
                return result", new RedisKey[] { key }, flags: CommandFlags.HighPriority, values: new RedisValue[] { maxIncrement });
        }
    }
    return valreturn;
}

推荐答案

确实,您的代码在翻转边界附近并不安全,因为您正在执行获取"、(延迟和思考)、设置" - 没有检查您的获取"中的条件仍然适用.如果服务器在项目 1000 附近忙碌,则有可能获得各种疯狂​​的输出,包括:

Indeed, your code is not safe around the rollover boundary, because you are doing a "get", (latency and thinking), "set" - without checking that the conditions in your "get" still apply. If the server is busy around item 1000 it would be possible to get all sorts of crazy outputs, including things like:

1
2
...
999
1000 // when "get" returns 998, so you do an incr
1001 // ditto
1002 // ditto
0 // when "get" returns 999 or above, so you do a set
0 // ditto
0 // ditto
1

选项:

  1. 使用事务和约束 API 使您的逻辑并发安全
  2. 通过 ScriptEvaluate
  3. 将您的逻辑重写为 Lua 脚本
  1. use the transaction and constraint APIs to make your logic concurrency-safe
  2. rewrite your logic as a Lua script via ScriptEvaluate

现在,redis 事务(每个选项 1)很难.就个人而言,我会使用2"——除了更简单的编码和调试之外,这意味着你只有 1 次往返和操作,而不是get、watch、get、multi、incr/set、exec/丢弃"和从开始重试"循环以解决中止情况.如果你愿意,我可以试着把它写成 Lua - 它应该是大约 4 行.

Now, redis transactions (per option 1) are hard. Personally, I'd use "2" - in addition to being simpler to code and debug, it means you only have 1 round-trip and operation, as opposed to "get, watch, get, multi, incr/set, exec/discard", and a "retry from start" loop to account for the abort scenario. I can try to write it as Lua for you if you like - it should be about 4 lines.

这是 Lua 实现:

string key = ...
for(int i = 0; i < 2000; i++) // just a test loop for me; you'd only do it once etc
{
    int result = (int) db.ScriptEvaluate(@"
local result = redis.call('incr', KEYS[1])
if result > 999 then
    result = 0
    redis.call('set', KEYS[1], result)
end
return result", new RedisKey[] { key });
    Console.WriteLine(result);
}

注意:如果需要参数化最大值,可以使用:

Note: if you need to parameterize the max, you would use:

if result > tonumber(ARGV[1]) then

和:

int result = (int)db.ScriptEvaluate(...,
    new RedisKey[] { key }, new RedisValue[] { max });

(所以 ARGV[1]max 的值)

(so ARGV[1] takes the value from max)

有必要了解eval/evalsha(这是ScriptEvaluate 调用的内容)不与其他服务器请求竞争,所以 incr 和可能的 set 之间没有任何变化.这意味着我们不需要复杂的 watch 等逻辑.

It is necessary to understand that eval/evalsha (which is what ScriptEvaluate calls) are not competing with other server requests, so nothing changes between the incr and the possible set. This means we don't need complex watch etc logic.

这是相同的(我认为!)通过事务/约束 API:

Here's the same (I think!) via the transaction / constraint API:

static int IncrementAndLoopToZero(IDatabase db, RedisKey key, int max)
{
    int result;
    bool success;
    do
    {
        RedisValue current = db.StringGet(key);
        var tran = db.CreateTransaction();
        // assert hasn't changed - note this handles "not exists" correctly
        tran.AddCondition(Condition.StringEqual(key, current));
        if(((int)current) > max)
        {
            result = 0;
            tran.StringSetAsync(key, result, flags: CommandFlags.FireAndForget);
        }
        else
        {
            result = ((int)current) + 1;
            tran.StringIncrementAsync(key, flags: CommandFlags.FireAndForget);
        }
        success = tran.Execute(); // if assertion fails, returns false and aborts
    } while (!success); // and if it aborts, we need to redo
    return result;
}

复杂吧?简单的成功案例是:

GET {key}    # get the current value
WATCH {key}  # assertion stating that {key} should be guarded
GET {key}    # used by the assertion to check the value
MULTI        # begin a block
INCR {key}   # increment {key}
EXEC         # execute the block *if WATCH is happy*

这是......相当多的工作,并且涉及多路复用器上的管道停顿.更复杂的情况(断言失败、监视失败、回绕)的输出略有不同,但应该可以工作.

which is... quite a bit of work, and involves a pipeline stall on the multiplexer. The more complicated cases (assertion failures, watch failures, wrap-arounds) would have slightly different output, but should work.

这篇关于Redis 带锁的分布式增量的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆