并行执行与顺序执行之间的闭合不一致 [英] Closure inconsistency between parallel and sequential execution

查看:41
本文介绍了并行执行与顺序执行之间的闭合不一致的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我试图编写一个可以并行或顺序执行功能的通用功能.在测试它时,我发现了一些关于闭包的非常意外的行为.在下面的代码中,我定义了一个不接受任何参数并返回错误的函数列表.这些函数还在闭包中使用了for循环变量,但是我正在使用在循环内定义新变量的技巧,以试图避免捕获.

I have attempted to write a generic function that can execute functions in parallel or sequentially. While testing it, I have found some very unexpected behavior regarding closures. In the code below, I define a list of functions that accept no parameters and return an error. The functions also use a for loop variable in a closure but I'm using the trick of defining a new variable within the loop in an attempt to avoid capture.

我期望可以顺序或并发调用这些函数,但效果相同,但是结果却有所不同.好像正在捕获闭包变量,但仅在同时运行时才被捕获.

I'm expecting that I can call these functions sequentially or concurrently with the same effect but I'm seeing different results. It's as if the closure variable is being captured but only when run concurrently.

据我所知,这不是捕获循环变量的通常情况.如前所述,我正在循环中定义一个新变量.另外,我不在循环中运行闭包函数.我在循环内生成函数列表,但在循环后执行函数.

As far as I can tell, this is not the usual case of capturing a loop variable. As I mentioned, I'm defining a new variable within the loop. Also, I'm not running the closure function within the loop. I'm generating a list of functions within the loop but I'm executing the functions after the loop.

我正在使用go版本go1.8.3 linux/amd64.

I'm using go version go1.8.3 linux/amd64.

package closure_test

import (
    "sync"
    "testing"
)

// MergeErrors merges multiple channels of errors.
// Based on https://blog.golang.org/pipelines.
func MergeErrors(cs ...<-chan error) <-chan error {
    var wg sync.WaitGroup
    out := make(chan error)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed, then calls wg.Done.
    output := func(c <-chan error) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Start a goroutine to close out once all the output goroutines are
    // done.  This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

// WaitForPipeline waits for results from all error channels.
// It returns early on the first error.
func WaitForPipeline(errs ...<-chan error) error {
    errc := MergeErrors(errs...)
    for err := range errc {
        if err != nil {
            return err
        }
    }
    return nil
}

func RunInParallel(funcs ...func() error) error {
    var errcList [](<-chan error)
    for _, f := range funcs {
        errc := make(chan error, 1)
        errcList = append(errcList, errc)
        go func() {
            err := f()
            if err != nil {
                errc <- err
            }
            close(errc)
        }()
    }
    return WaitForPipeline(errcList...)
}

func RunSequentially(funcs ...func() error) error {
    for _, f := range funcs {
        err := f()
        if err != nil {
            return err
        }
    }
    return nil
}

func validateOutputChannel(t *testing.T, out chan int, n int) {
    m := map[int]bool{}
    for i := 0; i < n; i++ {
        m[<-out] = true
    }
    if len(m) != n {
        t.Errorf("Output channel has %v unique items; wanted %v", len(m), n)
    }
}

// This fails because j is being captured.
func TestClosure1sp(t *testing.T) {
    n := 4
    out := make(chan int, n*2)
    var funcs [](func() error)
    for i := 0; i < n; i++ {
        j := i // define a new variable that has scope only inside the current loop iteration
        t.Logf("outer i=%v, j=%v", i, j)
        f := func() error {
            t.Logf("inner i=%v, j=%v", i, j)
            out <- j
            return nil
        }
        funcs = append(funcs, f)
    }
    t.Logf("Running funcs sequentially")
    if err := RunSequentially(funcs...); err != nil {
        t.Fatal(err)
    }
    validateOutputChannel(t, out, n)
    t.Logf("Running funcs in parallel")
    if err := RunInParallel(funcs...); err != nil {
        t.Fatal(err)
    }
    close(out)
    validateOutputChannel(t, out, n)
}

下面是上面测试功能的输出.

Below is the output from the test function above.

closure_test.go:91: outer i=0, j=0
closure_test.go:91: outer i=1, j=1
closure_test.go:91: outer i=2, j=2
closure_test.go:91: outer i=3, j=3
closure_test.go:99: Running funcs sequentially
closure_test.go:93: inner i=4, j=0
closure_test.go:93: inner i=4, j=1
closure_test.go:93: inner i=4, j=2
closure_test.go:93: inner i=4, j=3
closure_test.go:104: Running funcs in parallel
closure_test.go:93: inner i=4, j=3
closure_test.go:93: inner i=4, j=3
closure_test.go:93: inner i=4, j=3
closure_test.go:93: inner i=4, j=3
closure_test.go:80: Output channel has 1 unique items; wanted 4

有什么想法吗?这是Go中的错误吗?

Any ideas? Is this a bug in Go?

推荐答案

始终使用-race运行测试.在您的情况下,您忘记了在 RunInParallel 中的每次迭代中重新创建 f :

Always run your tests with -race. In your case, you forgot to recreate f on each iteration in RunInParallel:

func RunInParallel(funcs ...func() error) error {
    var errcList [](<-chan error)
    for _, f := range funcs {

        f := f // << HERE

        errc := make(chan error, 1)
        errcList = append(errcList, errc)
        go func() {
            err := f()
            if err != nil {
                errc <- err
            }
            close(errc)
        }()
    }
    return WaitForPipeline(errcList...)
}

因此,您总是启动了最后一个 f 而不是每个.

As a result, you always launched the last f instead of each one.

这篇关于并行执行与顺序执行之间的闭合不一致的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆