消费者投票房价阿卡,SQS和骆驼 [英] Consumer Poll Rate with Akka, SQS, and Camel

查看:314
本文介绍了消费者投票房价阿卡,SQS和骆驼的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我工作的一个项目,需要从SQS消息的阅读,我决定用阿卡分发这些消息的处理。

A project I'm working on requires the reading of messages from SQS, and I decided to use Akka to distribute the processing of these messages.

由于SQS是骆驼的支持,并有内置在消费类阿卡使用的功能,我想象这将是最好的实现端点和阅读邮件这种方式,虽然我从来没有见过的人很多例子这样做。

Since SQS is support by Camel, and there is functionality built in for use in Akka in the Consumer class, I imagined it would be best to implement the endpoint and read messages this way, though I had not seen many examples of people doing so.

我的问题是,我不能查询我的队列很快,足以让我的队列为空或接近空。我原本以为,我可以得到一个消费超过骆驼从SQS在X / s的速率接收消息。从那里,我可以简单地创建更多的消费者起床到我所需要处理的消息率。

My problem is that I cannot poll my queue quickly enough to keep my queue empty, or near empty. What I originally thought was that I could get a Consumer to receive messages over Camel from SQS at a rate of X/s. From there, I could simply create more Consumers to get up to the rate at which I needed messages processed.

我的消费者:

import akka.camel.{CamelMessage, Consumer}
import akka.actor.{ActorRef, ActorPath}

class MyConsumer() extends Consumer {
  def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)"
  var count = 0

  def receive = {
    case msg: CamelMessage => {
      count += 1
    }
    case _ => {
      println("Got something else")
    }
  }

  override def postStop(){
    println("Count for actor: " + count)
  }
}

如图所示,我设置了延迟= 1 以及&安培; maxMessagesPerPoll = 10 来改善消息的速度,但我无法产卵多个消费者使用相同的终点。

As shown, I've set delay=1 as well as &maxMessagesPerPoll=10 to improve the rate of messages, but I'm unable to spawn multiple consumers with the same endpoint.

我在文档的默认情况下端点被认为不支持多消费者阅读。,我相信这适用于SQS终端也是如此,产卵多的消费者会给我的只有一个消费者,其中系统运行一分钟后,输出消息为计数演员:X 而不是其他哪个输出计数演员:0

I read in the docs that By default endpoints are assumed not to support multiple consumers. and I believe this holds true for SQS endpoints as well, as spawning multiple consumers will give me only one consumer where after running the system for a minute, the output message is Count for actor: x instead of the others which output Count for actor: 0.

如果这是在所有有用的;我能读约33消息/秒,对单个消费者这个当前的实现。

If this is at all useful; I'm able to read approximately 33 messages/second with this current implementation on the single consumer.

这是正确的方法是在阿卡的SQS队列中读取消息?如果是这样,有没有办法,我能得到这个规模向外,这样我可以增加我的消息的消费率更接近900消息/秒?

Is this the proper way to be reading messages from an SQS queue in Akka? If so, is there way I can get this to scale outward so that I can increase my rate of message consumption closer to that of 900 messages/second?

推荐答案

可悲的是骆驼目前不支持消息的并行消费上SQS。

Sadly Camel does not currently support parallel consumption of messages on SQS.

http://camel.465427.n5.nabble.com/Amazon-SQS-listener-as-multi-threaded-td5741541.html

要解决这个问题,我写我自己的演员使用AWS-Java的SDK来轮询一批消息SQS。

To address this I've written my own Actor to poll batch messages SQS using the aws-java-sdk.

  def receive = {
    case BeginPolling => {
      // re-queue sending asynchronously
      self ! BeginPolling
      // traverse the response
      val deleteMessageList = new ArrayList[DeleteMessageBatchRequestEntry]
      val messages = sqs.receiveMessage(receiveMessageRequest).getMessages
      messages.toList.foreach {
        node => {
          deleteMessageList.add(new DeleteMessageBatchRequestEntry(node.getMessageId, node.getReceiptHandle))
          //log.info("Node body: {}", node.getBody)
          filterSupervisor ! node.getBody
        }
      }
      if(deleteEntryList.size() > 0){
        val deleteMessageBatchRequest = new DeleteMessageBatchRequest(queueName, deleteMessageList)
        sqs.deleteMessageBatch(deleteMessageBatchRequest)
      }
    }

    case _ => {
      log.warning("Unknown message")
    }
  }

虽然我不能肯定这是否是最好的实现,它当然可以对这样的请求不会不断冲击空队列,但它适合我的电流能够从同一个轮询消息需要改进队列中。

Though I'm not certain if this is the best implementation, and it could of course be improved upon so that requests are not constantly hitting an empty queue, it does suit my current needs of being able to poll messages from the same queue.

获取有关133(消息/秒)从SQS /演员与此有关。

Getting about 133 (messages/second)/actor from SQS with this.

这篇关于消费者投票房价阿卡,SQS和骆驼的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆