IBM MQ cmit和使用同步点的回滚 [英] IBM MQ cmit and rollback with syncpoint

查看:129
本文介绍了IBM MQ cmit和使用同步点的回滚的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

基础概述:

我有一个设置,我在其中读取来自IBM MQ的一组消息,并在k8集群env中处理这些消息,并将其发送到目标主机.

I have a setup where I am reading a set of messages from IBM MQ and processing those messages in k8 cluster env and sending it to the destination host.

问题:

我观察到有时消息流很大,并且在将其发送到目标主机之前,我们的pod失败并重新启动,由于我们遵循从<读取和删除的方法,因此我们丢失了所有消息.一个href ="https://github.com/ibm-messaging/mq-mqi-nodejs/blob/master/samples/amqsget.js" rel ="nofollow noreferrer"> ibmmq示例

I observed that sometimes the flow of the messages is huge and before sending it to the destination host our pod gets failed and restarts, by this we are losing all the messages as we are following a read-and-delete approach from ibmmq example

期望的解决方案:

我正在寻找一种解决方案,在这些消息发送到目标主机之前,我们不会丢失消息的踪迹.

I am looking for a solution where, until these messages are sent to the destination host, we don't lose the track of the messages.

我尝试过的事情:

我们在IBM MQ中有一个工作单元的概念,但是由于我们不能期望读取和处理出现延迟,因此我不能等待单个消息被处理然后读取.另一个消息,因为它可能会严重影响性能.

We have a concept of unit of work in IBM MQ but since we can't expect a delay in reading and processing, I can't wait for a single message to get processed and then read the another message as it might have a major performance setback.

代码语言:

NodeJs

推荐答案

正如评论所建议的,有很多方法可以使这只猫皮肤化,但是您将需要使用事务处理.

As the comments suggest there are a number of ways to skin this cat, but you will need to use transactions.

使用事务选项创建连接后,事务作用域即会开始.当您提交或回滚时,此操作将关闭并开始下一个事务.

As soon as you create the connection with the transaction option, the transaction scope begins. This gets closed and next transaction begins when you either commit or rollback.

因此,您应该分批处理对您的应用程序有意义的消息,并在批处理完成时提交.如果您的应用程序被k8杀死,那么所有未提交的已读消息将通过回退队列过程来回滚,以停止有毒消息.

So you should handle the messages in batches, that make sense to your application, and commit when the batch is complete. If your application is killed by k8s then all uncommitted read messages will get rolled back, via back out queue process to stop poison messages.

添加了部分以显示示例代码,并解释了退出队列.

Section added to show sample code, and explanation of backout queues.

在正常处理中,如果某个应用在没有时间处理消息之前就停止了,您将希望该消息返回到队列中.这样邮件仍然可以处理.

In your normal processing, if an app gets stopped before it has had time to process the message, you will want that message returned to the queue. So that the message is still available to be processed.

要启用此回滚,您需要将 MQC.MQPMO_SYNCPOINT 中的或放入获取消息选项

gmo.Options |= MQC.MQGMO_SYNCPOINT 

然后,如果一切顺利,您可以提交.

Then if all goes well, you can commit.

  mq.Cmit(hConn, function(err) {
    if (err) {
      debug_warn('Error on commit', err);
    } else {
      debug_info('Commit was successful');
    }
  });

或回滚

  mq.Back(hConn, function(err) {
    if (err) {
      debug_warn('Error on rollback', err);
    } else {
      debug_info('rollback was successful');
    }
  });

如果回滚,则消息将返回到队列.这意味着它也是您的应用程序将读取的下一条消息.这可能会生成有害消息循环.因此,您还应该为您的应用程序用户设置一个 backout队列,并为其 pass all context 权限和一个 backout threshold .

假设将阈值设置为5.该消息可以回滚读取5次.您的应用程序需要检查阈值并确定它是有害消息,然后将其移出队列.

Say you set the threshold to 5. The message can be read 5 times, with rollback. Your app needs to check the threshold and decide that it is a poison message and move it off the queue.

要检查回退阈值(和回退队列名称),可以使用以下代码

To check the backout threshold (and the backout queue name) you can use the following code


    // Remember to or in the Inquire option on the Open
    openOptions |= MQC.MQOO_INQUIRE;

...
    attrs = [ new mq.MQAttr(MQC.MQIA_BACKOUT_THRESHOLD), 
             new mq.MQAttr(MQC.MQCA_BACKOUT_REQ_Q_NAME) ];

    mq.Inq(hObj, attrs, (err, selectors) => {
        if (err) {
          debug_warn('Error retrieving backout threshold', err);
        } else {
          debug_info('Attributes have been found');

          selectors.forEach((s) => {
              switch (s.selector) {
                case MQC.MQIA_BACKOUT_THRESHOLD:
                debug_info('Threshold is ', s.value);
                break;
              case MQC.MQCA_BACKOUT_REQ_Q_NAME:
                debug_info('Backout queue is ', s.value);
                break;
           }
          });
        }
      });

收到消息时,您的应用程序可以使用mqmd.BackoutCount来检查消息回滚的频率.

When getting the message your app can use mqmd.BackoutCount to check how often the message has been rolled back.

    if (mqmd.BackoutCount >= threshold) {
      ...
    }

我注意到的是,如果这是在同一应用程序实例中重复调用同一条消息的回滚,那么在阈值处将引发 MQRC_HOBJ_ERROR 错误.您的应用可以检查哪些内容,然后丢弃该消息.如果它是不同的应用程序实例,则不会出现 MQRC_HOBJ_ERROR 错误,因此它可以检查退出阈值并可以丢弃消息,记住要执行丢弃操作.

What I have noticed, that if this is in the same application instance that is repeatedly calling rollback on the same message, then at the threshold a MQRC_HOBJ_ERROR error is thrown. Which your app can check for, and then discard the message. If its a different app instance then it doesn't get the MQRC_HOBJ_ERROR error, so it can check the backout threshold and can discard the message, remembering to commit the discard action.

请参见 https://github.com/ibm-messaging/mq-dev-patterns/tree/master/transactions/JMS/SE 了解更多信息.

您也可以使用keda- https://keda.sh -与k8s一起使用根据等待处理的消息数(而不是CPU/内存消耗)来监视队列的深度和规模.这样,您可以在有大量消息等待处理时扩大规模,然后慢慢缩小规模,从而使队列变得可管理.这是入门的链接- https://github.com/ibm-messaging/mq-dev-patterns/tree/master/Go-K8s -该示例适用于Go应用程序,但同样适用于Node.js

As an alternative you could use keda - https://keda.sh - which works with k8s to monitor your queue depth and scale according to the number of messages waiting to be processed, as opposed to CPU / memory consumption. That way you can scale up when there are lots of messages waiting to be processed, and slowly scale down then the queue becomes manageable. Here is a link to getting started - https://github.com/ibm-messaging/mq-dev-patterns/tree/master/Go-K8s - the example is for a Go app, but equally applies to Node.js

这篇关于IBM MQ cmit和使用同步点的回滚的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆