Golang concurrency, processing batches of items


Question

I am writing a program to process millions of lines from a text file. Validating a file of 500k lines was taking 5 seconds, and I want to speed this up.

I want to loop over the items and process x of them asynchronously, then wait for the responses to see whether I should continue.

I have written some dummy code, but I am not sure whether it makes much sense. It seems rather complicated. Is there a simpler, more elegant way of doing this?

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    // Need an object to loop over
    // need a loop to read the response
    items := 100000
    concurrency := 20
    sem := make(chan bool, concurrency)
    returnChan := make(chan error)
    finChan := make(chan bool)

    var wg sync.WaitGroup

    go func() {
        for x := 0; x < items; x++ {
            // loop over all items
            // only do maxitems at a time
            wg.Add(1)
            sem <- true
            go delayFunc(x, sem, returnChan, &wg)
        }
        wg.Wait()
        finChan <- true
    }()

    var err error
    finished := false
    for {
        select {
        case err = <-returnChan:
            if err != nil {
                break
            }
        case _ = <-finChan:
            finished = true
            break
        default:
            continue
        }

        if err != nil || finished == true {
            break
        }
    }
    fmt.Println(err)
}

func delayFunc(x int, sem chan bool, returnChan chan error, wg *sync.WaitGroup) {
    //fmt.Printf("PROCESSING (%v)\n", x)
    time.Sleep(10 * time.Millisecond)
    <-sem // release the lock
    wg.Done()
    if x == 95000 {
        returnChan <- fmt.Errorf("Something not right")
    } else {
        returnChan <- nil
    }
}


Recommended answer

Your code looks fine; it implements a pattern that is commonly used in Go. The downside is that you spawn a goroutine for every item. Spawning a goroutine is cheap, but it isn't free. Another approach is to spawn N workers and feed them items through a channel. Something like this:

package main

import (
    "fmt"
    "time"
)

func main() {
    items := 100
    concurrency := 10
    in := make(chan int)
    ret := make(chan error)

    for x := 0; x < concurrency; x++ {
        go worker(in, ret)
    }
    go func() {
        for x := 0; x < items; x++ {
            // loop over all items
            in <- x
        }
        close(in)
    }()
    for err := range ret {
        if err != nil {
            fmt.Println(err.Error())
            break
        }
    }
}
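
// worker validates items received on in and reports one result per item
// on returnChan (nil means the item was fine).
func worker(in chan int, returnChan chan error) {
    for x := range in {
        //fmt.Printf("PROCESSING (%v)\n", x)
        if x == 95 {
            returnChan <- fmt.Errorf("Something not right")
        } else {
            returnChan <- nil
        }
        time.Sleep(10 * time.Millisecond) // simulate work
    }
    // in is closed and drained: send a sentinel error so that the
    // range loop in main stops instead of blocking forever.
    returnChan <- fmt.Errorf("The End")
}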

