在Java Servlet中使用中断器并处理多个事件 [英] Using disruptor in the Java Servlet and handling multiple events

查看:115
本文介绍了在Java Servlet中使用中断器并处理多个事件的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我在Web应用程序中使用LMAX干扰器,该干扰器接受http请求参数并将其处理到环形缓冲区. 3个事件处理程序处理并处理数据,最后一个将其保存到数据库中. 实例化Servlet时,初始化一次环形缓冲区.这样对吗?

I am using the LMAX disruptor in my web application which takes the http request parameters and handle them to the ringbuffer. 3 event-handlers handle and process data, the final one, saves it to the database. Initialize the ringbuffer once, when the servlet is instantiated. is this right?

public void init() throws ServletException {

    Disruptor<CampaignCode> disruptor = new Disruptor<CampaignCode>(
            CampaignCode.FACTORY, 1024, Executors.newCachedThreadPool());

    EventHandler<CampaignCode> campaignDetailsLoader = new CampaignDetailsLoaderEvent();
    EventHandler<CampaignCode> templateEvent = new TemplateBuildEvent();
    EventHandler<CampaignCode> codeGenerator = new CodeGenerationEventHandler();
    EventHandler<CampaignCode> campaignSaveEventHandler= new CampaignSaveEventHandler();

    disruptor.handleEventsWith(templateEvent, campaignDetailsLoader).then(
            codeGenerator).then(campaignSaveEventHandler);
    this.ringBuffer = disruptor.start();
}

在这里,我将值直接放入环形缓冲区

here I put the values straight into the ringbuffer

    @Override
protected void doPost(HttpServletRequest request,
        HttpServletResponse response) throws ServletException, IOException      {
    String campaignId = request.getParameter("campaignId");
    String campaignType = request.getParameter("campaignType");
    if (campaignId != null && !campaignId.isEmpty()) {
        long sequence = ringBuffer.next();
        CampaignCode campaign = ringBuffer.get(sequence);
        campaign.setCampaignId(Long.parseLong(campaignId));
        campaign.setCampaignType(campaignType);
        ringBuffer.publish(sequence);
    }
  }

事件处理程序

public class CampaignDetailsLoaderEvent implements EventHandler<CampaignCode> {
     @Override
public void onEvent(CampaignCode event, long sequence, boolean endOfBatch)
        throws Exception {
         //load details from db and process
         // udpate values to the event object
  }
 }

  public class TemplateBuildEvent implements EventHandler<CampaignCode> {
     @Override
public void onEvent(CampaignCode event, long sequence, boolean endOfBatch)
        throws Exception {
         // find the template of this type
         // set template to the event object
  }
 }

 public class CodeGenerationEventHandler implements EventHandler<CampaignCode> {
     @Override
public void onEvent(CampaignCode event, long sequence, boolean endOfBatch)
        throws Exception {
         // generate custom dsl code and execute it
         // update the output to the event object 
         //next handler will save it the db
  }
 }

  public class CampaignSaveEventHandler implements EventHandler<CampaignCode> {
     @Override
public void onEvent(CampaignCode event, long sequence, boolean endOfBatch)
        throws Exception {
         // save the details to db
         // done!
  }
 }

这是发布到环形缓冲区的正确方法吗?我需要同步"ringBuffer"对象吗?前2个事件并行运行,然后第3个事件并行运行.如果发布商速度快且消费者速度慢,我应该如何处理?我使用的是Disruptor 3.1.1,在Web环境中找不到很好的Disruptor用法示例.一个简单的代码实现(如果您已完成的话)将帮助我理解很多!

is this the right way to publish to the ringbuffer? do I need to synchronize the "ringBuffer" object? First 2 events run parallel, then the 3rd event. How should I handle this when I have fast publishers and slow consumers? I am using the disruptor 3.1.1, I could not find good usage example of disruptor in a web environment. A simple code implementation, if you have done one, would help me understand this a lot!

推荐答案

给定您要求的代码要求,此实现是正确的.最佳做法是将发布代码包装在try-finally块中,以确保始终发布声明的序列:

This implementation is correct given the code requirements you've stated. Best practice is to wrap your publishing code in a try-finally block to ensure that a claimed sequence is always published:

long sequence = ringBuffer.next();
  try {
  Event e = ringBuffer.get(sequence);
  // Do some work with the event.
} finally {
  ringBuffer.publish(sequence);
}

在构造函数中明确指定您需要多生产者Disruptor,但这也是一个好主意,但已在您使用的默认构造函数中完成.您应该将对RingBuffer的写入同步,因为声明和发布序列号的过程已经是线程安全的.但是请注意,不能保证在同时调用doPost()时将事件发布到RingBuffer的顺序将与您的Web应用程序收到事件的顺序相同.

It may also be a good idea to explicitly specify in the constructor that you need a multiple-producer Disruptor, but that is already done in the default constructor you've used. You should not synchronize writes to the RingBuffer as the process of claiming and publishing the sequence number is already thread-safe. Note however that there is no guarantee that the order in which events are published to the RingBuffer in concurrent invocations of doPost() will be the same as the order they're received by your web application.

Disruptor只是一个专门的队列,因此受到无限制增长所遇到的所有常见问题的困扰.如果缓冲区中没有可用的插槽,则对ringBuffer.next()的调用将阻塞,直到一个可用为止.您应该同时为RingBuffer提供足够的容量来处理突发流量,并考虑在缓冲区已满的情况下(希望极少发生)施加反向压力的方法.

The Disruptor is just a specialized queue and is therefore subject to all the usual problems they have with unbounded growth. If there are no available slots in the buffer, your call to ringBuffer.next() will block until one becomes available. You should both provide sufficient capacity to the RingBuffer to handle bursts of traffic, and consider ways to apply back pressure in the (hopefully rare) case that the buffer is filled.

在您的特定用例中,如果CodeGenerationCampaignSave步骤与前两个步骤相比花费的时间很长,并且可以推迟,则可以使用其他Disruptors/RingBuffers来排队事件对于那些处决.

In your particular use case, if the CodeGeneration or CampaignSave steps are taking a very long time compared to the first two, and can be deferred, it may make sense to use additional Disruptors/RingBuffers to queue up events for those executions.

这篇关于在Java Servlet中使用中断器并处理多个事件的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆