How to activate the Dataflow Shuffle service?

Question

I am trying to use the Dataflow Shuffle service from the Python SDK, but the Shuffle service does not seem to be used, as you can see below.

I set the SDK version to 2.1 or above, and the region is us-central1. I thought the Dataflow Shuffle service could be activated just by adding the experiments option. Am I missing something?

The following code, which I tested, reproduces the behavior:

import apache_beam as beam

options = beam.options.pipeline_options.PipelineOptions()
gcloud_options = options.view_as(
    beam.options.pipeline_options.GoogleCloudOptions)
gcloud_options.job_name = 'dataflow-shuffle-test'
gcloud_options.project = 'PROJECTID'
gcloud_options.staging_location = 'gs://BUCKET/staging'
gcloud_options.temp_location = 'gs://BUCKET/temp'

# maybe this is the wrong way?
debug_options = options.view_as(beam.options.pipeline_options.DebugOptions)
debug_options.experiments = 'shuffle_mode=service'

worker_options = options.view_as(beam.options.pipeline_options.WorkerOptions)
worker_options.disk_size_gb = 20
worker_options.max_num_workers = 2

options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'

def modify_data2(kvpair):
    return {'name': kvpair[0],
            'sum': sum(kvpair[1])
            }


p = beam.Pipeline(options=options)

query = 'SELECT * FROM [bigquery-public-data:usa_names.usa_1910_current]'
(p | 'read' >> beam.io.Read(beam.io.BigQuerySource(project='PROJECTID', 
                                                   use_standard_sql=False, 
                                                   query=query))
   | 'pair' >> beam.Map(lambda x: (x['name'], x['number']))
   | "groupby" >> beam.GroupByKey()
   | 'modify' >> beam.Map(modify_data2)
   | 'write' >> beam.io.WriteToText('gs://BUCKET/test.txt', num_shards=1)
 )

p.run()

The job finishes successfully, without any errors. Any comment would be helpful!

EDIT
Thanks to Sergei's answer, I found my mistake: I had set the experiments option as a string. Set it as a list instead, as below.

# set as list, instead of string.
debug_options.experiments = ['shuffle_mode=service']
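Why does the string version fail silently? One plausible explanation, assuming the runner copies each element of `experiments` into the job request one at a time (an assumption about runner internals, not confirmed in this thread), is that iterating over a Python string yields its individual characters. The hypothetical helper `forward_experiments` below mimics that forwarding step in plain Python:

```python
def forward_experiments(experiments):
    """Hypothetical stand-in for a runner that forwards each experiment
    to the service one element at a time (assumed behavior)."""
    forwarded = []
    for experiment in experiments or []:
        forwarded.append(experiment)
    return forwarded


# A string is iterated character by character, so the service would see
# 's', 'h', 'u', ... instead of one 'shuffle_mode=service' entry.
print(forward_experiments('shuffle_mode=service')[:3])  # ['s', 'h', 'u']

# A list is iterated element by element, so the flag stays intact.
print(forward_experiments(['shuffle_mode=service']))  # ['shuffle_mode=service']
```

Because no single character matches a known experiment, nothing fails loudly; the job simply runs without the Shuffle service.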

I also made a gist that runs a simple pipeline with the Shuffle service; the notebook can be run on Datalab. https://gist.github.com/hayatoy/f6664f965a2519ec406e11235faf75b6

Answer

@HayatoY, it should be enough to just specify the experiments flag (--experiments shuffle_mode=service).
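Passing the flag on the command line avoids the string-versus-list pitfall, because the option parser collects the flag's values into a list. The sketch below reproduces that with the standard library's `argparse`; that Beam declares `--experiments` with `action='append'` is an assumption here, and this parser is a stand-in, not Beam's actual `DebugOptions`. The name `another_experiment` is a hypothetical placeholder.

```python
import argparse

# Stand-in parser: assumes --experiments uses action='append', so each
# occurrence of the flag adds one entry to a list.
parser = argparse.ArgumentParser()
parser.add_argument('--experiments', action='append', default=None)

args = parser.parse_args(['--experiments', 'shuffle_mode=service'])
print(args.experiments)  # ['shuffle_mode=service']

# Repeating the flag enables several experiments at once.
args = parser.parse_args(['--experiments', 'shuffle_mode=service',
                          '--experiments', 'another_experiment'])
print(args.experiments)  # ['shuffle_mode=service', 'another_experiment']
```

In a Beam script, the same flag list can typically be handed to `PipelineOptions(['--experiments', 'shuffle_mode=service'])`, which leaves the parsing to Beam instead of assigning the attribute by hand.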

The Dataflow Shuffle service is available with the Python SDK starting with version 2.1, in the us-central1 and europe-west1 regions.
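The availability rule above can be expressed as a small pre-flight check. This is a sketch only: `shuffle_service_available` and `parse_version` are hypothetical helpers, and the region set and minimum version encode what this answer states, which may have changed since it was written. The installed SDK version can usually be read from `apache_beam.__version__`.

```python
# Availability as stated in this answer; may be out of date.
SHUFFLE_REGIONS = {'us-central1', 'europe-west1'}
MIN_SDK_VERSION = (2, 1)


def parse_version(version):
    """Turn a plain dotted release string like '2.1.0' into (2, 1, 0).
    Assumes no suffixes such as '.dev0'."""
    return tuple(int(part) for part in version.split('.'))


def shuffle_service_available(sdk_version, region):
    """Hypothetical helper: True if the SDK version and region match
    the availability described in the answer."""
    return (parse_version(sdk_version) >= MIN_SDK_VERSION
            and region in SHUFFLE_REGIONS)


print(shuffle_service_available('2.1.0', 'us-central1'))  # True
print(shuffle_service_available('2.0.0', 'us-central1'))  # False
print(shuffle_service_available('2.1.0', 'asia-east1'))   # False
```

Comparing integer tuples rather than raw strings keeps versions such as 2.10 ordered correctly relative to 2.9.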

Can you check whether an experiments line appears under the "Pipeline options" pane on the Job Details page in the UI? (see my screenshot)

I just launched a simple wordcount pipeline from the command line and verified that it used Shuffle (the metric is 0, but that's normal because the wordcount pipeline shuffles very little data). As long as the metric shows a value rather than "-", you have proof that the Shuffle service was used.

python -m apache_beam.examples.wordcount \
  --project $PROJECT_ID \
  --runner DataflowRunner \
  --staging_location $BUCKET/staging \
  --temp_location $BUCKET/temp \
  --output $BUCKET/output \
  --experiments shuffle_mode=service
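The same launch can be assembled from Python, which makes the token boundaries explicit: `--experiments` and `shuffle_mode=service` are two separate arguments, with no spaces around `=`. A sketch, assuming `PROJECT_ID` and `BUCKET` come from the environment; the fallback values `my-project` and `gs://my-bucket` are hypothetical placeholders.

```python
import os
import shlex
import sys

# Placeholders: assumed to be supplied via environment variables.
project_id = os.environ.get('PROJECT_ID', 'my-project')
bucket = os.environ.get('BUCKET', 'gs://my-bucket')

argv = [
    sys.executable, '-m', 'apache_beam.examples.wordcount',
    '--project', project_id,
    '--runner', 'DataflowRunner',
    '--staging_location', bucket + '/staging',
    '--temp_location', bucket + '/temp',
    '--output', bucket + '/output',
    # Flag and value are separate tokens; the value contains no spaces.
    '--experiments', 'shuffle_mode=service',
]

# Print a shell-safe rendering of the command for inspection.
print(' '.join(shlex.quote(arg) for arg in argv))

# To actually launch the job (needs apache_beam and GCP credentials):
# import subprocess; subprocess.run(argv, check=True)
```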