使用 RxSwift 限制对服务类的并发访问 [英] Limiting concurrent access to a service class with RxSwift

查看:45
本文介绍了使用 RxSwift 限制对服务类的并发访问的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

给定一个这样的服务类:

Given a service class like this:

class Service {
    let networkService = NetworkService()

    func handleJobA(input: String) -> Observable<ResultA> {
        return networkService
            .computeA(input)
            .map { $0.a }
    }
}

当我像这样从调用方使用它时:

And when I use it from the caller side like this:

let service = Service()

Observable
    .from(["Hello", "World"])
    .flatMap {
        service.handleJobA($0)
    }
    .subscribe()

然后这会同时向 service 发送多个请求.我希望流等待每个请求完成.这可以使用 merge 运算符实现.

Then this would send multiple requests to service at the same time. I wanted for the stream to wait until each request is done. That was achievable using the merge operator.

Observable
    .from(["Hello", "World"])
    .flatMap {
        Observable.just(
            service.handleJobA($0)
        )
    }
    .merge(maxConcurrent: 1)
    .subscribe()

到目前为止,一切都很好 - 该服务不会同时执行多个 handleJobA 任务.

So far, so good - the service will not perform multiple handleJobA tasks at the same time.

然而,并发是一个服务细节,调用者不应该关心它.事实上,服务在稍后阶段可能会决定允许不同的并发值.

However, the concurrency is a service detail and the caller should NOT care about it. In fact, the service, at a later stage, might decide to allow for difference concurrency values.

其次,当我添加一个新方法 handleJobB 时,它不能与作业 A 同时处于活动状态,反之亦然.

Secondly, when I add a new method handleJobB, it must not be active at the same time as job A, and vice versa.

所以我的问题是:

  1. 如何将 maxConcurrency 限制为 handleJobA observable 作为实现细节?
  2. 哪种 RxSwift 模式允许对任何服务方法进行限制?

推荐答案

您需要一个专用于该服务的串行调度程序.这是一个可以粘贴到游乐场的示例:

You need a serial Scheduler that is dedicated to that service. Here is an example that can be pasted to a playground:

/// playground

import RxSwift

class Service {

    func handleJobA(input: String) -> Observable<String> {

        return Observable.create { observer in
            print("start job a")
            sleep(3)
            observer.onNext(input)
            print("complete job a")
            observer.onCompleted()
            return Disposables.create()
        }.subscribeOn(scheduler)
    }

    func handleJobB(input: String) -> Observable<String> {
        return Observable.create { observer in
            print("start job b")
            sleep(3)
            observer.onNext(input)
            print("complete job b")
            observer.onCompleted()
            return Disposables.create()
            return Disposables.create()
        }.subscribeOn(scheduler)
    }

    let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "Service")
}


let service = Service()

_ = Observable.from(["hello","world","swift"])
    .flatMap { service.handleJobA(input: $0) }
    .subscribe(onNext:{
        print("result " + $0)
    })

_ = Observable.from(["hello","world","swift"])
    .flatMap { service.handleJobB(input: $0) }
    .subscribe(onNext:{
        print("result " + $0)
    })

import PlaygroundSupport

PlaygroundPage.current.needsIndefiniteExecution = true

这篇关于使用 RxSwift 限制对服务类的并发访问的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆