akka相关内容

Akka - 对象序列化期间的 StackOverflowError

我遇到了超过 2 天的问题.当我在参与者之间交换消息时,正在指责 JVM 堆栈溢出. 我的消息是一个有很多链接的对象(在一个链表中链接在一起的 10000 多个子对象).即,具有 Neo4J 关系的对象. 错误是这样的: java.lang.StackOverflowError在 java.io.Bits.putLong(Bits.java:108)在 java.io.ObjectO ..
发布时间:2021-12-28 17:27:48 Java开发

使用可运行的 jar 运行 akka

我正在尝试使用 NetBeans 在 java maven 项目中实现 akka.当我从 NetBeans 运行它时它运行良好,但是当我从 NetBeans 运行可运行的 jar 时,它会产生错误. 线程“main"com.typesafe.config.ConfigException$Missing 中的异常:找不到密钥“akka.remote.log-received-messages"的配 ..
发布时间:2021-12-26 15:14:40 Java开发

Akka Stream Source.queue 的背压策略不起作用

我试图理解为什么下面的代码片段正在做它正在做的事情.我会认为,因为接收器不能比源生成内容更快地产生需求,那么我会收到丢弃的消息以响应某些报价(溢出策略设置为丢弃缓冲区)以及错误和队列关闭消息在自毁片之后. 片段: 打包游乐场导入 java.time.LocalDateTime导入 java.util.concurrent.atomic.AtomicInteger导入 akka.actor. ..
发布时间:2021-11-26 22:51:37 其他开发

如何在 Akka Stream 中记录流速?

我有一个带有单个流/图的 Akka Stream 应用程序.我想测量源头的流速并每 5 秒记录一次,例如“在过去 5 秒内收到 3 条消息".我试过了, someOtherFlow.groupedWithin(Integer.MAX_VALUE, 5 秒).runForeach(seq =>log.debug(s"在过去 5 秒内收到 ${seq.length} 条消息")) 但只在有消息时输 ..
发布时间:2021-11-26 22:51:29 其他开发

使用 TCP 流并将其重定向到另一个 Sink(使用 Akka Streams)

我尝试使用 Akka 2.4.3 将 TCP 流重定向/转发到另一个接收器.该程序应该打开一个服务器套接字,侦听传入的连接,然后使用 tcp 流.我们的发件人不期望/接受我们的回复,所以我们从不发回任何东西——我们只是消费流.在对 tcp 流进行成帧后,我们需要将字节转换为更有用的内容并将其发送到 Sink. 到目前为止,我尝试了以下操作,但我尤其在如何不将 tcp 数据包发送回发件人并正确 ..
发布时间:2021-11-26 22:51:18 其他开发

使用 Akka-Streams HTTP 将整个 HttpResponse 主体作为字符串获取

我正在尝试了解如何使用新的 akka.http 库.我想向服务器发送一个 http 请求并将整个响应正文作为单个字符串读取,以生成一个 Source[String,?]. 这是迄今为止我能够产生的最佳解决方案: def get(型号ID:字符串,池:流[(HttpRequest,Int),(Try[HttpResponse],Int),Http.HostConnectionPool]): ..
发布时间:2021-11-26 22:51:07 其他开发

使用 Akka Streams 读取 CSV 文件

我正在读取一个 csv 文件.我正在使用 Akka Streams 来执行此操作,以便我可以创建要在每一行上执行的操作图.我已经启动并运行了以下玩具示例. def main(args: Array[String]): Unit = {隐式 val 系统 = ActorSystem("MyAkkaSystem")隐式 val materializer = ActorMaterializer()v ..
发布时间:2021-11-26 22:50:48 其他开发

端到端反应式流 RESTful 服务(又名 Back-Pressure over HTTP)

我在网上尝试澄清这个问题有一段时间没有成功,所以我会尝试在这里提问. 我想找到一些资源或示例,其中展示了如何构建端到端的完全背压式 REST 服务 + 客户端.我的意思是,我希望看到,给定一个实现 Reactive Streams 的 REST 客户端(无论是在 Akka、JS 还是其他中),我将拥有(并且能够“可视化")整个过程中处理的背压构建的 REST 服务器,例如使用 Akka-Ht ..
发布时间:2021-11-26 22:50:28 其他开发

Akka 流:读取多个文件

我有一个文件列表.我要: 将所有这些作为单一来源读取. 文件应该按顺序读取.(无循环) 任何时候都不应该要求任何文件完全在内存中. 从文件读取错误应该折叠流. 感觉这应该可行:(Scala,akka-streams v2.4.7) val sources = Seq("file1", "file2").map(new File(_)).map(f => FileIO.fromP ..
发布时间:2021-11-26 22:49:14 其他开发

如何将 Source 动态添加到现有 Graph?

有什么可以替代动态改变运行图?这是我的情况.我有将文章摄取到数据库中的图表.文章来自 3 个不同格式的插件.因此我有几个流程 val converterFlow1: Flow[ImpArticle, Article, NotUsed]val converterFlow2: Flow[NewsArticle, Article, NotUsed]val sinkDB:​​Sink[文章,未来[完成] ..
发布时间:2021-11-26 22:49:04 其他开发

基于 akka 流条件的替代流

有一个带有自定义流的流,在某个阶段我想拆分流并有两个替代数据处理,稍后将再次合并. 例如 ->F3 ->F6源 ->F1 ->F2>合并 ->下沉->F4 ->F5 F2 应该有一个条件,如果数据包含格式A,那么它应该去流程F3,否则去F4. 据我所知,每个流在每个方向上只能有一个端口(如果是双向的,则有两个)——那么我如何支持这样的流? 解决方案 您可以使用 Broad ..
发布时间:2021-11-26 22:48:55 其他开发

Akka Stream 的 Keep right/left/both 如何导致不同的输出?

我正在尝试了解 Keep 在 Akka 流中的工作方式.阅读 保持在 akka 流中意味着什么 中的答案,我明白它有助于控制我们从具体化器的左侧/右侧/两侧获得结果.但是,我仍然无法构建一个示例,因为我可以更改左/右的值并获得不同的结果. 例如 隐式val系统:ActorSystem = ActorSystem(“Playground")隐式 val 物化器:ActorMaterializ ..
发布时间:2021-11-26 22:48:43 其他开发

Akka Stream + Akka Http - 获取错误请求

我有以下流,效果很好: 源代码.map(x => HttpRequest(uri = x.rawRequest)).via(Http().outgoingConnection(host, port)).to(Sink.actorRef(myActor, IsDone)).跑() 还有一个简单的actor来处理流完成时的响应状态和最终消息: /*** 一个简单的actor来计算已经处理了多少行 ..
发布时间:2021-11-26 22:46:46 其他开发

如何在响应流中将消息从订阅者发送到 Web 套接字连接中的发布者

我的应用程序有一个 Akka-Websocket 接口.Web 套接字由参与者订阅者和参与者发布者组成.订阅者通过将命令发送给相应的参与者来处理命令.发布者侦听事件流并将更新信息发布回流(并最终发布到客户端).这很好用. 我的问题:订阅者如何将事件发送回流?例如确认接收到的命令的执行. public class WebSocketApp extends HttpApp {private s ..
发布时间:2021-11-26 22:46:24 其他开发

使用 Akka HTTP(正式名称为 Spray)的 GET 请求的查询参数

Akka HTTP(正式名称为 Spray)的功能之一是它能够自动将数据从 json 来回编组和解组到 case 类等中.我已经成功地让它正常工作. > 目前,我正在尝试制作一个使用查询参数执行 GET 请求的 HTTP 客户端.目前的代码如下所示: val httpResponse: Future[HttpResponse] =Http().singleRequest(HttpReques ..
发布时间:2021-11-26 22:46:12 其他开发