防止消息处理的竞争条件 [英] Preventing race conditions for message processing

查看:125
本文介绍了防止消息处理的竞争条件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个J2EE应用程序,它通过Web服务接收消息(事件)。消息具有不同类型(根据类型需要不同的处理)并以特定顺序发送。它已经确定了一些问题,其中一些消息类型需要比其他消息类型更长结果是序列中第二个接收的消息可以在序列中的第一个之前被处理。我试图通过在处理消息的方法周围放置一个synchronized块来解决这个问题。这似乎有效,但我不相信这是正确的方法吗?是否有可能更合适的替代方案或可接受?我已经包含了一小段代码,试图更清楚地解释。 ....任何建议/指导意见。

I have a J2EE application that receives messages (events) via a web service. The messages are of varying types (requiring different processing depending on type) and sent in a specific sequence. It have identified a problem where some message types take longer to process than others. The result is that a message received second in a sequence may be processed before the first in the sequence. I have tried to address this problem by placing a synchronized block around the method that processes the messages. This seems to work, but I am not confident that this is the "correct" approach? Is there perhaps an alternative that may be more appropriate or is this "acceptable"? I have included a small snippit of code to try to explain more clearly. .... Any advice / guidance appreciated.

public class EventServiceImpl implements EventService {
  public String submit (String msg) {

    if (msg == null)
        return ("NAK");

            EventQueue.getInstance().submit(msg);

    return "ACK";
  }
}


public class EventQueue {
    private static EventQueue instance = null;
    private static int QUEUE_LENGTH = 10000;
    protected boolean done = false;
    BlockingQueue<String> myQueue = new LinkedBlockingQueue<String>(QUEUE_LENGTH);

protected EventQueue() {
    new Thread(new Consumer(myQueue)).start();
}

public static EventQueue getInstance() {
      if(instance == null) {
         instance = new EventQueue();
      }
      return instance;
}

public void submit(String event) {
    try {
        myQueue.put(event);
    } catch (InterruptedException ex) {
    }
}

class Consumer implements Runnable {
    protected BlockingQueue<String> queue;

    Consumer(BlockingQueue<String> theQueue) { this.queue = theQueue; }

    public void run() {
      try {
        while (true) {
          Object obj = queue.take();
          process(obj);
          if (done) {
            return;
          }
        }
      } catch (InterruptedException ex) {
      }
    }

    void process(Object obj) {
        Event event = new Event( (String) obj);
        EventHandler handler = EventHandlerFactory.getInstance(event);
        handler.execute();
    }
}

// Close queue gracefully
public void close() {
    this.done = true;
}


推荐答案

我不确定是什么您正在使用的框架(EJB(MDB)/ JMS)。通常应该避免在 Managed Environment 中使用像EJB / JMS那样的同步(这不是一个好习惯)。一种解决方法是

I am not sure what is the framework (EJB(MDB)/JMS) you are working with. Generally using synchronization inside a Managed Environment like that of EJB/JMS should be avoided(its not a good practice). One way to get around is


  • 客户端在发送下一条消息之前应等待服务器的确认。

  • 这样客户端本身就可以控制事件的顺序。

请注意,如果有多个客户提交邮件,这将不起作用。

Please note this won't work if there are multiple client submitting the messages.

编辑:

您遇到的情况是,Web服务的客户端按顺序发送消息而不考虑记帐消息处理时间。它只是一个接一个地转储消息。对于基于队列(先进先出)的解决方案,这是一个很好的案例。我建议用以下两种方法来实现这个目标

You have a situation wherein the client of the web service sends message in sequence without taking into account the message processing time. It simply dumps the message one after another. This is a good case for Queue ( First In First Out ) based solution. I suggest following two ways to accomplish this


  1. 使用 JMS 。这将增加 JMS提供程序并编写一些管道代码。

  2. 使用一些多头模式,例如制片人 - 消费者其中您的Web服务处理程序将传入消息转储到队列中,单线程使用者将一次消耗一条消息。请参阅使用java.util.concurrent包此示例

  3. 使用数据库。将传入的消息转储到数据库中。使用不同的基于调度程序的程序扫描数据库(基于序列号)并相应地处理消息。

  1. Use JMS . This will have an additional overhead of adding a JMS providers and writing some plumbing code.
  2. Use some multitheading pattern like Producer-Consumer wherein your web service handler will be dumping the incoming message in a Queue and a single threaded consumer will consume one message at a time. See this example using java.util.concurrent package.
  3. Use database. Dump the incoming messages into a database. Use a different scheduler based program to scan the datbase (based on sequence number) and process the messages accordingly.

第一和第三种解决方案对于这些类型的问题非常标准。第二种方法很快,代码中不需要任何额外的库。

First and third solution is very standard for these type of problems. The second approach would be quick and won't need any additional libraries in your code.

这篇关于防止消息处理的竞争条件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆