How to supply parameters to a composite transform in Apache Beam?


Question

I am using the Python SDK of Apache Beam.

I have a few transform steps that I want to make reusable, which led me to write a custom composite transform like this:

class MyCompositeTransform(beam.PTransform):
    def expand(self, pcoll, arg1, kwarg1=u'default'):
        result = (pcoll
                  | 'Step 1' >> beam.Map(lambda f: SomeFn(f, arg1))
                  | 'Last step' >> beam.Map(lambda f: SomeOtherFn(f, kwarg1))
                  )
        return result

What I want is to supply some extra parameters, arg1 and kwarg1, which are needed by the transforms inside it. But I do not know whether this is valid, or how to use it in a pipeline.

Can anyone point me in the right direction?

Recommended answer

You can supply parameters via the PTransform constructor. A parameter can also be a side input (i.e. the data output of another transform). Here is an example that uses both a "normal" parameter and a side input.

from typing import Dict, Any, Iterable
import apache_beam as beam


class MyCompositeTransform(beam.PTransform):

    def __init__(self, my_arg, my_side_input):
        super().__init__()
        # Plain Python value, fixed when the transform is constructed.
        self.my_arg = my_arg
        # A PCollection produced elsewhere in the pipeline.
        self.my_side_input = my_side_input

    @staticmethod
    def transform(
        element: Dict[str, Any], my_arg: int, my_side_input: Iterable[int]
    ) -> Dict[str, Any]:
        # Placeholder: the per-element logic goes here.
        pass

    def expand(self, pcoll):
        # Extra positional arguments to beam.Map are forwarded to the function.
        return pcoll | "MyCompositeTransform" >> beam.Map(
            MyCompositeTransform.transform,
            self.my_arg,
            beam.pvalue.AsIter(self.my_side_input),
        )

Use beam.pvalue to define how the side input is passed to the transform: as a single value (AsSingleton), as an Iterable (AsIter), or materialized as a list (AsList).

Additional examples from Beam (see the PTransform subclasses): https://beam.apache.org/releases/pydoc/2.20.0/_modules/apache_beam/transforms/stats.html
