Handle NameErrors when launching Dataflow jobs with Apache Beam notebooks


Problem description

When I run the example notebook Dataflow_Word_count.ipynb available on Google Cloud Platform's website, I can launch a Dataflow job using Apache Beam notebooks and the job completes successfully. The pipeline is defined as follows.

import re

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner


class ReadWordsFromText(beam.PTransform):

    def __init__(self, file_pattern):
        self._file_pattern = file_pattern

    def expand(self, pcoll):
        # Read the file and split each line into words.
        return (pcoll.pipeline
                | beam.io.ReadFromText(self._file_pattern)
                | beam.FlatMap(lambda line: re.findall(r'[\w\']+', line.strip(), re.UNICODE)))


# Build the pipeline with the interactive runner used by Beam notebooks.
p = beam.Pipeline(InteractiveRunner())

words = p | 'read' >> ReadWordsFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')

counts = (words
          | 'count' >> beam.combiners.Count.PerElement())

lower_counts = (words
                | "lower" >> beam.Map(lambda word: word.lower())
                | "lower_count" >> beam.combiners.Count.PerElement())

If I refactor the part that extracts the words into a new function, as follows,

def extract_words(line):
    return re.findall(r'[\w\']+', line.strip(), re.UNICODE)

class ReadWordsFromText(beam.PTransform):
    
    def __init__(self, file_pattern):
        self._file_pattern = file_pattern
    
    def expand(self, pcoll):
        return (pcoll.pipeline
                | beam.io.ReadFromText(self._file_pattern)
                | beam.FlatMap(lambda line: extract_words(line)))

and run the notebook, I get the following error message:

DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 570, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "<ipython-input-3-d48b3d7d5e4f>", line 12, in <lambda>
NameError: name 'extract_words' is not defined

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 638, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 179, in execute
    op.start()
  File "dataflow_worker/native_operations.py", line 38, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 39, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 44, in dataflow_worker.native_operations.NativeReadOperation.start
  File "dataflow_worker/native_operations.py", line 54, in dataflow_worker.native_operations.NativeReadOperation.start
  File "apache_beam/runners/worker/operations.py", line 356, in apache_beam.runners.worker.operations.Operation.output
  File "apache_beam/runners/worker/operations.py", line 218, in apache_beam.runners.worker.operations.SingletonConsumerSet.receive
  File "apache_beam/runners/worker/operations.py", line 703, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/worker/operations.py", line 704, in apache_beam.runners.worker.operations.DoOperation.process
  File "apache_beam/runners/common.py", line 1215, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 1294, in apache_beam.runners.common.DoFnRunner._reraise_augmented
  File "/usr/local/lib/python3.7/site-packages/future/utils/__init__.py", line 446, in raise_with_traceback
    raise exc.with_traceback(traceback)
  File "apache_beam/runners/common.py", line 1213, in apache_beam.runners.common.DoFnRunner.process
  File "apache_beam/runners/common.py", line 570, in apache_beam.runners.common.SimpleInvoker.invoke_process
  File "<ipython-input-3-d48b3d7d5e4f>", line 12, in <lambda>
NameError: name 'extract_words' is not defined [while running '[3]: read/FlatMap(<lambda at <ipython-input-3-d48b3d7d5e4f>:12>)']

Note: imports, functions and other variables defined in the global context of your __main__ file of your Dataflow pipeline are, by default, not available in the worker execution environment, and such references will cause a NameError, unless the --save_main_session pipeline option is set to True. Please see https://cloud.google.com/dataflow/faq#how-do-i-handle-nameerrors for additional documentation on configuring your worker execution environment.

To handle the NameError, I follow the instructions and add the following line:

options.view_as(SetupOptions).save_main_session = True
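
For reference, this option hangs off the same PipelineOptions object that is later used to launch the Dataflow job. A minimal sketch of how it might be wired up follows; the project, region, and bucket values are placeholders, not values from the original notebook:

from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions, SetupOptions

# Placeholder project, region, and bucket values; replace with your own.
options = PipelineOptions()
options.view_as(GoogleCloudOptions).project = 'my-project-id'
options.view_as(GoogleCloudOptions).region = 'us-central1'
options.view_as(GoogleCloudOptions).staging_location = 'gs://my-bucket/staging'
options.view_as(GoogleCloudOptions).temp_location = 'gs://my-bucket/temp'

# The setting suggested by the error message.
options.view_as(SetupOptions).save_main_session = True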

but when I run the notebook I get the following error:

DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 760, in run
    self._load_main_session(self.local_staging_directory)
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 501, in _load_main_session
    pickler.load_session(session_file)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 307, in load_session
    return dill.load_session(file_path)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 368, in load_session
    module = unpickler.load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 462, in find_class
    return StockUnpickler.find_class(self, module, name)
ModuleNotFoundError: No module named 'IPython'

Is there an easy way to fix this issue?

Recommended answer

Instead of using save_main_session, move the word extraction outside the ReadWordsFromText composite transform and pass the named extract_words function directly to beam.FlatMap. This likely avoids the NameError because the function object is serialized together with the transform rather than being looked up by name in the notebook's __main__ session on the worker; it also sidesteps the ModuleNotFoundError: No module named 'IPython', which appears to come from save_main_session pickling IPython-specific notebook state that the Dataflow workers cannot import. Here is the example:

def extract_words(line):
    return re.findall(r'[\w\']+', line.strip(), re.UNICODE)


class ReadWordsFromText(beam.PTransform):

    def __init__(self, file_pattern):
        self._file_pattern = file_pattern

    def expand(self, pcoll):
        # The composite transform now only reads the lines from the file.
        return (pcoll.pipeline
                | beam.io.ReadFromText(self._file_pattern))


# `p` is the interactive pipeline defined in the question, i.e.
# p = beam.Pipeline(InteractiveRunner()).
words = (p | 'read' >> ReadWordsFromText('gs://apache-beam-samples/shakespeare/kinglear.txt')
           | 'Extract' >> beam.FlatMap(extract_words))

counts = (words
          | 'count' >> beam.combiners.Count.PerElement())
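
To then launch the refactored pipeline as a Dataflow job from the notebook, the usual pattern is roughly the sketch below; it assumes an options object configured with your project, region, and temp/staging locations as in the question, and save_main_session is no longer needed:

from apache_beam.runners import DataflowRunner

# Submit the pipeline built above to Dataflow using the configured options.
# save_main_session is not required here because extract_words is passed to
# FlatMap directly and serialized with the transform instead of being resolved
# by name on the worker.
result = DataflowRunner().run_pipeline(p, options=options)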
