akka-stream相关内容
假设我已经设置了一个任意复杂的Flow[HttpRequest, HttpResponse, Unit]. 我已经可以使用上述流程来处理传入的请求 Http().bindAndHandle(flow, "0.0.0.0", 8080) 现在我想添加日志,利用一些现有的指令,比如 logRequestResult("my-service"){...}有没有办法将此指令与我的流程结合起来?我
..
我正在按照 webSocketClientFlow. 示例代码是: import akka.actor.ActorSystem导入 akka.Done导入 akka.http.scaladsl.Http导入 akka.stream.ActorMaterializer导入 akka.stream.scaladsl._导入 akka.http.scaladsl.model._导入 akka.h
..
我有一个 Akka Stream,我希望该流大约每秒向下游发送消息. 我尝试了两种方法来解决这个问题,第一种方法是让流开始的生产者只在有 Continue 消息进入这个 actor 时每秒发送一次消息. //当在 ActorPublisher 中收到 Continue 消息时//做工作然后...如果(总需求 > 0){导入 scala.concurrent.duration._cont
..
我正在使用 akka 流,并且我有一段我需要有条件地跳过的图表,因为流程无法处理某些值.具体来说,我有一个接受字符串并发出 http 请求的流,但是当字符串为空时,服务器无法处理这种情况.但我只需要返回一个空字符串.有没有办法做到这一点而不必通过 http 请求知道它会失败?我基本上有这个: val source = Source("1", "2", "", "3", "4")val httpR
..
我想使用 SourceQueue 将元素动态推送到 Akka Stream 源中.播放控制器需要一个 Source 才能使用 chunked 方法流式传输结果. 由于 Play 在幕后使用自己的 Akka Stream Sink,我无法使用 Sink 自己实现源队列,因为源将在被 chunked 方法使用之前被消耗(除非我使用跟随黑客). 如果我使用反应流发布者预先实现源队列,我就能让它工
..
我有一个使用 Akka 的系统,该系统当前通过消息队列处理传入的流数据.当一条记录到达时,它被处理,mq 被确认并传递记录以在系统内进一步处理. 现在我想添加对使用数据库作为输入的支持. 输入源能够处理数据库的方法是什么(应该以接收器可以处理的速度流式传输 > 100M 记录 - 所以我假设是反应性/akka-streams?)? 解决方案 Slick Library 流畅的
..
我正在尝试使用 Akka HTTP 2.0-M2 编写用于批量数据上传的工具.但我面临 akka.stream.OverflowStrategy$Fail$BufferOverflowException: Exceeded configured max-open-requests value of [32] 错误. 我试图隔离一个问题,这里是同样失败的示例代码: 公共类 TestMaxRe
..
在 Akka 流中,Source[Out, Mat] 或 Sink[In, Mat] 中的 Mat 代表什么.什么时候会实际使用? 解决方案 Mat 类型参数表示该流的物化值的类型. 请记住,在 Akka Source、Flow、Sink(好吧,所有图)只是蓝图——它们不做任何处理他们自己,他们只描述了应该如何构造流.将这些蓝图转换为带有实时数据的工作流的过程称为物化. 实现流
..
我正在尝试使用 Source.actorRef 方法来创建一个 akka.stream.scaladsl.Source 对象.形式的东西 import akka.stream.OverflowStrategy.fail导入 akka.stream.scaladsl.Sourcecase class Weather(zip : String, temp : Double,raining : Boo
..
我想创建一个 Source 并稍后在其上推送元素,例如: val src = ...//在此处创建源//然后,做这样的事情推元素(x1,src)推元素(x2,src) 推荐的方法是什么? 谢谢! 解决方案 可以通过三种方式实现: 1.使用 SourceQueue 后期实现 您可以使用 Source.queue 将 Flow 实体化为 SourceQueue: cas
..
我目前正在使用 Akka Stream Kafka与 kafka 交互,我想知道与 Kafka Streams 有什么区别. 我知道基于 Akka 的方法实现了响应式规范并处理了 kafka 流似乎缺乏的背压功能. 与 akka 流 kafka 相比,使用 kafka 流有什么优势? 解决方案 你的问题很笼统,我就从我的角度给出一个笼统的答案. 首先,我有两个使用场景:
..
我有一个从 SupervisorActor 内部创建的 Actor,这个 Actor 负责将它获取的消息推送到流中.这是演员: class KafkaPublisher[T log.info(s"Activated stream for Kafka Producer with ActorName >> ${self.path.name} > ${self.path}
..
我有一个基本的 scala akka http CRUD 应用程序.相关课程见下文. 我只想在例如创建/更新实体时将实体 ID 和一些数据(作为 json)写入 Kafka 主题. 我正在查看 http://doc.akka.io/docs/akka-stream-kafka/current/producer.html,但我是 scala 和 akka 的新手,不确定如何将它集成到我的
..
我正在将 Play 框架用于我构建的 Web 应用程序.Play 2.5 使用 Akka Stream API 来允许流式传输请求/响应. 我有一个端点,可以将传入的文件直接流式传输到 Google 云端硬盘. 我定义了一个看起来像这样的 BodyParser: BodyParser("toDrive") { request =>Accumulator.source[ByteStri
..
我正在寻找对我使用以下代码看到的行为的解释.当 conn.handleWith 被注释掉,我用 netcat 建立的 TCP 客户端连接,连接,并在几秒钟内显示被对等断开(即服务器断开连接).当代码中存在 conn.handleWith 时,我看不到断开连接.我最初认为这与为服务器设置的空闲超时有关,但事实并非如此. 那么为什么服务器在没有流量处理连接的情况下会断开客户端? 包com.e
..
我知道 akka-http 库在处理请求时对类类型进行编组和解组.但是现在,我需要读取 GET 请求的请求参数.我尝试了 parameter() 方法,它返回 ParamDefAux 类型,但我需要这些值作为字符串类型 我检查以下问题的答案. 我该如何解析在喷雾路由中获取请求参数? 查询参数对于使用 Akka HTTP(正式名称为 Spray)的 GET 请求 但不能做我需
..
我想在生产速率为 > 时消费 SSE 事件而不丢失任何数据.消费率.由于 SSE 支持背压 Akka 应该可以做到.我尝试了几种不同的方法,但额外的消息被丢弃了. @单身人士类 SseConsumer @Inject()()(隐式 ec: ExecutionContext) {隐式 val 系统 = ActorSystem()val 发送:HttpRequest =>未来[HttpRespons
..
我在 Akka 流中有一个运算符/组件,旨在在 5 秒的窗口内计算一个值.因此,我使用 TimerGraphStageLogic 创建了我的操作符/组件,您可以在下面的代码中看到它.为了测试它,我创建了 2 个源,一个递增,另一个递减,然后使用 Merge 形状合并它们,然后使用 windowFlowShape,最后以 Sink 形状发射它们.我确保 TimerGraphStageLogic 正在
..
我对 Scala 和 Akka 完全陌生.我有一个简单的 RunnableFlow: 源代码 ->流程(做一些转换)->Sink.runForeach 现在我想要这样的东西: 源代码 ->Flow1(做一些转换)->Flow2(做一些转换)->Sink.runForeach 但是 Flow2 应该等到 Flow1 中的 100 个元素可用,然后将这 100 个元素转换为一个新元素(需要 F
..
我在 akka 流中实现了一个自定义组件,它将元素作为输入,根据一个键对它们进行分组和合并,然后通过十几个出口之一将它们发送出去.您可以将此组件视为一种 GroupBy 组件,它不会将流划分为子流,而是实际的流.除了对传入元素进行分区之外,它还将它们合并为一个元素,即在组件内部发生了一些缓冲,使得 1 个元素进入并不一定意味着 1 个元素通过出口传出. 以下是所述组件的简化实现. clas
..