如何仅从反应流中发出累积总和? [英] How to emit cumulative sum only from a reactive stream?

查看:68
本文介绍了如何仅从反应流中发出累积总和?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个用例,流应该只在累积总和"大于等于 1 时才发出.等于或超过给定值 n.我们以 n = 5 的六个整数为例.

+---+------+-------+|我|发出 |总和 |+---+------+---------+|1 |- |1 ||2 |- |3 ||3 |5 |1 ||4 |5 |0 ||5 |5 |0 ||2 |2 |0(结束)|+---+------+---------+

如您所见,除非总和等于或超过 5,否则不会发出任何内容,除了最后一个元素,无论如何它都会发出.

一旦发出一个项目,总和就会减少那个值 (n).实际上,我正在从网络调用中读取数据,然后将它们发送给下游消费者,该消费者只接受固定大小的块,当然除了最后一个(上游已完成).

我正在使用项目反应堆 Flux 作为 Publisher;我找不到任何方法可以让我执行上面显示的操作.scan 最接近,但它也发出需要过滤掉的中间元素.

解决方案

实际上,我正在从网络调用中读取数据,然后将它们发送给只接受固定大小的下游消费者块,当然除了最后一个(上游完成).

我突然想到,自己尝试拆分响应 Flux 可能有点晚,而且相当困难;相反,我可以使用 Netty FixedLengthFrameDecoder 之类的东西,这正是我正在寻找的.

这让我找到了 reactor-netty 源代码,经过大量挖掘,我发现正是我需要的.

fun get(url: String, maxChunkSize: Int): List{返回 HttpClient.create().httpResponseDecoder { it.maxChunkSize(maxChunkSize) }.得到().uri(网址).responseContent().asByteArray().collectList().堵塞()!!}

关键部分是 httpResponseDecoder { it.maxChunkSize(maxChunkSize) };单元测试证明这是有效的:

@Test有趣的 testHonorsMaxChunkSize() {val maxChunkSize = 4096val 块 = FixedLengthResponseFrameClient.get(http://doesnotexist.nowhere/binary",maxChunkSize)assertThat(chunks.subList(0, chunks.size - 1)).allMatch { it.size == maxChunkSize}assertThat(chunks.last().size).isLessThanOrEqualTo(maxChunkSize)}

WebClient 可以使用自定义的 HttpClient(使用 httpResponseDecoder 配置)进行配置,如下所示:

WebClient.builder().clientConnector(ReactorClientHttpConnector(httpClient)).建造().得到().uri(uri").交换().flatMapMany { it.body(BodyExtractors.toDataBuffers()) }...

这些缓冲区的大小将在 HttpClient.httpResponseDecoder 中设置(默认为 8192 Kb).

I've a use case where the stream should only emit when the cumulative "sum" equals or exceeds a given value, n. Let's take the example of six integers with n = 5.

+---+------+---------+
| i | Emit |   Sum   |
+---+------+---------+
| 1 |    - | 1       |
| 2 |    - | 3       |
| 3 |    5 | 1       |
| 4 |    5 | 0       |
| 5 |    5 | 0       |
| 2 |    2 | 0 (end) |
+---+------+---------+

As you can see, nothing is emitted unless the sum equals or exceeds 5, except for the last element, which is emitted anyway.

Once an item is emitted, the sum gets reduced by that value (n). In reality, I'm reading data from a network call, and subsequently sending them to a downstream consumer who only accepts fixed size chunks, except for the last one, of course (upstream completed).

I'm using project Reactor Flux as the Publisher; I couldn't find any method on it that allows me do what is shown above. scan comes closest, but it also emits intermediate elements that need to be filtered out.

解决方案

In reality, I'm reading data from a network call, and subsequently sending them to a downstream consumer who only accepts fixed size chunks, except for the last one, of course (upstream completed).

It occurred to me that trying to split the response Flux myself is probably little late and quite difficult; instead, I could use something like Netty FixedLengthFrameDecoder, which does exactly what I'm looking for.

That led me to reactor-netty source code, and after extensive digging, I found exactly what I needed.

fun get(url: String, maxChunkSize: Int): List<ByteArray> {
    return HttpClient.create()
        .httpResponseDecoder { it.maxChunkSize(maxChunkSize) }
        .get()
        .uri(url)
        .responseContent()
        .asByteArray()
        .collectList()
        .block()!!
}

The crucial part is httpResponseDecoder { it.maxChunkSize(maxChunkSize) }; a unit test proves this to be working:

@Test

fun testHonorsMaxChunkSize() {
    val maxChunkSize = 4096
    val chunks = FixedLengthResponseFrameClient.get(
        "http://doesnotexist.nowhere/binary", maxChunkSize
    )

    assertThat(chunks.subList(0, chunks.size - 1))
        .allMatch { it.size ==  maxChunkSize}
    assertThat(chunks.last().size).isLessThanOrEqualTo(maxChunkSize)
}

WebClient can be configured with a custom HttpClient (configured with httpResponseDecoder) as shown below:

WebClient
  .builder()
  .clientConnector(ReactorClientHttpConnector(httpClient))
  .build()
  .get()
  .uri("uri")
  .exchange()
  .flatMapMany { it.body(BodyExtractors.toDataBuffers()) }
  ...

The size of these buffers would be what's set in the HttpClient.httpResponseDecoder (8192 Kb by default).

这篇关于如何仅从反应流中发出累积总和?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆