RabbitMq-ConversationId与CorrelationId-哪个更适合跟踪特定请求? [英] RabbitMq - ConversationId vs CorrelationId - Which is the more appropriate for tracking a specific request?

查看:78
本文介绍了RabbitMq-ConversationId与CorrelationId-哪个更适合跟踪特定请求?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

RabbitMQ似乎具有两个非常相似的属性,我并不完全了解它们之间的区别.ConversationIdCorrelationId.

我的用例如下.我有一个生成 Guid 的网站.该网站将调用一个API,并将该唯一标识符添加到 HttpRequest 标头中.反过来,这会将消息发布到RabbitMQ.该消息由第一个使用者处理,然后传递到其他使用者,依此类推.

出于记录目的,我想记录一个将初始请求与所有后续操作联系在一起的标识符.对于整个应用程序不同部分的旅程而言,这应该是唯一的.因此.当登录到Serilog/ElasticSearch之类的东西时,可以很容易地看到哪个请求触发了初始请求,并且整个应用程序中该请求的所有日志条目都可以关联在一起.

我创建了一个提供程序,用于查看传入的 HttpRequest 作为标识符.我已经将其称为"CorrelationId",但我开始怀疑它是否应真正命名为"ConversationId".就RabbitMQ而言,"ConversationId"的想法更适合此模型,还是"CorrelationId"更好?

这两个概念有什么区别?

就代码而言,我希望做到以下几点.首先,在我的API中注册总线,然后将 SendPublish 配置为使用提供商提供的 CorrelationId .

API中的

 //总线注册var busSettings = context.Resolve< BusSettings>();//使用AspNetCoreCorrelationIdProvidervar relatedIdProvider = context.Resolve< ICorrelationIdProvider>();var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>{cfg.Host(新的Uri(busSettings.HostAddress),h =>{h.Username(busSettings.Username);h.Password(busSettings.Password);});cfg.ConfigurePublish(x => x.UseSendExecute(sendContext =>{//哪个更合适//sendContext.ConversationId = relatedIdProvider.GetCorrelationId();sendContext.CorrelationId = relatedIdProvider.GetCorrelationId();}));}); 

作为参考,这是我简单的提供程序界面

 //定义接口公共接口ICorrelationIdProvider{Guid GetCorrelationId();} 

还有AspNetCore实现,该实现提取调用方客户端(即网站)设置的唯一ID.

 公共类AspNetCoreCorrelationIdProvider:ICorrelationIdProvider{私有IHttpContextAccessor _httpContextAccessor;公共AspNetCoreCorrelationIdProvider(IHttpContextAccessor httpContextAccessor){_httpContextAccessor = httpContextAccessor;}公共Guid GetCorrelationId(){如果(_httpContextAccessor.HttpContext.Request.Headers.TryGetValue("correlation-Id",输出StringValues标头)){var header = headers.FirstOrDefault();如果(Guid.TryParse(header,out Guid headerCorrelationId)){返回headerCorrelationId;}}返回Guid.NewGuid();}} 

最后,我的服务主机是简单的Windows服务应用程序,用于存放和使用已发布的消息.他们使用以下内容来获取 CorrelationId,并且很可能会发布给其他消费者以及其他服务主机.

 公共类MessageContextCorrelationIdProvider:ICorrelationIdProvider{///< summary>///消费上下文///</summary>私有只读ConsumeContext _consumeContext;///< summary>///初始化< see cref ="MessageContextCorrelationIdProvider"/>的新实例.班级.///</summary>///< param name ="consumeContext">消费上下文.</param>公共MessageContextCorrelationIdProvider(ConsumeContext消费上下文){_consumeContext =消耗上下文;}///< summary>///获取关联标识符.///</summary>///< returns></returns>公共Guid GetCorrelationId(){//相关性或对话是吗?如果(_consumeContext.CorrelationId.HasValue& __ consumeContext.CorrelationId!= Guid.Empty){返回_consumeContext.CorrelationId.Value;}返回Guid.NewGuid();}} 

然后,我的使用者中有一个记录器,该记录器使用该提供程序来提取 CorrelationId :

 公共异步任务Consume(ConsumeContext< IMyEvent>上下文){var relatedId = _correlationProvider.GetCorrelationId();_logger.Info(correlationId,$"#### IMyEvent已为客户收到:{context.Message.CustomerId}");尝试{等待_mediator.Send(new SomeOtherRequest(correlationId){SomeObject:context.Message.SomeObject});}捕获(异常e){_logger.Exception(e,correlationId,$"Exception:{e}"));扔;}_logger.Info(correlationId,$处理完成:{DateTime.Now}");} 

阅读文档时,它会说以下有关"ConversationId"的内容:

对话是由发送的第一条消息创建的,或者已发布,没有可用的现有上下文(例如当使用IBus.Send或IBus.Publish发送或发布邮件.如果现有上下文用于发送或发布消息,将ConversationId复制到新消息中,以确保同一对话中的邮件具有相同的标识符.

现在,我开始认为我的术语混在一起了,从技术上讲,这是一次对话(尽管对话"就像电话游戏"一样.)

那么,在这种用例中是 CorrelationId 还是 ConversationId ?请帮助我弄清楚我的术语!!

解决方案

在消息对话中(提示性的乐谱),可能只有一条消息(我告诉过你做某事,或者我告诉每个正在听的人发生了什么)或多条消息(我告诉过您做某事,您告诉了其他人,或者我告诉正在听的每个人都发生了事,而那些听众告诉了他们的朋友,依此类推,依此类推.)

正确使用从第一条消息到最后一条消息的MassTransit,这些消息中的每个消息都将具有相同的 ConversationId .在消息使用过程中,MassTransit将属性未修改地从 ConsumeContext 复制到每个传出消息中.这使所有内容都成为同一 trace 的一部分-对话.

但是,MassTransit默认未设置CorrelationId.如果消息属性名为CorrelationId(或CommandId或EventId),则可以自动设置它,也可以添加自己的名称.

如果消耗的消息上存在CorrelationId,则所有传出消息都将具有该CorrelationId属性复制到InitiatorId属性(原因和效果-消耗的消息启动了后续消息的创建).这形成了一条链(或跟踪术语中的跨度),可以遵循该链来显示初始消息中的消息传播.

应将CorrelationId视为命令或事件的标识符,以便可以在整个系统日志中看到该命令的效果.

在我看来,您从HTTP输入的内容可能是发起者,因此将该标识符复制到InitiatorId中并为消息创建一个新的CorrelationId,或者您可能只想对初始CorrelationId使用相同的标识符,并让随后的消息将其用作发起方.

RabbitMQ seems to have two properties that are very similar, and I don't entirely understand the difference. ConversationId and CorrelationId.

My use case is as follows. I have a website that generates a Guid. The website calls an API, adding that unique identifier to the HttpRequest headers. This in turn publishes a message to RabbitMQ. That message is processed by the first consumer and passed off elsewhere to another consumer, and so on.

For logging purposes I want to log an identifier that ties the initial request together with all of the subsequent actions. This should be unique for that journey throughout the different parts of the application. Hence. When logged to something like Serilog/ElasticSearch, this then becomes easy to see which request triggered the initial request, and all of the log entries for that request throughout the application can be correlated together.

I have created a provider that looks at the incoming HttpRequest for an identifier. I've called this a "CorrelationId", but I'm starting to wonder if this should really be named a "ConversationId". In terms of RabbitMQ, does the idea of a "ConversationId" fit better to this model, or is "CorrelationId" better?

What is the difference between the two concepts?

In terms of code, I've looked to do the following. Firstly register the bus in my API and configure the SendPublish to use the CorrelationId from the provider.

// bus registration in the API
var busSettings = context.Resolve<BusSettings>();
// using AspNetCoreCorrelationIdProvider
var correlationIdProvider = context.Resolve<ICorrelationIdProvider>();

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.Host(
        new Uri(busSettings.HostAddress),
        h =>
        {
            h.Username(busSettings.Username);
            h.Password(busSettings.Password);
        });
    cfg.ConfigurePublish(x => x.UseSendExecute(sendContext =>
    {
        // which one is more appropriate
        //sendContext.ConversationId = correlationIdProvider.GetCorrelationId();
        sendContext.CorrelationId = correlationIdProvider.GetCorrelationId();
    }));
});

For reference, this is my simple provider interface

// define the interface
public interface ICorrelationIdProvider
{
    Guid GetCorrelationId();
}

And the AspNetCore implementation, which extracts the unique ID set by the calling client (i.e. a website).

public class AspNetCoreCorrelationIdProvider : ICorrelationIdProvider
{
    private IHttpContextAccessor _httpContextAccessor;

    public AspNetCoreCorrelationIdProvider(IHttpContextAccessor httpContextAccessor)
    {
        _httpContextAccessor = httpContextAccessor;
    }

    public Guid GetCorrelationId()
    {
        if (_httpContextAccessor.HttpContext.Request.Headers.TryGetValue("correlation-Id", out StringValues headers))
        {
            var header = headers.FirstOrDefault();
            if (Guid.TryParse(header, out Guid headerCorrelationId))
            {
                return headerCorrelationId;
            }
        }

        return Guid.NewGuid();
    }
}

Finally, my Service hosts are simple windows service applications that sit and consume published messages. They use the following to grab the CorrelationId and might well publish to other consumers as well in other service hosts.

public class MessageContextCorrelationIdProvider : ICorrelationIdProvider
{
    /// <summary>
    /// The consume context
    /// </summary>
    private readonly ConsumeContext _consumeContext;

    /// <summary>
    /// Initializes a new instance of the <see cref="MessageContextCorrelationIdProvider"/> class.
    /// </summary>
    /// <param name="consumeContext">The consume context.</param>
    public MessageContextCorrelationIdProvider(ConsumeContext consumeContext)
    {
        _consumeContext = consumeContext;
    }

    /// <summary>
    /// Gets the correlation identifier.
    /// </summary>
    /// <returns></returns>
    public Guid GetCorrelationId()
    {
        // correlationid or conversationIs?
        if (_consumeContext.CorrelationId.HasValue && _consumeContext.CorrelationId != Guid.Empty)
        {
            return _consumeContext.CorrelationId.Value;
        }

        return Guid.NewGuid();
    }
}

I then have a logger in my consumer that uses that provider to extract the CorrelationId:

public async Task Consume(ConsumeContext<IMyEvent> context)
{
    var correlationId = _correlationProvider.GetCorrelationId();
    _logger.Info(correlationId, $"#### IMyEvent received for customer:{context.Message.CustomerId}");

    try
    {
        await _mediator.Send(new SomeOtherRequest(correlationId) { SomeObject: context.Message.SomeObject });
    }
    catch (Exception e)
    {
        _logger.Exception(e, correlationId, $"Exception:{e}");
        throw;
    }

    _logger.Info(correlationId, $"Finished processing: {DateTime.Now}");
}

Reading the docs, it says the following about a "ConversationId":

The conversation is created by the first message that is sent or published, in which no existing context is available (such as when a message is sent or published by using IBus.Send or IBus.Publish). If an existing context is used to send or publish a message, the ConversationId is copied to the new message, ensuring that a set of messages within the same conversation have the same identifier.

Now I'm starting to think that I've got my terminology mixed up, and technically this is a conversation (although the 'conversation' is like 'the telephone game').

So, CorrelationId in this use case, or ConversationId? Please help me get my terminology right!!

解决方案

In a message conversation (cue foreboding musical score), there can be a single message (I told you to do something, or I told everyone who is listening that something happened) or multiple messages (I told you to do something, and you told someone else, or I told everyone who is listening that something happened and those listeners told their friends, and so on, and so on).

Using MassTransit, from the first message to the final message, used properly, every single one of those messages would have the same ConversationId. MassTransit copies the property from ConsumeContext, unmodified, to every outgoing message during the consumption of a message. This makes everything part of the same trace - a conversation.

The CorrelationId, however, is not set by default by MassTransit. It can be automatically set if a message property is named CorrelationId (or CommandId, or EventId), or you can add your own names too.

If the CorrelationId is present on a consumed message, any outgoing messages will have that CorrelationId property copied to the InitiatorId property (cause, and effect -- the consumed message initiated the creation of the subsequent messages). This forms a chain (or span, in trace terminology) that can be followed to show the spread of messages from the initial message.

The CorrelationId should be thought of as the identifier for a command or event, such that the effects of that command can be seen throughout the system logs.

It sounds to me like your input from HTTP might be the Initiator, and thus copy that identifier into InitiatorId and create a new CorrelationId for the message, or you may want to just use the same identifier for the initial CorrelationId and let the subsequent messages use it as the initiator.

这篇关于RabbitMq-ConversationId与CorrelationId-哪个更适合跟踪特定请求?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆