Why does `Publishers.Map` consume upstream values eagerly?


Problem description

Suppose I have a custom subscriber that requests one value on subscription and then an additional value three seconds after it receives the previous value:

class MySubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private var subscription: Subscription?

    func receive(subscription: Subscription) {
        print("Subscribed")

        self.subscription = subscription
        subscription.request(.max(1))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        print("Value: \(input)")

        DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(3)) {
            self.subscription?.request(.max(1))
        }

        return .none
    }

    func receive(completion: Subscribers.Completion<Never>) {
        print("Complete")
        subscription = nil
    }
}

If I use this to subscribe to an infinite range publisher, back pressure is handled gracefully, with the publisher waiting 3 seconds each time until it receives the next demand to send a value:

(1...).publisher.subscribe(MySubscriber())

// Prints values infinitely with ~3 seconds between each:
//
//     Subscribed
//     Value: 1
//     Value: 2
//     Value: 3
//     ...

But if I add a `map` operator, `MySubscriber` never even receives a subscription. `map` appears to have synchronously requested `Subscribers.Demand.unlimited` upon receiving its subscription, and the app spins forever as `map` tries to exhaust the infinite range:

(1...).publisher
    .map { value in
        print("Map: \(value)")
        return value * 2
    }
    .subscribe(MySubscriber())

// The `map` transform is executed infinitely with no delay:
//
//     Map: 1
//     Map: 2
//     Map: 3
//     ...

My question is: why does `map` behave this way? I would have expected `map` to simply pass its downstream demand to the upstream. Since `map` is supposed to be used for transformation rather than side effects, I don't understand what the use case for its current behavior is.

EDIT

I implemented a version of map to show how I think it ought to work:

extension Publishers {
    struct MapLazily<Upstream: Publisher, Output>: Publisher {
        typealias Failure = Upstream.Failure

        let upstream: Upstream
        let transform: (Upstream.Output) -> Output

        init(upstream: Upstream, transform: @escaping (Upstream.Output) -> Output) {
            self.upstream = upstream
            self.transform = transform
        }

        public func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Upstream.Failure {
            let mapSubscriber = Subscribers.LazyMapSubscriber(downstream: subscriber, transform: transform)
            upstream.receive(subscriber: mapSubscriber)
        }
    }
}

extension Subscribers {
    class LazyMapSubscriber<Input, DownstreamSubscriber: Subscriber>: Subscriber {
        let downstream: DownstreamSubscriber
        let transform: (Input) -> DownstreamSubscriber.Input

        init(downstream: DownstreamSubscriber, transform: @escaping (Input) -> DownstreamSubscriber.Input) {
            self.downstream = downstream
            self.transform = transform
        }

        func receive(subscription: Subscription) {
            downstream.receive(subscription: subscription)
        }

        func receive(_ input: Input) -> Subscribers.Demand {
            downstream.receive(transform(input))
        }

        func receive(completion: Subscribers.Completion<DownstreamSubscriber.Failure>) {
            downstream.receive(completion: completion)
        }
    }
}

extension Publisher {
    func mapLazily<Transformed>(transform: @escaping (Output) -> Transformed) -> AnyPublisher<Transformed, Failure> {
        Publishers.MapLazily(upstream: self, transform: transform).eraseToAnyPublisher()
    }
}

Using this operator, MySubscriber receives the subscription immediately and the mapLazily transform is only executed when there is demand:

(1...).publisher
    .mapLazily { value in
        print("Map: \(value)")
        return value * 2
    }
    .subscribe(MySubscriber())

// Only transforms the values when they are demanded by the downstream subscriber every 3 seconds:
//
//     Subscribed
//     Map: 1
//     Value: 2
//     Map: 2
//     Value: 4
//     Map: 3
//     Value: 6
//     Map: 4
//     Value: 8

My guess is that the particular overload of `map` defined for `Publishers.Sequence` uses some kind of shortcut to improve performance. This breaks for infinite sequences, but even for finite sequences, eagerly exhausting the sequence regardless of downstream demand goes against my intuition. In my view, the following code:

(1...3).publisher
    .map { value in
        print("Map: \(value)")
        return value * 2
    }
    .subscribe(MySubscriber())

ought to print:

Subscribed
Map: 1
Value: 2
Map: 2
Value: 4
Map: 3
Value: 6
Complete

but instead prints:

Map: 1
Map: 2
Map: 3
Subscribed
Value: 2
Value: 4
Value: 6
Complete
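
For comparison, the same two orderings show up with plain Swift sequences, with no Combine involved. The sketch below is only an analogy, under the assumption that the eager behavior comes from transforming the whole sequence up front; `eagerOrder` and `lazyOrder` are hypothetical helper names introduced for illustration.

```swift
// Transform everything first, then consume: mirrors the observed output.
func eagerOrder() -> [String] {
    var log: [String] = []
    // Sequence.map builds the full array before the loop below starts.
    let doubled = (1...3).map { value -> Int in
        log.append("Map: \(value)")
        return value * 2
    }
    for value in doubled {
        log.append("Value: \(value)")
    }
    return log
}

// Transform on demand: mirrors the expected output.
func lazyOrder() -> [String] {
    var log: [String] = []
    // lazy.map runs the closure only when the loop pulls the next element.
    let doubled = (1...3).lazy.map { value -> Int in
        log.append("Map: \(value)")
        return value * 2
    }
    for value in doubled {
        log.append("Value: \(value)")
    }
    return log
}

print(eagerOrder())
print(lazyOrder())
```

The first log groups all the "Map" entries before any "Value" entry, while the second interleaves them, which is the difference between the two printouts above.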

Solution

Here's a simpler test that doesn't involve any custom subscribers:

// Assumes `storage` is a Set<AnyCancellable> declared elsewhere.
(1...).publisher
    //.map { $0 }
    .flatMap(maxPublishers: .max(1)) { (i: Int) -> AnyPublisher<Int, Never> in
        Just<Int>(i)
            .delay(for: 3, scheduler: DispatchQueue.main)
            .eraseToAnyPublisher()
    }
    .sink { print($0) }
    .store(in: &storage)

It works as expected, but if you uncomment the `.map` you get nothing, because the `.map` operator keeps accumulating values from the infinite upstream without ever publishing anything.

On the basis of your hypothesis that map is somehow optimizing for a preceding sequence publisher, I tried this workaround:

(1...).publisher.eraseToAnyPublisher()
    .map { $0 }
    // ...

And sure enough, it fixed the problem! By hiding the sequence publisher from the map operator, we prevent the optimization.
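
One way to check the hypothesis is to look at the static types. The sketch below assumes that `Publishers.Sequence` declares its own `map` overload returning `Publishers.Sequence<[T], Failure>`, which is what Apple's documentation appears to list; the names `direct` and `erased` are only for illustration. If the type annotations compile, the first `map` has already produced an array of transformed values before any subscriber attaches, while the second is an ordinary `Publishers.Map`.

```swift
import Combine

// Assumption: called directly on a sequence publisher, `map` resolves to the
// Publishers.Sequence overload and yields another sequence publisher that
// wraps an array of transformed elements.
let direct: Publishers.Sequence<[Int], Never> = (1...3).publisher.map { $0 * 2 }

// After type erasure only the generic Publisher.map is available, so the
// result is a Publishers.Map that transforms each value as it arrives.
let erased: Publishers.Map<AnyPublisher<Int, Never>, Int> =
    (1...3).publisher.eraseToAnyPublisher().map { $0 * 2 }

// `sequence` is the stored collection the sequence publisher will emit.
print(direct.sequence)
print(type(of: direct))
print(type(of: erased))
```

With an infinite range such as `(1...)`, building that array can never finish, which would explain why nothing downstream ever receives a subscription.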
