How to load my pickled ML model from GCS to Dataflow/Apache Beam


Problem Description

I've developed an Apache Beam pipeline locally, where I run predictions on a sample file.

Locally, on my computer, I can load the model like this:

with open('gs://newbucket322/my_dumped_classifier.pkl', 'rb') as fid:
    gnb_loaded = cPickle.load(fid)

but when running on Google Dataflow that obviously doesn't work. I tried changing the path to gs://, but that obviously doesn't work either.
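
One approach that does work both locally and on Dataflow is Beam's filesystem abstraction, which resolves gs:// paths through the GCS filesystem. A minimal sketch, reusing the bucket path from above:

import pickle
from apache_beam.io.filesystems import FileSystems

# FileSystems.open understands gs:// on Dataflow workers as well as locally
with FileSystems.open('gs://newbucket322/my_dumped_classifier.pkl') as fid:
    gnb_loaded = pickle.loads(fid.read())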

I also tried this code snippet (from here) that was used to load files:

import apache_beam as beam

class ReadGcsBlobs(beam.DoFn):
    def process(self, element, *args, **kwargs):
        from apache_beam.io.gcp import gcsio
        gcs = gcsio.GcsIO()
        # Emit a (path, raw bytes) pair for each GCS path
        yield (element, gcs.open(element).read())

model = (p
     | "Initialize" >> beam.Create(["gs://bucket/file.pkl"])
     | "Read blobs" >> beam.ParDo(ReadGcsBlobs())
    )

but that doesn't work when I want to load my model; or at least, I can't call the predict method on the resulting model variable.
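
Worth noting: the model variable above is a PCollection of (path, bytes) pairs, not a classifier object, which is why predict isn't available on it. A hedged sketch of one way to use those bytes: unpickle them and feed the single resulting model into a prediction step as a side input (records below is a hypothetical PCollection of feature rows):

import pickle
import apache_beam as beam

# Unpickle each (path, bytes) pair; with one input path this
# PCollection holds exactly one model
model_pc = model | "Unpickle" >> beam.Map(lambda kv: pickle.loads(kv[1]))

# Pass the single model to the prediction step as a side input
predictions = (
    records  # hypothetical PCollection of feature rows
    | "Predict" >> beam.Map(
        lambda row, m: m.predict([row])[0],
        m=beam.pvalue.AsSingleton(model_pc))
)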

This should be a pretty straightforward task, but I can't seem to find a straightforward answer to it.

Recommended Answer

You can define a ParDo as below:

import logging
import pickle

import apache_beam as beam
from google.cloud import storage


class PredictSklearn(beam.DoFn):
    """Download the pickled model once per worker and run predictions."""

    def __init__(self, project=None, bucket_name=None, model_path=None, destination_name=None):
        self._model = None
        self._project = project
        self._bucket_name = bucket_name
        self._model_path = model_path
        self._destination_name = destination_name

    def download_blob(self, bucket_name=None, source_blob_name=None, project=None, destination_file_name=None):
        """Downloads a blob from the bucket to a local file."""
        storage_client = storage.Client(project=project)
        bucket = storage_client.get_bucket(bucket_name)
        blob = bucket.blob(source_blob_name)
        blob.download_to_filename(destination_file_name)

    # Load once or very few times: setup() runs when the DoFn is initialized on a worker
    def setup(self):
        logging.info("Model Initialization %s", self._model_path)
        self.download_blob(bucket_name=self._bucket_name,
                           source_blob_name=self._model_path,
                           project=self._project,
                           destination_file_name=self._destination_name)
        # unpickle the model
        with open(self._destination_name, 'rb') as fid:
            self._model = pickle.load(fid)

    def process(self, element):
        element["prediction"] = self._model.predict(element["data"])
        return [element]

Then you can invoke this ParDo in your pipeline as below:

    model = (p
         | "Read Files" >> TextIO...
         | "Run Predictions" >> beam.ParDo(PredictSklearn(project=known_args.bucket_project_id,
                                                          bucket_name=known_args.bucket_name,
                                                          model_path=known_args.model_path,
                                                          destination_name=known_args.destination_name))
      )
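
A note on the design: DoFn.setup() runs once when the DoFn instance is initialized on a worker, not once per element, so the model is downloaded and unpickled only a handful of times per job. That makes setup() the natural hook for expensive initialization like loading a pickled model.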

