Parallelism Problem on Cloud Dataflow using Go SDK

Question

I have an Apache Beam pipeline implemented with the Go SDK, shown below. The pipeline has three steps: textio.Read, CountLines, and ProcessLines. The ProcessLines step takes around 10 seconds per line; for brevity, I replaced the real work with a Sleep call.

I am running the pipeline with 20 workers. I expected the 20 workers to run in parallel: textio.Read would read 20 lines from the file, and ProcessLines would run 20 executions in parallel, finishing them in 10 seconds. However, the pipeline does not work like that. Currently, textio.Read reads one line from the file, pushes it to the next step, and waits until ProcessLines finishes its 10 seconds of work. There is no parallelism, and only one line from the file is in flight in the pipeline at any time. Could you clarify what I am doing wrong? How should I update the code to achieve the parallelism described above?

package main

import (
    "context"
    "flag"
    "time"

    "github.com/apache/beam/sdks/go/pkg/beam"
    "github.com/apache/beam/sdks/go/pkg/beam/io/textio"
    "github.com/apache/beam/sdks/go/pkg/beam/log"
    "github.com/apache/beam/sdks/go/pkg/beam/x/beamx"
)

// metrics to be monitored
var (
    input         = flag.String("input", "", "Input file (required).")
    numberOfLines = beam.NewCounter("extract", "numberOfLines")
    lineLen       = beam.NewDistribution("extract", "lineLenDistro")
)

func countLines(ctx context.Context, line string) string {
    lineLen.Update(ctx, int64(len(line)))
    numberOfLines.Inc(ctx, 1)

    return line
}

func processLines(ctx context.Context, line string) {
    time.Sleep(10 * time.Second)
}

func CountLines(s beam.Scope, lines beam.PCollection) beam.PCollection {
    s = s.Scope("Count Lines")

    return beam.ParDo(s, countLines, lines)
}

func ProcessLines(s beam.Scope, lines beam.PCollection) {
    s = s.Scope("Process Lines")

    beam.ParDo0(s, processLines, lines)
}

func main() {
    // If beamx or Go flags are used, flags must be parsed first.
    flag.Parse()
    // beam.Init() is an initialization hook that must be called on startup. On
    // distributed runners, it is used to intercept control.
    beam.Init()

    // Input validation is done as usual. Note that it must be after Init().
    if *input == "" {
        log.Fatal(context.Background(), "No input file provided")
    }

    p := beam.NewPipeline()
    s := p.Root()

    l := textio.Read(s, *input)
    lines := CountLines(s, l)
    ProcessLines(s, lines)

    // Concept #1: The beamx.Run convenience wrapper allows a number of
    // pre-defined runners to be used via the --runner flag.
    if err := beamx.Run(context.Background(), p); err != nil {
        log.Fatalf(context.Background(), "Failed to execute job: %v", err.Error())
    }
}

After I got an answer suggesting the problem might be caused by fusion, I changed the relevant part of the code, but it still did not work.

Now the first and second steps run in parallel, but the third step, ProcessLines, does not. I only made the following changes. Can anyone tell me what the problem is?

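// AddRandomKey pairs every element with a random key so that a following
// GroupByKey redistributes the elements. Requires "math/rand" in the imports.
func AddRandomKey(s beam.Scope, col beam.PCollection) beam.PCollection {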
    return beam.ParDo(s, addRandomKeyFn, col)
}

func addRandomKeyFn(elm beam.T) (int, beam.T) {
    return rand.Int(), elm
}

func countLines(ctx context.Context, _ int, lines func(*string) bool, emit func(string)) {
    var line string
    for lines(&line) {
        lineLen.Update(ctx, int64(len(line)))
        numberOfLines.Inc(ctx, 1)
        emit(line)
    }
}
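
// numberOfLinesProcess is not declared in the snippet shown; it is presumably a
// counter created with beam.NewCounter, like numberOfLines above.
func processLines(ctx context.Context, _ int, lines func(*string) bool) {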
    var line string
    for lines(&line) {
        time.Sleep(10 * time.Second)
        numberOfLinesProcess.Inc(ctx, 1)
    }
}

func CountLines(s beam.Scope, lines beam.PCollection) beam.PCollection {
    s = s.Scope("Count Lines")
    keyed := AddRandomKey(s, lines)
    grouped := beam.GroupByKey(s, keyed)

    return beam.ParDo(s, countLines, grouped)
}

func ProcessLines(s beam.Scope, lines beam.PCollection) {
    s = s.Scope("Process Lines")
    keyed := AddRandomKey(s, lines)
    grouped := beam.GroupByKey(s, keyed)

    beam.ParDo0(s, processLines, grouped)
}

Recommended answer

Many advanced runners of MapReduce-type pipelines fuse stages that can be run in memory together. Apache Beam and Dataflow are no exception.

What's happening here is that the three steps of your pipeline are fused and run on the same machine. Furthermore, the Go SDK unfortunately does not currently support splitting the Read transform.

To achieve parallelism in the third transform, you can break the fusion between Read and ProcessLines. You can do that by adding a random key to your lines, followed by a GroupByKey transform.

In Python, it would be:

(p | beam.io.ReadFromText(...)
   | CountLines()
   | beam.Map(lambda x: (random.randint(0, 1000), x))  # needs `import random`
   | beam.GroupByKey()
   | beam.FlatMap(lambda kv: kv[1])  # Discard the key, and return the values
   | ProcessLines())
This will allow you to parallelize ProcessLines.