How can Akka streams be materialized continually?

I am using Akka Streams in Scala to poll from an AWS SQS queue using the AWS Java SDK. I created an ActorPublisher which dequeues messages on a two second interval:

class SQSSubscriber(name: String) extends ActorPublisher[Message] {
  implicit val materializer = ActorMaterializer()

  val schedule = context.system.scheduler.schedule(0 seconds, 2 seconds, self, "dequeue")

  val client = new AmazonSQSClient()
  client.setRegion(RegionUtils.getRegion("us-east-1"))
  val url = client.getQueueUrl(name).getQueueUrl

  val MaxBufferSize = 100
  var buf = Vector.empty[Message]

  override def receive: Receive = {
    case "dequeue" =>
      val messages = iterableAsScalaIterable(client.receiveMessage(new ReceiveMessageRequest(url)).getMessages).toList
      messages.foreach(self ! _)
    case message: Message if buf.size == MaxBufferSize =>
      log.error("The buffer is full")
    case message: Message =>
      if (buf.isEmpty && totalDemand > 0)
        onNext(message)
      else {
        buf :+= message
        deliverBuf()
      }
    case Request(_) =>
      deliverBuf()
    case Cancel =>
      context.stop(self)
  }

  @tailrec final def deliverBuf(): Unit =
    if (totalDemand > 0) {
      if (totalDemand <= Int.MaxValue) {
        val (use, keep) = buf.splitAt(totalDemand.toInt)
        buf = keep
        use foreach onNext
      } else {
        val (use, keep) = buf.splitAt(Int.MaxValue)
        buf = keep
        use foreach onNext
        deliverBuf()
      }
    }
}

In my application, I am attempting to run the flow at a 2 second interval as well:

val system = ActorSystem("system")
val sqsSource = Source.actorPublisher[Message](SQSSubscriber.props("queue-name"))
val flow = Flow[Message]
  .map { elem => system.log.debug(s"${elem.getBody} (${elem.getMessageId})"); elem }
  .to(Sink.ignore)

system.scheduler.schedule(0 seconds, 2 seconds) {
  flow.runWith(sqsSource)(ActorMaterializer()(system))
}

However, when I run my application I receive java.util.concurrent.TimeoutException: Futures timed out after [20000 milliseconds] and subsequent dead-letter notices caused by the ActorMaterializer.

Is there a recommended approach for continually materializing an Akka Stream?

Solution

I don't think you need to create a new ActorPublisher every 2 seconds. This seems redundant and wasteful of memory. Also, I don't think an ActorPublisher is necessary. From what I can tell of the code, your implementation will have an ever growing number of Streams all querying the same data. Each Message from the client will be processed by N different akka Streams and, even worse, N will grow over time.

Iterator For Infinite Loop Querying

You can get the same behavior from your ActorPublisher by using scala's Iterator. It is possible to create an Iterator which continuously queries the client:

//setup the client
val client = {
  val sqsClient = new AmazonSQSClient()
  sqsClient setRegion (RegionUtils getRegion "us-east-1")
  sqsClient
}

val url = client.getQueueUrl(name).getQueueUrl

//single query
//single query
def queryClientForMessages : Iterable[Message] = iterableAsScalaIterable {
  client.receiveMessage(new ReceiveMessageRequest(url)).getMessages
}

def messageListIterator : Iterator[Iterable[Message]] =
  Iterator continually queryClientForMessages

//messages one-at-a-time "on demand", no timer pushing you around
def messageIterator() : Iterator[Message] = messageListIterator flatMap identity

This implementation only queries the client when all previous Messages have been consumed and is therefore truly reactive. No need to keep track of a buffer with fixed size. Your solution needs a buffer because the creation of Messages (via a timer) is de-coupled from the consumption of Messages (via println). In my implementation, creation & consumption are tightly coupled via back-pressure.
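The on-demand behavior can be checked without AWS at all. The minimal sketch below uses a hypothetical stand-in (`fakeQuery` and `queryCount` are illustrative names, not SQS API calls): the "client" is only queried when the consumer actually demands elements, which is the back-pressure coupling described above.

```scala
// Sketch, no AWS needed: `fakeQuery` stands in for queryClientForMessages
// and counts how many times the "client" is actually queried.
var queryCount = 0

def fakeQuery(): Iterable[String] = {
  queryCount += 1
  Seq(s"msg-${queryCount}a", s"msg-${queryCount}b") // a batch of two
}

// Same shape as above: query continually, flatten the batches
val messages: Iterator[String] =
  Iterator.continually(fakeQuery()).flatMap(identity)

// Iterator.continually is lazy: no query has happened yet
val queriesBeforeConsumption = queryCount // 0

// Consuming three messages forces exactly two batch queries
val firstThree = messages.take(3).toList
val queriesAfterConsumption = queryCount  // 2
```

Nothing runs at construction time; each batch query is triggered by downstream consumption, not by a timer.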

Akka Stream Source

You can then use this Iterator generator-function to feed an akka stream Source:

def messageSource : Source[Message, _] = Source fromIterator messageIterator

Flow Formation

And finally you can use this Source to perform the println (As a side note: your flow value is actually a Sink since Flow + Sink = Sink). Using your flow value from the question:

messageSource runWith flow

One akka Stream processing all messages.
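The whole pipeline can also be exercised end-to-end with a stubbed client (again, `backlog` and `receiveBatch` are illustrative stand-ins, not AWS API calls): a finite backlog is drained through the same Iterator.continually + flatMap shape, and every message is processed exactly once by the single consuming stream.

```scala
// End-to-end sketch with a stubbed "SQS client": `backlog` plays the
// queue and `receiveBatch` plays client.receiveMessage.
import scala.collection.mutable

val backlog = mutable.Queue("m1", "m2", "m3", "m4", "m5")

// Stand-in for a receiveMessage call: up to 2 messages per query
def receiveBatch(): Iterable[String] =
  (1 to math.min(2, backlog.size)).map(_ => backlog.dequeue())

// The answer's pipeline: continual batch queries flattened into
// a single message-at-a-time iterator
val messageIterator: Iterator[String] =
  Iterator.continually(receiveBatch()).flatMap(identity)

// One consumer processes all messages, each exactly once.
// (With a real client the queue never empties; here we take(5),
// since flatMap would spin forever over empty batches.)
val processed = messageIterator.take(5).toList
```

An akka Source built from this iterator, e.g. `Source.fromIterator(() => messageIterator)`, would keep pulling batches for exactly as long as downstream demand exists.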
