How does the exponential backoff configured in Google Pub/Sub's RetryPolicy work?


Question

The cloud.google.com/go/pubsub library recently released (in v1.5.0, cf. https://github.com/googleapis/google-cloud-go/releases/tag/pubsub%2Fv1.5.0) support for a new RetryPolicy server-side feature. The documentation (https://godoc.org/cloud.google.com/go/pubsub#RetryPolicy) for this currently reads

I've read the Wikipedia article on exponential backoff, and although it describes exponential backoff in discrete time, I don't see how it relates to the MinimumBackoff and MaximumBackoff parameters specifically. For guidance on this, I referred to the documentation for github.com/cenkalti/backoff, https://pkg.go.dev/github.com/cenkalti/backoff/v4?tab=doc#ExponentialBackOff. That library defines an ExponentialBackOff as

type ExponentialBackOff struct {
    InitialInterval     time.Duration
    RandomizationFactor float64
    Multiplier          float64
    MaxInterval         time.Duration
    // After MaxElapsedTime the ExponentialBackOff returns Stop.
    // It never stops if MaxElapsedTime == 0.
    MaxElapsedTime time.Duration
    Stop           time.Duration
    Clock          Clock
    // contains filtered or unexported fields
}

where each randomized interval is calculated as

randomized interval =
    RetryInterval * (random value in range [1 - RandomizationFactor, 1 + RandomizationFactor])

where RetryInterval is the current retry interval which, as I understand it, starts at a value of InitialInterval and is capped by MaxInterval.
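To make the formula above concrete, here is a minimal sketch of that calculation using only the standard library. It is not the code of github.com/cenkalti/backoff itself; the helper names randomizedInterval and nextRetryInterval are hypothetical, and the values in main (5s initial, 60s maximum, factor 0.5, multiplier 2) are illustrative assumptions rather than anything Pub/Sub documents.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// randomizedInterval scales retryInterval by a factor in
// [1-randomizationFactor, 1+randomizationFactor); random must lie in [0, 1).
func randomizedInterval(retryInterval time.Duration, randomizationFactor, random float64) time.Duration {
	factor := 1 - randomizationFactor + 2*randomizationFactor*random
	return time.Duration(float64(retryInterval) * factor)
}

// nextRetryInterval grows the interval by multiplier and caps it at maxInterval.
func nextRetryInterval(current time.Duration, multiplier float64, maxInterval time.Duration) time.Duration {
	next := time.Duration(float64(current) * multiplier)
	if next > maxInterval {
		return maxInterval
	}
	return next
}

func main() {
	// Illustrative values only (assumptions, not documented Pub/Sub settings).
	const (
		initialInterval     = 5 * time.Second
		maxInterval         = 60 * time.Second
		randomizationFactor = 0.5
		multiplier          = 2.0
	)
	rng := rand.New(rand.NewSource(1)) // fixed seed so repeated runs match
	retryInterval := initialInterval
	for attempt := 1; attempt <= 8; attempt++ {
		wait := randomizedInterval(retryInterval, randomizationFactor, rng.Float64())
		fmt.Printf("attempt %d: RetryInterval=%v, randomized wait=%v\n",
			attempt, retryInterval, wait.Round(time.Millisecond))
		retryInterval = nextRetryInterval(retryInterval, multiplier, maxInterval)
	}
}
```

In this sketch the cap applies to RetryInterval before randomization, so an individual wait can exceed the maximum by up to the randomization factor.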

Do I understand correctly that the MinimumBackoff and MaximumBackoff correspond to the InitialInterval and MaxInterval in github.com/cenkalti/backoff? That is, the MinimumBackoff is the initial wait period, and the MaximumBackoff is the largest amount of time allowed between retries?

To test my theories, I wrote the following simplified program:

package main

import (
    "context"
    "flag"
    "log"
    "time"

    "cloud.google.com/go/pubsub"
)

var (
    projectID                      string
    minimumBackoff, maximumBackoff time.Duration
)

const (
    topicName             = "test-topic"
    subName               = "test-subscription"
    defaultMinimumBackoff = 10 * time.Second
    defaultMaximumBackoff = 10 * time.Minute
)

func main() {
    flag.StringVar(&projectID, "projectID", "my-project", "Google Project ID")
    flag.DurationVar(&minimumBackoff, "minimumBackoff", 5*time.Second, "minimum backoff")
    flag.DurationVar(&maximumBackoff, "maximumBackoff", 60*time.Second, "maximum backoff")
    flag.Parse()
    log.Printf("Running with minumum backoff %v and maximum backoff %v...", minimumBackoff, maximumBackoff)

    retryPolicy := &pubsub.RetryPolicy{MinimumBackoff: minimumBackoff, MaximumBackoff: maximumBackoff}

    client, err := pubsub.NewClient(context.Background(), projectID)
    if err != nil {
        log.Fatalf("NewClient: %v", err)
    }

    topic, err := client.CreateTopic(context.Background(), topicName)
    if err != nil {
        log.Fatalf("CreateTopic: %v", err)
    }
    log.Printf("Created topic %q", topicName)
    defer func() {
        topic.Stop()
        if err := topic.Delete(context.Background()); err != nil {
            log.Fatalf("Delete topic: %v", err)
        }
        log.Printf("Deleted topic %s", topicName)
    }()

    sub, err := client.CreateSubscription(context.Background(), subName, pubsub.SubscriptionConfig{
        Topic:       topic,
        RetryPolicy: retryPolicy,
    })
    if err != nil {
        log.Fatalf("CreateSubscription: %v", err)
    }
    log.Printf("Created subscription %q", subName)
    defer func() {
        if err := sub.Delete(context.Background()); err != nil {
            log.Fatalf("Delete subscription: %v", err)
        }
        log.Printf("Deleted subscription %q", subName)
    }()

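    go func() {
        // Receive blocks until its context is done or a fatal error occurs; log that error.
        err := sub.Receive(context.Background(), func(ctx context.Context, msg *pubsub.Message) {
            log.Printf("Nacking message: %s", msg.Data)
            msg.Nack()
        })
        if err != nil {
            log.Printf("Receive: %v", err)
        }
    }()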

    topic.Publish(context.Background(), &pubsub.Message{Data: []byte("Hello, world!")})
    log.Println("Published message")
    time.Sleep(60 * time.Second)
}

If I run it with the flag-default MinimumBackoff and MaximumBackoff of 5s and 60s, respectively, I get the following output:

> go run main.go
2020/07/29 18:49:32 Running with minumum backoff 5s and maximum backoff 1m0s...
2020/07/29 18:49:33 Created topic "test-topic"
2020/07/29 18:49:34 Created subscription "test-subscription"
2020/07/29 18:49:34 Published message
2020/07/29 18:49:36 Nacking message: Hello, world!
2020/07/29 18:49:45 Nacking message: Hello, world!
2020/07/29 18:49:56 Nacking message: Hello, world!
2020/07/29 18:50:06 Nacking message: Hello, world!
2020/07/29 18:50:17 Nacking message: Hello, world!
2020/07/29 18:50:30 Nacking message: Hello, world!
2020/07/29 18:50:35 Deleted subscription "test-subscription"
2020/07/29 18:50:35 Deleted topic test-topic

whereas if I run it with MinimumBackoff and MaximumBackoff of 1s and 2s, respectively, I get

> go run main.go --minimumBackoff=1s --maximumBackoff=2s
2020/07/29 18:50:42 Running with minumum backoff 1s and maximum backoff 2s...
2020/07/29 18:51:11 Created topic "test-topic"
2020/07/29 18:51:12 Created subscription "test-subscription"
2020/07/29 18:51:12 Published message
2020/07/29 18:51:15 Nacking message: Hello, world!
2020/07/29 18:51:18 Nacking message: Hello, world!
2020/07/29 18:51:21 Nacking message: Hello, world!
2020/07/29 18:51:25 Nacking message: Hello, world!
2020/07/29 18:51:28 Nacking message: Hello, world!
2020/07/29 18:51:31 Nacking message: Hello, world!
2020/07/29 18:51:35 Nacking message: Hello, world!
2020/07/29 18:51:38 Nacking message: Hello, world!
2020/07/29 18:51:40 Nacking message: Hello, world!
2020/07/29 18:51:44 Nacking message: Hello, world!
2020/07/29 18:51:47 Nacking message: Hello, world!
2020/07/29 18:51:50 Nacking message: Hello, world!
2020/07/29 18:51:52 Nacking message: Hello, world!
2020/07/29 18:51:54 Nacking message: Hello, world!
2020/07/29 18:51:57 Nacking message: Hello, world!
2020/07/29 18:52:00 Nacking message: Hello, world!
2020/07/29 18:52:03 Nacking message: Hello, world!
2020/07/29 18:52:06 Nacking message: Hello, world!
2020/07/29 18:52:09 Nacking message: Hello, world!
2020/07/29 18:52:12 Nacking message: Hello, world!
2020/07/29 18:52:13 Deleted subscription "test-subscription"
2020/07/29 18:52:13 Deleted topic test-topic

In the latter example, the time between nacks is pretty consistently about 3s, which presumably represents a "best effort" to honor the MaximumBackoff of 2s? What is still not clear to me is whether there is any randomization, whether there is a multiplier (in the first example, the time between retries does not seem to double each time), and whether there is an equivalent of MaxElapsedTime beyond which there are no more retries.
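One way to check the doubling hypothesis against the first run is to compute the gaps between consecutive "Nacking message" timestamps. The sketch below does that with the timestamps copied from the 5s/60s output above; the gaps helper and the firstRunNacks variable are names introduced here for illustration.

```go
package main

import (
	"fmt"
	"time"
)

// gaps parses timestamps in the log package's default layout and returns
// the difference between each consecutive pair.
func gaps(stamps []string) ([]time.Duration, error) {
	const layout = "2006/01/02 15:04:05"
	var out []time.Duration
	var prev time.Time
	for i, s := range stamps {
		t, err := time.Parse(layout, s)
		if err != nil {
			return nil, err
		}
		if i > 0 {
			out = append(out, t.Sub(prev))
		}
		prev = t
	}
	return out, nil
}

// firstRunNacks holds the "Nacking message" timestamps from the 5s/60s run.
var firstRunNacks = []string{
	"2020/07/29 18:49:36",
	"2020/07/29 18:49:45",
	"2020/07/29 18:49:56",
	"2020/07/29 18:50:06",
	"2020/07/29 18:50:17",
	"2020/07/29 18:50:30",
}

func main() {
	g, err := gaps(firstRunNacks)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(g) // prints [9s 11s 10s 11s 13s]
}
```

The gaps of 9s, 11s, 10s, 11s and 13s all stay above the 5s minimum but are nowhere near doubling, which is what prompts the question about a multiplier.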

Answer

The retry policy fields for minimum backoff and maximum backoff are similar to InitialInterval and MaxInterval in your example above. Cloud Pub/Sub uses a formula similar to the one you mentioned to compute the exponential delay, and it includes randomization as well.

Once the delay has grown to MaxInterval, every subsequent retry is delayed by MaxInterval. If you want to stop the retries after a certain number of attempts, we recommend using Dead Letter Queues.
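As a rough illustration of that behavior, the sketch below lists the un-randomized delay before each redelivery: it starts at the minimum backoff, grows until it reaches the maximum, stays there, and stops after a fixed number of attempts, which is roughly the role a dead-letter setup would play. The delaySchedule helper, the multiplier of 2 and the attempt limit of 6 are all assumptions made for the example; the answer does not state the growth factor the service actually uses, and the sketch ignores randomization and delivery latency.

```go
package main

import (
	"fmt"
	"time"
)

// delaySchedule returns the delay before each of maxAttempts redeliveries,
// growing from minBackoff by multiplier and never exceeding maxBackoff.
// The multiplier is an assumed parameter, not a documented Pub/Sub value.
func delaySchedule(minBackoff, maxBackoff time.Duration, multiplier float64, maxAttempts int) []time.Duration {
	if maxAttempts <= 0 {
		return nil
	}
	delays := make([]time.Duration, 0, maxAttempts)
	delay := minBackoff
	for attempt := 0; attempt < maxAttempts; attempt++ {
		delays = append(delays, delay)
		delay = time.Duration(float64(delay) * multiplier)
		if delay > maxBackoff {
			delay = maxBackoff
		}
	}
	return delays
}

func main() {
	// The two configurations used in the question.
	fmt.Println(delaySchedule(5*time.Second, 60*time.Second, 2, 6)) // [5s 10s 20s 40s 1m0s 1m0s]
	fmt.Println(delaySchedule(1*time.Second, 2*time.Second, 2, 6))  // [1s 2s 2s 2s 2s 2s]
}
```

With a 1s minimum and a 2s maximum the schedule reaches the cap after a single retry, which is consistent with the near-constant spacing seen in the second run.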
