合并管道不接收值 [英] Combine pipeline not receiving values

查看:97
本文介绍了合并管道不接收值的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在下面的代码中,它是更为复杂的管道的简化版本,其中完成处理"从来没有要求2.

In the following code, which is a simplified version of a more elaborate pipeline, "Done processing" is never called for 2.

那是为什么?

我怀疑这是由于需求引起的问题,但我无法找出原因.

I suspect this is a problem due to the demand, but I cannot figure out the cause.

请注意,如果我删除 combineLatest() compactMap(),则正确处理了值2(但是我需要这些 combineLatest compactMap 的正确性,在我的真实示例中,它们涉及更多).

Note that if I remove the combineLatest() or the compactMap(), the value 2 is properly processed (but I need these combineLatest and compactMap for correctness, in my real example they are more involved).

var cancellables = Set<AnyCancellable>([])

func process<T>(_ value: T) -> AnyPublisher<T, Never> {
    return Future<T, Never> { promise in
        print("Starting processing of \(value)")
        DispatchQueue.global(qos: .userInitiated).asyncAfter(deadline: .now() + 0.1) {
            promise(.success(value))
        }
    }.eraseToAnyPublisher()
}

let s = PassthroughSubject<Int?, Never>()

s
    .print("Combine->Subject")
    .combineLatest(Just(true))
    .print("Compact->Combine")
    .compactMap { value, _ in value }
    .print("Sink->Compact")
    .flatMap(maxPublishers: .max(1)) { process($0) }
    .sink {
        print("Done processing \($0)")
    }
    .store(in: &cancellables)

s.send(nil)

// Give time for flatMap to finish
Thread.sleep(forTimeInterval: 1)
s.send(2)

推荐答案

这听起来像是 combineLatest 的错误.当下游请求同步地"附加需求时,则请求附加".(根据- print 发布者的输出),该需求不会向上游流动.

It sounds like a bug of combineLatest. When a downstream request additional demand "synchronously" (as per-print publisher output), that demand doesn't flow upstream.

克服此问题的一种方法是将 combineLatest 的下游包装在 flatMap 中:

One way to overcome this is to wrap the downstream of combineLatest in a flatMap:

s
    .combineLatest(Just(true))
    .flatMap(maxPublishers: .max(1)) {
        Just($0)
          .compactMap { value, _ in value }
          .flatMap { process($0) }
    }
    .sink {
        print("Done processing \($0)")
    }
    .store(in: &cancellables)

外部 flatMap 现在会创建背压,而内部 flatMap 不再需要背压.

The outer flatMap now creates the back pressure, and the inner flatMap doesn't need it anymore.

这篇关于合并管道不接收值的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆