ModuleNotFoundError in Dataflow job


Problem description

I am trying to execute an Apache Beam pipeline as a Dataflow job in Google Cloud Platform.

My project structure is as follows:

root_dir/
  __init__.py
  setup.py
  main.py
  utils/
    __init__.py
    log_util.py
    config_util.py

Here's my setup.py:

import setuptools

setuptools.setup(
   name='dataflow_example',
   version='1.0',
   install_requires=[
        "google-cloud-tasks==2.2.0",
        "google-cloud-pubsub>=0.1.0",
        "google-cloud-storage==1.39.0",
        "google-cloud-bigquery==2.6.2",
        "google-cloud-secret-manager==2.0.0",
        "google-api-python-client==2.3.0",
        "oauth2client==4.1.3",
        "apache-beam[gcp]>=2.20.0",
        "wheel>=0.36.2"
   ],
   packages=setuptools.find_packages()
)

Here's my pipeline code:

import math
import apache_beam as beam

from datetime import datetime
from apache_beam.options.pipeline_options import PipelineOptions

from utils.log_util import LogUtil
from utils.config_util import ConfigUtil


class DataflowExample:
    config = {}

    def __init__(self):
        self.config = ConfigUtil.get_config(module_config=["config"])
        self.project = self.config['project']
        self.region = self.config['location']
        self.bucket = self.config['core_bucket']
        self.batch_size = 10

    def execute_pipeline(self):
        try:
            LogUtil.log_n_notify(log_type="info", msg=f"Dataflow started")

            query = "SELECT id, name, company FROM `<bigquery_table>` LIMIT 10"

            beam_options = {
                "project": self.project,
                "region": self.region,
                "job_name": "dataflow_example",
                "runner": "DataflowRunner",
                "temp_location": f"gs://{self.bucket}/temp_location/"
            }

            options = PipelineOptions(**beam_options, save_main_session=True)

            with beam.Pipeline(options=options) as pipeline:
                data = (
                        pipeline
                        | 'Read from BQ ' >> beam.io.ReadFromBigQuery(query=query, use_standard_sql=True)
                        | 'Count records' >> beam.combiners.Count.Globally()
                        | 'Print ' >> beam.ParDo(PrintCount(), self.batch_size)
                )

            LogUtil.log_n_notify(log_type="info", msg=f"Dataflow completed")
        except Exception as e:
            LogUtil.log_n_notify(log_type="error", msg=f"Exception in execute_pipeline - {str(e)}")


class PrintCount(beam.DoFn):

    def __init__(self):
        self.logger = LogUtil()

    def process(self, row_count, batch_size):
        try:
            current_date = datetime.today().date()
            total = int(math.ceil(row_count / batch_size))

            self.logger.log_n_notify(log_type="info", msg=f"Records pulled from table on {current_date} is {row_count}")

            self.logger.log_n_notify(log_type="info", msg=f"Records per batch: {batch_size}. Total batches: {total}")
        except Exception as e:
            self.logger.log_n_notify(log_type="error", msg=f"Exception in PrintCount.process  - {str(e)}")


if __name__ == "__main__":
    df_example = DataflowExample()
    df_example.execute_pipeline()

The functionality of the pipeline is to:

  1. Query a BigQuery table.
  2. Count the total records fetched by the query.
  3. Print the result using the custom log module in the utils folder.

I am running the job from Cloud Shell using the command - python3 - main.py

Though the Dataflow job starts, the worker nodes throw an error after a few minutes saying "ModuleNotFoundError: No module named 'utils'".

The "utils" folder is available, and the same code works fine when executed with "DirectRunner".

log_util and config_util are custom util files for logging and config fetching respectively.
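Since these files are not shown in the question, here is a minimal hypothetical sketch of what utils/log_util.py and utils/config_util.py might look like; only the class names and method signatures are taken from the pipeline code above, and every config value is a placeholder:

```python
# Hypothetical minimal sketch of utils/log_util.py and utils/config_util.py,
# reconstructed from the names and signatures used by the pipeline code above.
import logging

logging.basicConfig(level=logging.INFO)


class LogUtil:
    _logger = logging.getLogger("dataflow_example")

    @classmethod
    def log_n_notify(cls, log_type="info", msg=""):
        # Dispatch the message to the matching logging level ("info", "error", ...).
        getattr(cls._logger, log_type, cls._logger.info)(msg)


class ConfigUtil:
    @staticmethod
    def get_config(module_config=None):
        # Placeholder values; the real util presumably loads these
        # from a config file or an external config source.
        return {
            "project": "my-project",
            "location": "us-central1",
            "core_bucket": "my-bucket",
        }
```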

Also, I tried running with the setup_file option as python3 - main.py --setup_file </path/of/setup.py>, which makes the job just freeze and not proceed even after 15 mins.

How do I resolve the ModuleNotFoundError with "DataflowRunner"?

Recommended answer

Posting as community wiki. As confirmed by @GopinathS, the error and fix are as follows:

The error encountered by the workers is: Beam SDK base version 2.32.0 does not match Dataflow Python worker version 2.28.0. Please check Dataflow worker startup logs and make sure that correct version of Beam SDK is installed.

To fix this, "apache-beam[gcp]>=2.20.0" is removed from the install_requires of setup.py, since '>=' installs the latest available version on the workers (2.32.0 as of this writing) while the worker version is only 2.28.0.
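An alternative worth noting (a hedged option, not what the accepted fix does; the version number 2.28.0 is taken from the error message above) is to pin apache-beam to the exact version of the launching SDK instead of removing it, so the version installed on the workers matches:

```python
# Hedged alternative: pin apache-beam to the launcher's SDK version
# (2.28.0 per the error message) instead of removing it, so the version
# installed on the workers matches the Dataflow worker version.
install_requires = [
    "google-cloud-tasks==2.2.0",
    "google-cloud-pubsub>=0.1.0",
    "google-cloud-storage==1.39.0",
    "google-cloud-bigquery==2.6.2",
    "google-cloud-secret-manager==2.0.0",
    "google-api-python-client==2.3.0",
    "oauth2client==4.1.3",
    "apache-beam[gcp]==2.28.0",  # pinned, rather than ">=2.20.0"
    "wheel>=0.36.2",
]
```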

Updated setup.py:

import setuptools

setuptools.setup(
   name='dataflow_example',
   version='1.0',
   install_requires=[
        "google-cloud-tasks==2.2.0",
        "google-cloud-pubsub>=0.1.0",
        "google-cloud-storage==1.39.0",
        "google-cloud-bigquery==2.6.2",
        "google-cloud-secret-manager==2.0.0",
        "google-api-python-client==2.3.0",
        "oauth2client==4.1.3", # removed apache-beam[gcp]>=2.20.0
        "wheel>=0.36.2"
   ],
   packages=setuptools.find_packages()
)

Updated beam_options in the pipeline code:

    beam_options = {
        "project": self.project,
        "region": self.region,
        "job_name": "dataflow_example",
        "runner": "DataflowRunner",
        "temp_location": f"gs://{self.bucket}/temp_location/",
        "setup_file": "./setup.py"
    }

Also, make sure that you pass all the pipeline options at once and not partially.

If you pass --setup_file </path/of/setup.py> on the command line, then make sure to read the setup file path and append it to the already defined beam_options variable using an argument parser in your code.
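A sketch of that approach (the project, region, and bucket values here are placeholders, not taken from the question; the resulting dict would be unpacked into PipelineOptions as in the pipeline code above):

```python
import argparse


def build_beam_options(argv=None):
    # Parse only the flag we care about; parse_known_args leaves any
    # remaining command-line arguments untouched for Beam itself.
    parser = argparse.ArgumentParser()
    parser.add_argument("--setup_file", default="./setup.py")
    known_args, _ = parser.parse_known_args(argv)

    return {
        "project": "my-project",        # placeholder values
        "region": "us-central1",
        "job_name": "dataflow_example",
        "runner": "DataflowRunner",
        "temp_location": "gs://my-bucket/temp_location/",
        "setup_file": known_args.setup_file,
        # then: options = PipelineOptions(**beam_options, save_main_session=True)
    }
```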

To avoid parsing the argument and appending it into beam_options, I instead added it directly to beam_options as "setup_file": "./setup.py".
