Why doesn't .async on a concurrent queue in a for loop behave the same as DispatchQueue.concurrentPerform?

Question

import Dispatch

class SynchronizedArray<T> {
    private var array: [T] = []
    private let accessQueue = DispatchQueue(label: "SynchronizedArrayAccess", attributes: .concurrent)
    
    var get: [T] {
        accessQueue.sync {
            array
        }
    }
    
    func append(newElement: T) {
        accessQueue.async(flags: .barrier) {
            self.array.append(newElement)
        }
    }
}

If I run the following code, 10,000 elements are appended to the array as expected even if I am reading concurrently:

DispatchQueue.concurrentPerform(iterations: 10000) { i in
    _ = threadSafeArray.get
    threadSafeArray.append(newElement: i)
}

But when I do this, it never comes close to adding 10,000 elements (it added only 92 elements on my computer the last time I ran it):

let concurrent = DispatchQueue(label: "com.concurrent", attributes: .concurrent)
for i in 0..<10000 {
    concurrent.async {
        _ = threadSafeArray.get
        threadSafeArray.append(newElement: i)
    }
}

Why does the former work, and why doesn't the latter work?

Recommended answer

It's good that you found a solution to the thread explosion. See the discussion of thread explosion in the WWDC 2015 session Building Responsive and Efficient Apps with GCD, and again in the WWDC 2016 session Concurrent Programming With GCD in Swift 3.

That having been said, DispatchSemaphore is a bit of an anti-pattern, nowadays, given the presence of concurrentPerform (or OperationQueue with its maxConcurrentOperationCount or Combine with its maxPublishers). All of these manage degrees of concurrency more elegantly than dispatch semaphores.
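
As a rough sketch of the OperationQueue alternative mentioned above, the following caps concurrency at eight without any semaphore. The Counter class and the runLimitedOperations function are hypothetical helpers introduced only to keep the example self-contained; they are not part of the original question.

```swift
import Foundation

// Hypothetical lock-protected counter, used only for this illustration.
final class Counter: @unchecked Sendable {
    private let lock = NSLock()
    private var value = 0

    func increment() {
        lock.lock()
        value += 1
        lock.unlock()
    }

    var current: Int {
        lock.lock()
        defer { lock.unlock() }
        return value
    }
}

// Hypothetical helper: runs `iterations` operations, at most eight at a time.
func runLimitedOperations(iterations: Int) -> Int {
    let counter = Counter()
    let queue = OperationQueue()
    queue.maxConcurrentOperationCount = 8   // the queue limits concurrency itself

    for _ in 0..<iterations {
        queue.addOperation { counter.increment() }
    }

    // Block until every operation has finished before reading the result.
    queue.waitUntilAllOperationsAreFinished()
    return counter.current
}

print(runLimitedOperations(iterations: 10_000))   // prints 10000
```

Because the queue is drained before the value is read, no separate barrier or group is needed to get a reliable count.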

All of that having been said, a few observations regarding your semaphore pattern:

  1. When using this DispatchSemaphore pattern, you generally put the wait before the concurrent.async { ... } (because, as written, you're getting nine concurrent operations, not eight, which is a bit misleading).

The deeper problem here is that you've reduced the count problem, but it still persists. Consider:

let threadSafeArray = SynchronizedArray<Int>()

let concurrent = DispatchQueue(label: "com.concurrent", attributes: .concurrent)
let semaphore = DispatchSemaphore(value: 8)
for i in 0..<10000 {
    semaphore.wait()
    concurrent.async {
        threadSafeArray.append(newElement: i)
        semaphore.signal()
    }
}

print(threadSafeArray.get.count)

When you leave the for loop, up to eight of the async tasks on concurrent can still be running, and the count (which is not synchronized with respect to the concurrent queue) can still be less than 10,000. You have to add another concurrent.async(flags: .barrier) { ... }, which just adds a second layer of synchronization. For example:

let semaphore = DispatchSemaphore(value: 8)
for i in 0..<10000 {
    semaphore.wait()
    concurrent.async {
        threadSafeArray.append(newElement: i)
        semaphore.signal()
    }
}

concurrent.async(flags: .barrier) {
    print(threadSafeArray.get.count)
}

Or you can use a DispatchGroup, the classical mechanism for determining when a series of asynchronously dispatched blocks finish:

let semaphore = DispatchSemaphore(value: 8)
let group = DispatchGroup()

for i in 0..<10000 {
    semaphore.wait()
    concurrent.async(group: group) {
        threadSafeArray.append(newElement: i)
        semaphore.signal()
    }
}

group.notify(queue: .main) {
    print(threadSafeArray.get.count)
}

Using concurrentPerform eliminates the need for either of these patterns because it won't continue execution until all of the concurrent tasks are done. (It will also automatically optimize the degree of concurrency for the number of cores on your device.)
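
A minimal sketch of that behavior, assuming a pared-down SynchronizedArray like the one in the question. The Sendable annotations and the appendConcurrently helper are additions made here so the sketch also compiles under strict concurrency checking; they are not in the original code.

```swift
import Dispatch

// Pared-down reader-writer wrapper (assumption: only `count` and `append` are needed).
final class SynchronizedArray<T: Sendable>: @unchecked Sendable {
    private var array: [T] = []
    private let accessQueue = DispatchQueue(label: "SynchronizedArrayAccess", attributes: .concurrent)

    // Reads run synchronously and may overlap with other reads.
    var count: Int {
        accessQueue.sync { array.count }
    }

    // Writes are asynchronous barriers, so each one runs exclusively.
    func append(newElement: T) {
        accessQueue.async(flags: .barrier) {
            self.array.append(newElement)
        }
    }
}

// Hypothetical helper: appends `iterations` values in parallel and returns the count.
func appendConcurrently(iterations: Int) -> Int {
    let threadSafeArray = SynchronizedArray<Int>()

    // concurrentPerform does not return until every iteration has run.
    DispatchQueue.concurrentPerform(iterations: iterations) { i in
        threadSafeArray.append(newElement: i)
    }

    // The synchronous read is enqueued behind all of the barrier writes,
    // so it observes every append dispatched above.
    return threadSafeArray.count
}

print(appendConcurrently(iterations: 10_000))   // prints 10000
```

No semaphore, barrier on the outer queue, or dispatch group is involved: the call itself is the synchronization point.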

FWIW, a much better alternative to this SynchronizedArray is to not expose the underlying array at all, and just implement whatever methods you want to expose, integrating the necessary synchronization. It makes for a cleaner call site and solves many issues.

For example, assuming you wanted to expose a subscript operator and a count variable, you would do:

class SynchronizedArray<T> {
    private var array: [T]
    private let accessQueue = DispatchQueue(label: "com.domain.app.reader-writer", attributes: .concurrent)

    init(_ array: [T] = []) {
        self.array = array
    }

    subscript(index: Int) -> T {
        get { reader { $0[index] } }
        set { writer { $0[index] = newValue } }
    }

    var count: Int {
        reader { $0.count }
    }

    func append(newElement: T) {
        writer { $0.append(newElement) }
    }

    func reader<U>(_ block: ([T]) throws -> U) rethrows -> U {
        try accessQueue.sync { try block(array) }
    }

    func writer(_ block: @escaping (inout [T]) -> Void) {
        accessQueue.async(flags: .barrier) { block(&self.array) }
    }
}

This solves a variety of issues. For example, you can now do:

print(threadSafeArray.count) // get the count
print(threadSafeArray[500])  // get the item at index 500

You can now also do things like:

let average = threadSafeArray.reader { array -> Double in
    let sum = array.reduce(0, +)
    return Double(sum) / Double(array.count)
}

But, bottom line, when dealing with collections (or any mutable object), you invariably do not want to expose the mutable object, itself, but rather write your own synchronized methods for common operations (subscripts, count, removeAll, etc.), and possibly also expose the reader/writer interface for those cases where the app developer might need a broader synchronization mechanism.
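
To illustrate, here is one way the extra synchronized operations might look, built on the same reader/writer pair. The first property, the removeAll method, the Sendable annotations, and the demo function are assumptions added for this sketch rather than part of the answer's class.

```swift
import Dispatch

final class SynchronizedArray<T: Sendable>: @unchecked Sendable {
    private var array: [T] = []
    private let accessQueue = DispatchQueue(label: "com.domain.app.reader-writer", attributes: .concurrent)

    // Common operations, each expressed through reader or writer.
    var count: Int { reader { $0.count } }
    var first: T? { reader { $0.first } }

    func append(newElement: T) {
        writer { $0.append(newElement) }
    }

    func removeAll() {
        writer { $0.removeAll() }
    }

    // Broader interface for callers that need several steps under one synchronization.
    func reader<U>(_ block: ([T]) throws -> U) rethrows -> U {
        try accessQueue.sync { try block(array) }
    }

    func writer(_ block: @escaping @Sendable (inout [T]) -> Void) {
        accessQueue.async(flags: .barrier) { block(&self.array) }
    }
}

// Hypothetical usage: returns the counts seen before and after removeAll.
func demo() -> (before: Int, after: Int) {
    let numbers = SynchronizedArray<Int>()
    numbers.append(newElement: 1)
    numbers.append(newElement: 2)
    let before = numbers.count   // the sync read waits for both barrier writes
    numbers.removeAll()
    let after = numbers.count
    return (before, after)
}

print(demo())
```

The underlying array never leaves the class, so every access path goes through accessQueue.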

(FWIW, the changes to this SynchronizedArray apply to both the semaphore and concurrentPerform scenarios; it is just that the semaphore happens to manifest the problem in this case.)

Needless to say, you would generally have more work being done on each thread, too, because, as modest as the context-switching overhead is, it is likely enough here to offset any advantages gained from parallel processing. (But I understand that this was likely just a conceptual demonstration of a problem, not a proposed implementation.) Just an FYI for future readers.
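
One common way to give each thread more work is to have every iteration process a chunk of indices and synchronize only once per chunk. The sketch below is an illustration under that assumption; the chunkedSum function, its parameters, and the use of NSLock are all hypothetical choices, not something from the question.

```swift
import Foundation

// Hypothetical helper: sums 0..<count in parallel, `chunkSize` indices per iteration.
func chunkedSum(count: Int, chunkSize: Int) -> Int {
    let lock = NSLock()
    var total = 0
    let chunks = (count + chunkSize - 1) / chunkSize   // round up to cover the tail

    DispatchQueue.concurrentPerform(iterations: chunks) { chunk in
        let start = chunk * chunkSize
        let end = min(start + chunkSize, count)

        // Do the bulk of the work locally, with no synchronization.
        var localSum = 0
        for i in start..<end {
            localSum += i
        }

        // Synchronize once per chunk rather than once per element.
        lock.lock()
        total += localSum
        lock.unlock()
    }

    return total
}

print(chunkedSum(count: 10_000, chunkSize: 1_000))   // prints 49995000
```

With a chunk size of 1,000, the shared state is touched ten times instead of 10,000 times, which keeps the synchronization cost small relative to the work done in each iteration.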
