如何在仍然处理所有源事件的合并中限制flatMap并发性? [英] How to limit flatMap concurrency in Combine still having all source events processed?

查看:65
本文介绍了如何在仍然处理所有源事件的合并中限制flatMap并发性?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

如果我指定 maxPublishers 参数,则第一个maxPublishers事件之后的源事件将不会被平面映射.虽然我只想限制并发性.那就是在某些第一批maxPublishers平面地图发布者完成后,继续处理下一个事件.

If I specify the maxPublishers parameter then source events after first maxPublishers events won't be flat mapped. While I want to limit only concurrency. That is to continue processing next events after some of the first maxPublishers flat map publishers have completed.

Publishers.Merge(
    addImageRequestSubject
        .flatMap(maxPublishers: .max(3)) { self.compressImage($0) }
        .compactMap { $0 }
        .flatMap(maxPublishers: .max(3)) { self.addImage($0) },
    addVideoRequestSubject
        .flatMap(maxPublishers: .max(3)) { self.addVideo(url: $0) }
).sink(receiveCompletion: { _ in }, receiveValue: {})
.store(in: &cancelBag)

我还尝试在OperationQueue的帮助下限制并发性.但是 maxConcurrentOperationCount 似乎没有效果.

I've also tried to limit concurrency with help of OperationQueue. But maxConcurrentOperationCount seems doesn't have an effect.

Publishers.Merge(
    addImageRequestSubject
        .receive(on: imageCompressionQueue)
        .flatMap { self.compressImage($0) }
        .compactMap { $0 }
        .receive(on: mediaAddingQueue)
        .flatMap { self.addImage($0) },
    addVideoRequestSubject
        .receive(on: mediaAddingQueue)
        .flatMap { self.addVideo(url: $0) }
).sink(receiveCompletion: { _ in }, receiveValue: {})
.store(in: &cancelBag)

private lazy var imageCompressionQueue: OperationQueue = {
    var queue = OperationQueue()
    queue.maxConcurrentOperationCount = 3

    return queue
}()

private lazy var mediaAddingQueue: OperationQueue = {
    var queue = OperationQueue()
    queue.maxConcurrentOperationCount = 3

    return queue
}()

平面地图发布者是这样的:

Flat map publishers look this way:

func compressImage(_ image: UIImage) -> Future<Data?, Never> {
    Future { promise in
        DispatchQueue.global().async {
            let result = image.compressTo(15)?.jpegData(compressionQuality: 1)
            promise(Result.success(result))
        }
    }
}

推荐答案

您已经很好地偶然发现了 .buffer 运算符的用例.其目的是通过累积否则会丢失的值来补偿 .flatMap 背压.

You have stumbled very beautifully right into the use case for the .buffer operator. Its purpose is to compensate for .flatMap backpressure by accumulating values that would otherwise be dropped.

我将通过一个完全人工的例子进行说明:

I will illustrate by a completely artificial example:

class ViewController: UIViewController {
    let sub = PassthroughSubject<Int,Never>()
    var storage = Set<AnyCancellable>()
    var timer : Timer!
    override func viewDidLoad() {
        super.viewDidLoad()
        sub
            .flatMap(maxPublishers:.max(3)) { i in
                return Just(i)
                    .delay(for: 3, scheduler: DispatchQueue.main)
                    .eraseToAnyPublisher()
            }
            .sink { print($0) }
            .store(in: &storage)
        
        var count = 0
        self.timer = Timer.scheduledTimer(withTimeInterval: 1, repeats: true) { 
            _ in
            count += 1
            self.sub.send(count)
        }
    }
}

因此,我们的发布者每秒发出一个递增的整数,但是我们的 flatMap 具有 .max(3),并且需要3秒钟来重新发布值.结果是我们开始错过值:

So, our publisher is emitting an incremented integer every second, but our flatMap has .max(3) and takes 3 seconds to republish a value. The result is that we start to miss values:

1
2
3
5
6
7
9
10
11
...

解决方案是在 flatMap 前面放置一个缓冲区.它必须足够大以容纳任何遗漏的值足够长的时间,以便可以请求它们:

The solution is to put a buffer in front of the flatMap. It needs to be large enough to hold any missed values long enough for them to be requested:

        sub
            .buffer(size: 20, prefetch: .keepFull, whenFull: .dropOldest)
            .flatMap(maxPublishers:.max(3)) { i in

结果是,所有的数值实际上都到达了 sink .当然,在现实生活中,如果缓冲区的大小不足以补偿发布者的价值释放率与反压 flatMap的价值释放率之间的差异,我们可能仍然会丢失值.

The result is that all the numeric values do in fact arrive at the sink. Of course in real life we could still lose values if the buffer is not large enough to compensate for disparity between the rate of value emission from the publisher and the rate of value emission from the backpressuring flatMap.

这篇关于如何在仍然处理所有源事件的合并中限制flatMap并发性?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆