How to implement parallel processing of line-based file content

Problem description

I'm writing a POC to process a very large text file (~1 billion+ lines) and am experimenting with Go for this:

package main

import (
        "bufio"
        "fmt"
        "log"
        "os"
        "time"
)

func main() {
        start := time.Now()
        file, err := os.Open("dump10.txt")
        if err != nil {
                log.Fatal(err)
        }
        defer file.Close()

        scanner := bufio.NewScanner(file)
        for scanner.Scan() {
                go fmt.Println(scanner.Text())
        }

        if err := scanner.Err(); err != nil {
                log.Fatal(err)
        }
        secs := time.Since(start).Seconds()
        fmt.Printf("Took %.2fs", secs)
}

However, when running this I get this error:

panic: too many concurrent operations on a single file or socket (max 1048575)

I haven't found anything online that deals with this specific error. I'm not sure whether it's a file descriptor issue; the maximum listed in the error is much higher than my ulimit -n limit of 500,000.

What is the best way to do this?

In case it's not obvious, fmt.Println is a stand-in for the actual function I will call when processing the data.

Recommended answer

Before deciding to parallelize a process, study your input and your computations to make sure that parallelization makes sense.

Input that must be processed in order is a poor match, because parallel processing then needs additional, complex logic to keep the results in order, and it is difficult to evaluate upfront whether this strategy will pay off.

Also, to take advantage of parallelization, each computation must take longer than the time required to synchronize the parallel tasks. It is possible to outweigh this cost by bulking (batching) the data, but the resulting algorithm is more complex and creates additional undesired side effects (such as extra allocations).

Otherwise, don't parallelize.

See below examples of various implementations, each run with a short and a long per-line computation, and their resulting benchmarks.

The conclusion is that unless each task runs long enough to clearly outweigh the synchronization costs, sequential processing is faster.

main.go

package main

import (
    "bufio"
    "fmt"
    "io"
    "runtime"
    "strings"
    "sync"
    "time"
)

func main() {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    run_line_short(data, true)
    run_line_long(data, true)
    run_line_short_workers(data, true)
    run_line_long_workers(data, true)
    run_bulk_short(data, true)
    run_bulk_long(data, true)
    run_seq_short(data, true)
    run_seq_long(data, true)
}

func run_line_short(data string, stat bool) {
    if stat {
        s := stats("run_line_short")
        defer s()
    }
    r := strings.NewReader(data)
    err := process(r, line_handler_short)
    if err != nil {
        panic(err)
    }
}
func run_line_long(data string, stat bool) {
    if stat {
        s := stats("run_line_long")
        defer s()
    }
    r := strings.NewReader(data)
    err := process(r, line_handler_long)
    if err != nil {
        panic(err)
    }
}
func run_line_short_workers(data string, stat bool) {
    if stat {
        s := stats("run_line_short_workers")
        defer s()
    }
    r := strings.NewReader(data)
    err := processWorkers(r, line_handler_short)
    if err != nil {
        panic(err)
    }
}
func run_line_long_workers(data string, stat bool) {
    if stat {
        s := stats("run_line_long_workers")
        defer s()
    }
    r := strings.NewReader(data)
    err := processWorkers(r, line_handler_long)
    if err != nil {
        panic(err)
    }
}
func run_bulk_short(data string, stat bool) {
    if stat {
        s := stats("run_bulk_short")
        defer s()
    }
    r := strings.NewReader(data)
    err := processBulk(r, bulk_handler_short)
    if err != nil {
        panic(err)
    }
}
func run_bulk_long(data string, stat bool) {
    if stat {
        s := stats("run_bulk_long")
        defer s()
    }
    r := strings.NewReader(data)
    err := processBulk(r, bulk_handler_long)
    if err != nil {
        panic(err)
    }
}
func run_seq_short(data string, stat bool) {
    if stat {
        s := stats("run_seq_short")
        defer s()
    }
    r := strings.NewReader(data)
    err := processSeq(r, line_handler_short)
    if err != nil {
        panic(err)
    }
}
func run_seq_long(data string, stat bool) {
    if stat {
        s := stats("run_seq_long")
        defer s()
    }
    r := strings.NewReader(data)
    err := processSeq(r, line_handler_long)
    if err != nil {
        panic(err)
    }
}

func line_handler_short(k string) error {
    _ = len(k)
    return nil
}

func line_handler_long(k string) error {
    <-time.After(time.Millisecond * 5)
    _ = len(k)
    return nil
}

func bulk_handler_short(b []string) error {
    for _, k := range b {
        _ = len(k)
    }
    return nil
}

func bulk_handler_long(b []string) error {
    <-time.After(time.Millisecond * 5)
    for _, k := range b {
        _ = len(k)
    }
    return nil
}

func stats(name string) func() {
    fmt.Printf("======================\n")
    fmt.Printf("%v\n", name)
    start := time.Now()
    return func() {
        fmt.Printf("time to run %v\n", time.Since(start))
        var ms runtime.MemStats
        runtime.ReadMemStats(&ms)
        fmt.Printf("Alloc: %d MB, TotalAlloc: %d MB, Sys: %d MB\n",
            ms.Alloc/1024/1024, ms.TotalAlloc/1024/1024, ms.Sys/1024/1024)
        fmt.Printf("Mallocs: %d, Frees: %d\n",
            ms.Mallocs, ms.Frees)
        fmt.Printf("HeapAlloc: %d MB, HeapSys: %d MB, HeapIdle: %d MB\n",
            ms.HeapAlloc/1024/1024, ms.HeapSys/1024/1024, ms.HeapIdle/1024/1024)
        fmt.Printf("HeapObjects: %d\n", ms.HeapObjects)
        fmt.Printf("\n")
    }
}

func process(r io.Reader, h func(string) error) error {
    errs := make(chan error)
    workers := make(chan struct{}, 4)
    var wg sync.WaitGroup
    go func() {
        scanner := bufio.NewScanner(r)
        for scanner.Scan() {
            workers <- struct{}{} // acquire a token
            wg.Add(1)
            go func(line string) {
                defer wg.Done()
                if err := h(line); err != nil {
                    errs <- err
                }
                <-workers
            }(scanner.Text())
        }
        wg.Wait()
        if err := scanner.Err(); err != nil {
            errs <- err
        }
        close(errs)
    }()
    var err error
    for e := range errs {
        if e != nil && err == nil {
            err = e
        }
    }
    return err
}

func processWorkers(r io.Reader, h func(string) error) error {
    errs := make(chan error)
    input := make(chan string)
    y := 4
    var wg sync.WaitGroup
    for i := 0; i < y; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for line := range input {
                if err := h(line); err != nil {
                    errs <- err
                }
            }
        }()
    }
    go func() {
        scanner := bufio.NewScanner(r)
        for scanner.Scan() {
            input <- scanner.Text()
        }
        close(input)
        wg.Wait()
        if err := scanner.Err(); err != nil {
            errs <- err
        }
        close(errs)
    }()
    var err error
    for e := range errs {
        if err == nil && e != nil {
            err = e
        }
    }
    return err
}

func processBulk(r io.Reader, h func([]string) error) error {
    errs := make(chan error)
    input := make(chan []string)
    y := 4
    var wg sync.WaitGroup
    for i := 0; i < y; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for bulk := range input {
                if err := h(bulk); err != nil {
                    errs <- err
                }
            }
        }()
    }
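    go func() {
        scanner := bufio.NewScanner(r)
        l := 50
        bulk := make([]string, l)
        i := 0
        for scanner.Scan() {
            bulk[i] = scanner.Text()
            i++
            if i == l {
                // send a copy so that bulk can be reused for the next batch
                copied := make([]string, l)
                copy(copied, bulk)
                i = 0
                input <- copied
            }
        }
        if i > 0 {
            input <- bulk[:i]
        }
        close(input)
        // wait for the workers before reporting the scan error and closing errs;
        // closing errs from a separate goroutine could race with the send below
        // and panic with "send on closed channel"
        wg.Wait()
        if err := scanner.Err(); err != nil {
            errs <- err
        }
        close(errs)
    }()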
    var err error
    for e := range errs {
        if err == nil && e != nil {
            err = e
        }
    }
    return err
}

func processSeq(r io.Reader, h func(string) error) error {
    scanner := bufio.NewScanner(r)
    for scanner.Scan() {
        text := scanner.Text()
        if err := h(text); err != nil {
            return err
        }
    }
    return scanner.Err()
}

main_test.go

package main

import (
    "strings"
    "testing"
)

func Benchmark_run_line_short(b *testing.B) {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    for i := 0; i < b.N; i++ {
        run_line_short(data, false)
    }
}

func Benchmark_run_line_long(b *testing.B) {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    for i := 0; i < b.N; i++ {
        run_line_long(data, false)
    }
}
func Benchmark_run_line_short_workers(b *testing.B) {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    for i := 0; i < b.N; i++ {
        run_line_short_workers(data, false)
    }
}
func Benchmark_run_line_long_workers(b *testing.B) {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    for i := 0; i < b.N; i++ {
        run_line_long_workers(data, false)
    }
}
func Benchmark_run_bulk_short(b *testing.B) {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    for i := 0; i < b.N; i++ {
        run_bulk_short(data, false)
    }
}
func Benchmark_run_bulk_long(b *testing.B) {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    for i := 0; i < b.N; i++ {
        run_bulk_long(data, false)
    }
}
func Benchmark_run_seq_short(b *testing.B) {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    for i := 0; i < b.N; i++ {
        run_seq_short(data, false)
    }
}
func Benchmark_run_seq_long(b *testing.B) {
    data := strings.Repeat(strings.Repeat("a", 1000)+"\n", 1000)
    for i := 0; i < b.N; i++ {
        run_seq_long(data, false)
    }
}

Results

$ go run main.go 
======================
run_line_short
time to run 2.747827ms
Alloc: 2 MB, TotalAlloc: 2 MB, Sys: 68 MB
Mallocs: 1378, Frees: 1
HeapAlloc: 2 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 1377

======================
run_line_long
time to run 1.30987804s
Alloc: 3 MB, TotalAlloc: 3 MB, Sys: 68 MB
Mallocs: 5619, Frees: 5
HeapAlloc: 3 MB, HeapSys: 63 MB, HeapIdle: 59 MB
HeapObjects: 5614

======================
run_line_short_workers
time to run 4.54926ms
Alloc: 1 MB, TotalAlloc: 4 MB, Sys: 68 MB
Mallocs: 6648, Frees: 5743
HeapAlloc: 1 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 905

======================
run_line_long_workers
time to run 1.29874118s
Alloc: 2 MB, TotalAlloc: 5 MB, Sys: 68 MB
Mallocs: 10670, Frees: 5747
HeapAlloc: 2 MB, HeapSys: 63 MB, HeapIdle: 60 MB
HeapObjects: 4923

======================
run_bulk_short
time to run 1.279059ms
Alloc: 3 MB, TotalAlloc: 6 MB, Sys: 68 MB
Mallocs: 11695, Frees: 5751
HeapAlloc: 3 MB, HeapSys: 63 MB, HeapIdle: 59 MB
HeapObjects: 5944

======================
run_bulk_long
time to run 31.328652ms
Alloc: 1 MB, TotalAlloc: 7 MB, Sys: 68 MB
Mallocs: 12728, Frees: 11361
HeapAlloc: 1 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 1367

======================
run_seq_short
time to run 956.991µs
Alloc: 3 MB, TotalAlloc: 8 MB, Sys: 68 MB
Mallocs: 13746, Frees: 11160
HeapAlloc: 3 MB, HeapSys: 63 MB, HeapIdle: 59 MB
HeapObjects: 2586

======================
run_seq_long
time to run 5.195705859s
Alloc: 1 MB, TotalAlloc: 9 MB, Sys: 68 MB
Mallocs: 17766, Frees: 15973
HeapAlloc: 1 MB, HeapSys: 63 MB, HeapIdle: 61 MB
HeapObjects: 1793

[mh-cbon@Host-001 bulk] $ go test -bench=. -benchmem -count=4
goos: linux
goarch: amd64
pkg: test/bulk
Benchmark_run_line_short-4                  1000       1750824 ns/op     1029354 B/op       1005 allocs/op
Benchmark_run_line_short-4                  1000       1747408 ns/op     1029348 B/op       1005 allocs/op
Benchmark_run_line_short-4                  1000       1757826 ns/op     1029352 B/op       1005 allocs/op
Benchmark_run_line_short-4                  1000       1758427 ns/op     1029352 B/op       1005 allocs/op
Benchmark_run_line_long-4                      1    1303037704 ns/op     2253776 B/op       4075 allocs/op
Benchmark_run_line_long-4                      1    1305074974 ns/op     2247792 B/op       4032 allocs/op
Benchmark_run_line_long-4                      1    1305353658 ns/op     2246320 B/op       4013 allocs/op
Benchmark_run_line_long-4                      1    1305725817 ns/op     2247792 B/op       4031 allocs/op
Benchmark_run_line_short_workers-4          1000       2148354 ns/op     1029366 B/op       1005 allocs/op
Benchmark_run_line_short_workers-4          1000       2139629 ns/op     1029370 B/op       1005 allocs/op
Benchmark_run_line_short_workers-4          1000       1983352 ns/op     1029359 B/op       1005 allocs/op
Benchmark_run_line_short_workers-4          1000       1909968 ns/op     1029363 B/op       1005 allocs/op
Benchmark_run_line_long_workers-4              1    1298321093 ns/op     2247856 B/op       4013 allocs/op
Benchmark_run_line_long_workers-4              1    1299846127 ns/op     2246384 B/op       4012 allocs/op
Benchmark_run_line_long_workers-4              1    1300003625 ns/op     2246288 B/op       4011 allocs/op
Benchmark_run_line_long_workers-4              1    1302779911 ns/op     2246256 B/op       4011 allocs/op
Benchmark_run_bulk_short-4                  2000        704358 ns/op     1082154 B/op       1011 allocs/op
Benchmark_run_bulk_short-4                  2000        708563 ns/op     1082147 B/op       1011 allocs/op
Benchmark_run_bulk_short-4                  2000        714687 ns/op     1082148 B/op       1011 allocs/op
Benchmark_run_bulk_short-4                  2000        705546 ns/op     1082156 B/op       1011 allocs/op
Benchmark_run_bulk_long-4                     50      31411412 ns/op     1051497 B/op       1088 allocs/op
Benchmark_run_bulk_long-4                     50      31513018 ns/op     1051544 B/op       1088 allocs/op
Benchmark_run_bulk_long-4                     50      31539311 ns/op     1051502 B/op       1088 allocs/op
Benchmark_run_bulk_long-4                     50      31564940 ns/op     1051505 B/op       1088 allocs/op
Benchmark_run_seq_short-4                   2000        574346 ns/op     1028632 B/op       1002 allocs/op
Benchmark_run_seq_short-4                   3000        572857 ns/op     1028464 B/op       1002 allocs/op
Benchmark_run_seq_short-4                   2000        580493 ns/op     1028632 B/op       1002 allocs/op
Benchmark_run_seq_short-4                   3000        572240 ns/op     1028464 B/op       1002 allocs/op
Benchmark_run_seq_long-4                       1    5196313302 ns/op     2245792 B/op       4005 allocs/op
Benchmark_run_seq_long-4                       1    5199995649 ns/op     2245792 B/op       4005 allocs/op
Benchmark_run_seq_long-4                       1    5200460425 ns/op     2245792 B/op       4005 allocs/op
Benchmark_run_seq_long-4                       1    5201080570 ns/op     2245792 B/op       4005 allocs/op
PASS
ok      test/bulk   68.944s

Note: to my surprise, run_line_short_workers is slightly slower than run_line_short. I can't explain that result, but a deeper analysis using pprof should provide the answer.
