takeWhileInclusive的此实现安全吗? [英] Is this implementation of takeWhileInclusive safe?

查看:77
本文介绍了takeWhileInclusive的此实现安全吗?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我发现了以下包含性takeWhile的实现(在这里) >

I found the following implementation of an inclusive takeWhile (found here)

fun <T> Sequence<T>.takeWhileInclusive(pred: (T) -> Boolean): Sequence<T> {
    var shouldContinue = true
    return takeWhile {
        val result = shouldContinue
        shouldContinue = pred(it)
        result
    }
}

问题是我不是100%确信如果在并行序列上使用它是安全的.

The problem is I'm not 100% convinced this is safe if used on a parallel sequence.

我担心的是,我们将依靠shouldContinue变量来知道何时停止,但是我们不会同步它的访问.

My concern is that we'd be relying on the shouldContinue variable to know when to stop, but we're not synchronizing it's access.

有什么见解?

推荐答案

这是我到目前为止所发现的.

Here's what I've figured out so far.

问题尚不清楚.没有并行序列之类的东西,我可能将它们与

The question is unclear. There's no such thing as a parallel sequence I probably got them mixed up with Java's parallel streams. What I meant was a sequence that was consumed concurrently.

@LouisWasserman在注释序列中指出,该序列不是为并行执行而设计的.特别是SequenceBuilder@RestrictSuspension注释.引用自科林式协同回购:

As @LouisWasserman pointed out in the comments sequences are not designed for parallel execution. In particular the SequenceBuilder is annotated with @RestrictSuspension. Citing from Kotlin Coroutine repo:

这意味着在其范围内,lambda的SequenceBuilder扩展均无法调用suspendContinuation或其他常规的暂停函数

It means that no SequenceBuilder extension of lambda in its scope can invoke suspendContinuation or other general suspending function

正如@MarkoTopolnik所说,它们仍然可以像其他任何Object一样在并行程序中使用.

Having said that as @MarkoTopolnik commented they can still be used in a parallel program just like any other Object.

作为示例,这是首次尝试并行使用序列

As an example here's a first attempt of using Sequences in parallel

fun launchProcessor(id: Int, iterator: Iterator<Int>) = launch {
    println("[${Thread.currentThread().name}] Processor #$id received ${iterator.next()}")
}

fun main(args: Array<String>) {
    val s = sequenceOf(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
    runBlocking {
        val iterator = s.iterator()
        repeat(10) { launchProcessor(it, iterator) }
    }
}

此代码显示:

[ForkJoinPool.commonPool-worker-2]处理器#1收到1

[ForkJoinPool.commonPool-worker-2] Processor #1 received 1

[ForkJoinPool.commonPool-worker-1]处理器#0收到0

[ForkJoinPool.commonPool-worker-1] Processor #0 received 0

[ForkJoinPool.commonPool-worker-3]处理器#2收到2

[ForkJoinPool.commonPool-worker-3] Processor #2 received 2

[ForkJoinPool.commonPool-worker-2]处理器#3收到3

[ForkJoinPool.commonPool-worker-2] Processor #3 received 3

[ForkJoinPool.commonPool-worker-1]处理器#4收到3

[ForkJoinPool.commonPool-worker-1] Processor #4 received 3

[ForkJoinPool.commonPool-worker-3]处理器5获得了3

[ForkJoinPool.commonPool-worker-3] Processor #5 received 3

[ForkJoinPool.commonPool-worker-1]处理器#7收到5

[ForkJoinPool.commonPool-worker-1] Processor #7 received 5

[ForkJoinPool.commonPool-worker-2]处理器#6收到4

[ForkJoinPool.commonPool-worker-2] Processor #6 received 4

[ForkJoinPool.commonPool-worker-1]处理器#9收到7

[ForkJoinPool.commonPool-worker-1] Processor #9 received 7

[ForkJoinPool.commonPool-worker-3]处理器#8收到6

[ForkJoinPool.commonPool-worker-3] Processor #8 received 6

当然不是我们想要的.一些数字被消耗了两次.

Which of course is not what we want. As some numbers are consumed twice.

另一方面,如果我们要使用渠道,我们可以这样写:

On the other hand if we were to use channels we could write something like this:

fun produceNumbers() = produce {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    channel.consumeEach {
        println("[${Thread.currentThread().name}] Processor #$id received $it")
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(1000)
    producer.cancel() // cancel producer coroutine and thus kill them all
}

然后输出为:

[ForkJoinPool.commonPool-worker-2]处理器#0收到1

[ForkJoinPool.commonPool-worker-2] Processor #0 received 1

[ForkJoinPool.commonPool-worker-2]处理器#0收到2

[ForkJoinPool.commonPool-worker-2] Processor #0 received 2

[ForkJoinPool.commonPool-worker-1]处理器#1收到3

[ForkJoinPool.commonPool-worker-1] Processor #1 received 3

[ForkJoinPool.commonPool-worker-2]处理器2获得了4

[ForkJoinPool.commonPool-worker-2] Processor #2 received 4

[ForkJoinPool.commonPool-worker-1]处理器#3收到5

[ForkJoinPool.commonPool-worker-1] Processor #3 received 5

[ForkJoinPool.commonPool-worker-2]处理器#4收到6

[ForkJoinPool.commonPool-worker-2] Processor #4 received 6

[ForkJoinPool.commonPool-worker-2]处理器#0收到7

[ForkJoinPool.commonPool-worker-2] Processor #0 received 7

[ForkJoinPool.commonPool-worker-1]处理器1获得了8

[ForkJoinPool.commonPool-worker-1] Processor #1 received 8

[ForkJoinPool.commonPool-worker-1] 2号处理器收到9

[ForkJoinPool.commonPool-worker-1] Processor #2 received 9

[ForkJoinPool.commonPool-worker-2]处理器#3收到10

[ForkJoinPool.commonPool-worker-2] Processor #3 received 10

此外,我们可以为这样的渠道实施takeWhileInclusive方法:

Furthermore we could implement the takeWhileInclusive method for channels like this:

fun <E> ReceiveChannel<E>.takeWhileInclusive(
        context: CoroutineContext = Unconfined,
        predicate: suspend (E) -> Boolean
): ReceiveChannel<E> = produce(context) {
    var shouldContinue = true
    consumeEach {
        val currentShouldContinue = shouldContinue
        shouldContinue = predicate(it)
        if (!currentShouldContinue) return@produce
        send(it)
    }
}

它可以按预期工作.

这篇关于takeWhileInclusive的此实现安全吗?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆