large numpy matrix as dataflow side input


Problem Description

I'm trying to write a Dataflow pipeline in Python that requires a large numpy matrix as a side input. The matrix is saved in cloud storage. Ideally, each Dataflow worker would load the matrix directly from cloud storage.

My understanding is that if I say matrix = np.load(LOCAL_PATH_TO_MATRIX), and then

p | "computation" >> beam.Map(computation, matrix)

the matrix gets shipped from my laptop to each Dataflow worker.

How could I instead direct each worker to load the matrix directly from cloud storage? Is there a beam source for "binary blob"?

Answer

Your approach is correct.

What Dataflow does, in this case, is handle the NumPy matrix as a side input. This means that it's uploaded once from your machine to the service, and the Dataflow service will send it to each worker.

Given that the matrix is large, this will make your workers use I/O to receive it from the service, and carry the burden of keeping the whole matrix in memory, but it should work.
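
For concreteness, a minimal sketch of that approach might look like the following; the computation function, the placeholder .npy path, and the beam.Create input are illustrative assumptions, not part of the original pipeline:

import apache_beam as beam
import numpy as np

def computation(element, matrix):
  # Hypothetical per-element work that uses the shared matrix.
  return float(np.dot(matrix[0], element))

# Loaded once on the machine that submits the job; Dataflow pickles it
# and ships it to every worker along with the Map.
matrix = np.load('LOCAL_PATH_TO_MATRIX.npy')

with beam.Pipeline() as p:
  (p
   | beam.Create([np.ones(matrix.shape[1])])  # placeholder input elements
   | "computation" >> beam.Map(computation, matrix))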

If you want to avoid computing/loading the matrix on your machine, you can upload your matrix to GCS as a text file, read that file in, and obtain the matrix. You can do something like this:

matrix_file = 'gs://mybucket/my/matrix'
p | beam.ParDo(ComputationDoFn(matrix_file))

And your DoFn could be something like:

import apache_beam as beam

class ComputationDoFn(beam.DoFn):
  def __init__(self, matrix_file):
    self._matrix_file = matrix_file
    self._matrix = None

  def start_bundle(self):
    # We check because one DoFn instance may be reused
    # for different bundles.
    if self._matrix is None:
      self.load_matrix(self._matrix_file)

  def process(self, element):
    # Now process the element using self._matrix.
    yield element

  def load_matrix(self, matrix_file):
    # Load the file from GCS using the GCS API
    # and keep the result in self._matrix.
    pass
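
As a rough sketch of what that loading step could look like, assuming the matrix was saved in NumPy's binary .npy format with np.save and that Beam's FileSystems abstraction is used to read gs:// paths (available when the GCP extras are installed); the helper name load_matrix_from_gcs is illustrative:

import io

import numpy as np
from apache_beam.io.filesystems import FileSystems

def load_matrix_from_gcs(matrix_file):
  # Read the raw bytes from GCS (or any filesystem Beam knows about)
  # and reconstruct the array on the worker.
  with FileSystems.open(matrix_file) as f:
    return np.load(io.BytesIO(f.read()))

Inside the DoFn above, load_matrix would then simply assign self._matrix = load_matrix_from_gcs(matrix_file). If the matrix really is stored as a text file, np.loadtxt on the opened handle would be the analogous call.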

I hope this makes sense. I can flesh out the functions if you feel like you need some more help.
