How to use google cloud storage in dataflow pipeline run from datalab

Question

We've been running a Python pipeline in datalab that reads image files from a bucket in google cloud storage (importing google.datalab.storage). Originally we were using DirectRunner and this worked fine, but now we're trying to use DataflowRunner, and we're having import errors. Even if we include "import google.datalab.storage" or any variant thereof inside the function run by the pipeline, we get errors such as "No module named 'datalab.storage'". We've also tried using the save_main_session, requirements_file, and setup_file flags with no luck. How would we correctly access image files in cloud storage buckets in a dataflow pipeline?
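For context, here is a minimal sketch of how these flags are typically attached to the pipeline options (the SetupOptions import path below assumes the older apache_beam.utils.pipeline_options layout used in the code further down; newer Beam releases expose the same classes under apache_beam.options.pipeline_options):

from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import SetupOptions

# Build options from command-line-style flags, then toggle the setup options.
options = PipelineOptions(flags=["--requirements_file", "./requirements.txt"])
setup_options = options.view_as(SetupOptions)
setup_options.save_main_session = True  # pickle the main session for the workers
# setup_options.setup_file = "./setup.py"  # alternative: ship dependencies as a package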

My original error was due to specifying the requirements_file flag with incorrect syntax (i.e. "--requirements_file ./requirements.txt"). I think I've fixed the syntax there, but now I'm getting a different error. Here's a basic version of the code we're trying to run: we have a pipeline that reads files from a storage bucket in Google Cloud. We have a datalab notebook with a cell containing the following Python code:

import apache_beam as beam
from apache_beam.utils.pipeline_options import PipelineOptions
from apache_beam.utils.pipeline_options import GoogleCloudOptions
from apache_beam.utils.pipeline_options import StandardOptions
import google.datalab.storage as storage

bucket = "BUCKET_NAME"
shared_bucket = storage.Bucket(bucket)

# Create and set PipelineOptions. 
options = PipelineOptions(flags = ["--requirements_file", "./requirements.txt"])
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = "PROJECT_NAME"
google_cloud_options.job_name = 'test-pipeline-requirements'
google_cloud_options.staging_location = 'gs://BUCKET_NAME/binaries'
google_cloud_options.temp_location = 'gs://BUCKET_NAME/temp'
options.view_as(StandardOptions).runner = 'DataflowRunner'

def read_file(input_tuple):
  filepath = input_tuple[0]
  shared_object = shared_bucket.object(filepath)
  f = shared_object.read_stream()
  # More processing of f's contents
  return input_tuple

# File paths relative to the bucket
input_tuples = [("FILEPATH_1", "UNUSED_FILEPATH_2")]
p = beam.Pipeline(options = options)
all_files = (p | "Create file path tuple" >> beam.Create(input_tuples))
all_files = (all_files | "Read file" >> beam.FlatMap(read_file))
p.run()

Meanwhile there is a file named "requirements.txt" in the same directory as the notebook, with only the line

datalab==1.0.1

This code works fine if I use DirectRunner. However, when I use DataflowRunner, I get a CalledProcessError at "p.run()", with a stack trace ending in the following:

/usr/local/lib/python2.7/dist-packages/apache_beam/runners/dataflow/internal/dependency.pyc in _populate_requirements_cache(requirements_file, cache_dir)
224 '--no-binary', ':all:']
225 logging.info('Executing command: %s', cmd_args)
--> 226 processes.check_call(cmd_args)
227
228

/usr/local/lib/python2.7/dist-packages/apache_beam/utils/processes.pyc in check_call(*args, **kwargs)
38 if force_shell:
39 kwargs['shell'] = True
---> 40 return subprocess.check_call(*args, **kwargs)
41
42

/usr/lib/python2.7/subprocess.pyc in check_call(*popenargs, **kwargs)
538 if cmd is None:
539 cmd = popenargs[0]
--> 540 raise CalledProcessError(retcode, cmd)
541 return 0
542

CalledProcessError: Command '['/usr/bin/python', '-m', 'pip', 'install', '--download', '/tmp/dataflow-requirements-cache', '-r', './requirements.txt', '--no-binary', ':all:']' returned non-zero exit status 1
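One quick way to surface the underlying pip failure is to run the same command by hand from a notebook cell (this is the exact command the runner builds, copied from the traceback):

!/usr/bin/python -m pip install --download /tmp/dataflow-requirements-cache -r ./requirements.txt --no-binary :all: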

It seems like the "--download" option is deprecated for pip, but that's part of the apache_beam code. I've also tried this with different ways of specifying "requirements.txt", with and without the "--save_main_session" flag, and with and without the "--setup_file" flag, but no dice.
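For reference, the setup_file route generally expects a standard setuptools setup.py next to the notebook, passed as its own flag. A minimal sketch, with a placeholder package name and the same dependency as requirements.txt, might look like this:

# setup.py, in the same directory as the notebook
import setuptools

setuptools.setup(
    name="pipeline-dependencies",  # placeholder package name
    version="0.0.1",
    install_requires=["datalab==1.0.1"],  # same dependency as requirements.txt
    packages=setuptools.find_packages(),
)

# and in the notebook, instead of --requirements_file:
# options = PipelineOptions(flags=["--setup_file", "./setup.py"])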

Answer

If your only usage of pydatalab is to read from GCS, then I would suggest using Dataflow's gcsio. Code example:

def read_file(input_tuple):
  filepath = input_tuple[0]
  with beam.io.gcp.gcsio.GcsIO().open(filepath, 'r') as f:
    # process f content
    pass

# Full GCS paths (gs://...) rather than bucket-relative paths
input_tuples = [("gs://bucket/file.jpg", "UNUSED_FILEPATH_2")]
p = beam.Pipeline(options = options)
all_files = (p | "Create file path tuple" >> beam.Create(input_tuples))
all_files = (all_files | "Read file" >> beam.FlatMap(read_file))
p.run()

pydatalab is pretty heavy, since it is more of a data exploration library meant to be used with Datalab or Jupyter. Dataflow's gcsio, on the other hand, is natively supported in pipelines.
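Applied to the original pipeline, the google.datalab.storage import and the shared_bucket object can be dropped entirely; the only other change needed is to feed full gs:// paths into beam.Create, e.g. (reusing the placeholder names from the question):

bucket = "BUCKET_NAME"
# Build full GCS paths instead of bucket-relative paths
input_tuples = [("gs://%s/%s" % (bucket, "FILEPATH_1"), "UNUSED_FILEPATH_2")]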
