How can Akka streams be materialized continually?
I am using Akka Streams in Scala to poll from an AWS SQS queue using the AWS Java SDK. I created an ActorPublisher which dequeues messages on a two second interval:
import akka.actor.{ActorLogging, Props}
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.sqs.AmazonSQSClient
import com.amazonaws.services.sqs.model.{Message, ReceiveMessageRequest}
import scala.annotation.tailrec
import scala.collection.JavaConversions.iterableAsScalaIterable
import scala.concurrent.duration._

object SQSSubscriber {
  def props(name: String): Props = Props(new SQSSubscriber(name))
}

class SQSSubscriber(name: String) extends ActorPublisher[Message] with ActorLogging {
  import context.dispatcher

  implicit val materializer = ActorMaterializer()
  val schedule = context.system.scheduler.schedule(0 seconds, 2 seconds, self, "dequeue")
  val client = new AmazonSQSClient()
  client.setRegion(RegionUtils.getRegion("us-east-1"))
  val url = client.getQueueUrl(name).getQueueUrl
  val MaxBufferSize = 100
  var buf = Vector.empty[Message]

  override def receive: Receive = {
    case "dequeue" =>
      val messages = iterableAsScalaIterable(
        client.receiveMessage(new ReceiveMessageRequest(url)).getMessages).toList
      messages.foreach(self ! _)
    case message: Message if buf.size == MaxBufferSize =>
      log.error("The buffer is full")
    case message: Message =>
      if (buf.isEmpty && totalDemand > 0)
        onNext(message)
      else {
        buf :+= message
        deliverBuf()
      }
    case Request(_) =>
      deliverBuf()
    case Cancel =>
      context.stop(self)
  }

  @tailrec final def deliverBuf(): Unit =
    if (totalDemand > 0) {
      if (totalDemand <= Int.MaxValue) {
        val (use, keep) = buf.splitAt(totalDemand.toInt)
        buf = keep
        use foreach onNext
      } else {
        val (use, keep) = buf.splitAt(Int.MaxValue)
        buf = keep
        use foreach onNext
        deliverBuf()
      }
    }
}
In my application, I am attempting to run the flow at a 2 second interval as well:
val system = ActorSystem("system")
import system.dispatcher

val sqsSource = Source.actorPublisher[Message](SQSSubscriber.props("queue-name"))
val flow = Flow[Message]
  .map { elem => system.log.debug(s"${elem.getBody} (${elem.getMessageId})"); elem }
  .to(Sink.ignore)

system.scheduler.schedule(0 seconds, 2 seconds) {
  flow.runWith(sqsSource)(ActorMaterializer()(system))
}
However, when I run my application I receive java.util.concurrent.TimeoutException: Futures timed out after [20000 milliseconds], followed by dead letter notices caused by the ActorMaterializer.
Is there a recommended approach for continually materializing an Akka Stream?
I don't think you need to create a new ActorPublisher every 2 seconds. That is redundant and wastes memory. Also, I don't think an ActorPublisher is necessary at all. From what I can tell of the code, your implementation will have an ever-growing number of Streams all querying the same data. Each Message from the client will be processed by N different akka Streams and, even worse, N will grow over time.
Iterator For Infinite Loop Querying
You can get the same behavior from your ActorPublisher by using Scala's Iterator. It is possible to create an Iterator which continuously queries the client:
//setup the client
val client = {
  val sqsClient = new AmazonSQSClient()
  sqsClient setRegion (RegionUtils getRegion "us-east-1")
  sqsClient
}
val url = client.getQueueUrl(name).getQueueUrl

//single query
def queryClientForMessages: Iterable[Message] = iterableAsScalaIterable {
  client.receiveMessage(new ReceiveMessageRequest(url)).getMessages
}

def messageListIterator: Iterator[Iterable[Message]] =
  Iterator continually queryClientForMessages

//messages one-at-a-time "on demand", no timer pushing you around
def messageIterator(): Iterator[Message] = messageListIterator flatMap identity
This implementation only queries the client when all previous Messages have been consumed, and is therefore truly reactive. There is no need to keep track of a fixed-size buffer. Your solution needs a buffer because the creation of Messages (via a timer) is decoupled from the consumption of Messages (via println). In my implementation, creation and consumption are tightly coupled via back-pressure.
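The pull-driven behavior is easy to verify without AWS. Below is a minimal sketch in which a hypothetical stub (queryStub, an assumption for illustration) stands in for the SQS client: no query is issued until a downstream consumer actually demands a message, and consuming three messages triggers exactly two batch queries.

```scala
object PullDemo extends App {
  var queriesIssued = 0

  // Hypothetical stand-in for queryClientForMessages:
  // each "query" returns a batch of two strings.
  def queryStub(): Iterable[String] = {
    queriesIssued += 1
    Seq(s"msg-$queriesIssued-a", s"msg-$queriesIssued-b")
  }

  // Same shape as above: an infinite iterator of batches, flattened.
  val messages: Iterator[String] = Iterator.continually(queryStub()).flatMap(identity)

  // Nothing has been queried yet: Iterator.continually is lazy.
  assert(queriesIssued == 0)

  // Consuming three messages forces exactly two batch queries (2 + 1 messages).
  val first3 = messages.take(3).toList
  assert(first3 == List("msg-1-a", "msg-1-b", "msg-2-a"))
  assert(queriesIssued == 2)

  println(s"consumed ${first3.size} messages with $queriesIssued queries")
}
```

This is exactly the back-pressure coupling described above: the rate of querying is dictated by the rate of consumption, not by a timer.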
Akka Stream Source
You can then use this Iterator generator-function to feed an akka stream Source:
def messageSource: Source[Message, _] = Source fromIterator messageIterator
Flow Formation
And finally you can use this Source to perform the println. (As a side note: your flow value is actually a Sink, since Flow + Sink = Sink.) Using your flow value from the question:
messageSource runWith flow
One akka Stream processing all messages.
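The single-stream property can also be illustrated without AWS or Akka: in the sketch below a hypothetical in-memory batch iterator (an assumption, standing in for the SQS client) is flattened the same way messageIterator is, and a map over the iterator plays the role of the flow's logging stage. One consumer sees every message exactly once, in order, unlike the N timer-spawned streams in the question.

```scala
object SingleStreamDemo extends App {
  // Hypothetical stand-in for the SQS client: a fixed sequence of batches.
  val batchSource: Iterator[Iterable[String]] =
    Iterator(Seq("a", "b"), Seq("c"), Seq("d", "e"))

  // Flatten batches into one stream of messages, as messageIterator does.
  val messages: Iterator[String] = batchSource flatMap identity

  // The "flow" stage: transform/log each element, then move on
  // (analogous to map + Sink.ignore).
  val processed = messages.map(m => s"processed $m").toList

  // One consumer handled every message exactly once, in order.
  assert(processed == List("processed a", "processed b", "processed c",
                           "processed d", "processed e"))
  println(processed.size)
}
```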