akka-stream相关内容

akka-http:完整的请求流

假设我已经设置了一个任意复杂的Flow[HttpRequest, HttpResponse, Unit]. 我已经可以使用上述流程来处理传入的请求 Http().bindAndHandle(flow, "0.0.0.0", 8080) 现在我想添加日志,利用一些现有的指令,比如 logRequestResult("my-service"){...}有没有办法将此指令与我的流程结合起来?我 ..
发布时间:2021-11-26 22:43:20 其他开发

如何限制 Akka Stream 每秒仅执行和发送一条消息一次?

我有一个 Akka Stream,我希望该流大约每秒向下游发送消息. 我尝试了两种方法来解决这个问题,第一种方法是让流开始的生产者只在有 Continue 消息进入这个 actor 时每秒发送一次消息. //当在 ActorPublisher 中收到 Continue 消息时//做工作然后...如果(总需求 > 0){导入 scala.concurrent.duration._cont ..
发布时间:2021-11-26 22:42:19 其他开发

使用 akka 流有条件地跳过流

我正在使用 akka 流,并且我有一段我需要有条件地跳过的图表,因为流程无法处理某些值.具体来说,我有一个接受字符串并发出 http 请求的流,但是当字符串为空时,服务器无法处理这种情况.但我只需要返回一个空字符串.有没有办法做到这一点而不必通过 http 请求知道它会失败?我基本上有这个: val source = Source("1", "2", "", "3", "4")val httpR ..
发布时间:2021-11-26 22:40:13 其他开发

如何在 PlayFramework 中使用 Akka Streams SourceQueue

我想使用 SourceQueue 将元素动态推送到 Akka Stream 源中.播放控制器需要一个 Source 才能使用 chunked 方法流式传输结果. 由于 Play 在幕后使用自己的 Akka Stream Sink,我无法使用 Sink 自己实现源队列,因为源将在被 chunked 方法使用之前被消耗(除非我使用跟随黑客). 如果我使用反应流发布者预先实现源队列,我就能让它工 ..
发布时间:2021-11-26 22:39:45 其他开发

使用 Akka Stream 从数据库流式传输记录

我有一个使用 Akka 的系统,该系统当前通过消息队列处理传入的流数据.当一条记录到达时,它被处理,mq 被确认并传递记录以在系统内进一步处理. 现在我想添加对使用数据库作为输入的支持. 输入源能够处理数据库的方法是什么(应该以接收器可以处理的速度流式传输 > 100M 记录 - 所以我假设是反应性/akka-streams?)? 解决方案 Slick Library 流畅的 ..
发布时间:2021-11-26 22:38:35 其他开发

Akka Streams:Mat 在 Source[out, Mat] 中代表什么

在 Akka 流中,Source[Out, Mat] 或 Sink[In, Mat] 中的 Mat 代表什么.什么时候会实际使用? 解决方案 Mat 类型参数表示该流的物化值的类型. 请记住,在 Akka Source、Flow、Sink(好吧,所有图)只是蓝图——它们不做任何处理他们自己,他们只描述了应该如何构造流.将这些蓝图转换为带有实时数据的工作流的过程称为物化. 实现流 ..
发布时间:2021-11-26 22:37:02 其他开发

如何创建一个可以稍后通过方法调用接收元素的源?

我想创建一个 Source 并稍后在其上推送元素,例如: val src = ...//在此处创建源//然后,做这样的事情推元素(x1,src)推元素(x2,src) 推荐的方法是什么? 谢谢! 解决方案 可以通过三种方式实现: 1.使用 SourceQueue 后期实现 您可以使用 Source.queue 将 Flow 实体化为 SourceQueue: cas ..
发布时间:2021-11-26 22:36:04 其他开发

Akka Stream Kafka 与 Kafka Streams

我目前正在使用 Akka Stream Kafka与 kafka 交互,我想知道与 Kafka Streams 有什么区别. 我知道基于 Akka 的方法实现了响应式规范并处理了 kafka 流似乎缺乏的背压功能. 与 akka 流 kafka 相比,使用 kafka 流有什么优势? 解决方案 你的问题很笼统,我就从我的角度给出一个笼统的答案. 首先,我有两个使用场景: ..

如何将 akka 流 kafka(reactive-kafka)集成到 akka http 应用程序中?

我有一个基本的 scala akka http CRUD 应用程序.相关课程见下文. 我只想在例如创建/更新实体时将实体 ID 和一些数据(作为 json)写入 Kafka 主题. 我正在查看 http://doc.akka.io/docs/akka-stream-kafka/current/producer.html,但我是 scala 和 akka 的新手,不确定如何将它集成到我的 ..
发布时间:2021-11-12 02:43:43 其他开发

使用 akka-stream 两次使用源代码

我正在将 Play 框架用于我构建的 Web 应用程序.Play 2.5 使用 Akka Stream API 来允许流式传输请求/响应. 我有一个端点,可以将传入的文件直接流式传输到 Google 云端硬盘. 我定义了一个看起来像这样的 BodyParser: BodyParser("toDrive") { request =>Accumulator.source[ByteStri ..
发布时间:2021-10-26 18:31:58 其他开发

为什么Akka TCP流服务器在connection.handlewith没有流量时断开客户端?

我正在寻找对我使用以下代码看到的行为的解释.当 conn.handleWith 被注释掉,我用 netcat 建立的 TCP 客户端连接,连接,并在几秒钟内显示被对等断开(即服务器断开连接).当代码中存在 conn.handleWith 时,我看不到断开连接.我最初认为这与为服务器设置的空闲超时有关,但事实并非如此. 那么为什么服务器在没有流量处理连接的情况下会断开客户端? 包com.e ..
发布时间:2021-10-26 18:31:55 其他开发

如何读取akka-http中的查询参数?

我知道 akka-http 库在处理请求时对类类型进行编组和解组.但是现在,我需要读取 GET 请求的请求参数.我尝试了 parameter() 方法,它返回 ParamDefAux 类型,但我需要这些值作为字符串类型 我检查以下问题的答案. 我该如何解析在喷雾路由中获取请求参数? 查询参数对于使用 Akka HTTP(正式名称为 Spray)的 GET 请求 但不能做我需 ..
发布时间:2021-10-26 18:31:40 其他开发

使用 Scala 和 Akka 在不丢失数据的情况下使用服务器发送的事件(SSE)

我想在生产速率为 > 时消费 SSE 事件而不丢失任何数据.消费率.由于 SSE 支持背压 Akka 应该可以做到.我尝试了几种不同的方法,但额外的消息被丢弃了. @单身人士类 SseConsumer @Inject()()(隐式 ec: ExecutionContext) {隐式 val 系统 = ActorSystem()val 发送:HttpRequest =>未来[HttpRespons ..

如何在 Akka Streams 中的 GraphStage 内计算聚合?

我在 Akka 流中有一个运算符/组件,旨在在 5 秒的窗口内计算一个值.因此,我使用 TimerGraphStageLogic 创建了我的操作符/组件,您可以在下面的代码中看到它.为了测试它,我创建了 2 个源,一个递增,另一个递减,然后使用 Merge 形状合并它们,然后使用 windowFlowShape,最后以 Sink 形状发射它们.我确保 TimerGraphStageLogic 正在 ..
发布时间:2021-10-26 18:31:16 其他开发

Akka-Streams 收集数据 (Source -> Flow -> Flow (collect) -> Sink)

我对 Scala 和 Akka 完全陌生.我有一个简单的 RunnableFlow: 源代码 ->流程(做一些转换)->Sink.runForeach 现在我想要这样的东西: 源代码 ->Flow1(做一些转换)->Flow2(做一些转换)->Sink.runForeach 但是 Flow2 应该等到 Flow1 中的 100 个元素可用,然后将这 100 个元素转换为一个新元素(需要 F ..
发布时间:2021-07-15 20:56:20 其他开发

Akka Stream 连接到多个接收器

我在 akka 流中实现了一个自定义组件,它将元素作为输入,根据一个键对它们进行分组和合并,然后通过十几个出口之一将它们发送出去.您可以将此组件视为一种 GroupBy 组件,它不会将流划分为子流,而是实际的流.除了对传入元素进行分区之外,它还将它们合并为一个元素,即在组件内部发生了一些缓冲,使得 1 个元素进入并不一定意味着 1 个元素通过出口传出. 以下是所述组件的简化实现. clas ..
发布时间:2021-07-15 20:50:18 其他开发