运行束流管道时,对象没有窗口属性 [英] 'PBegin' object has no attribute 'windowing' while running beam pipeline

查看:16
本文介绍了运行束流管道时,对象没有窗口属性的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在运行数据流作业时,我得到‘PBegin’对象没有‘windowing’属性。 我正在调用Pardo函数中的ConnectClass类。

我正在尝试从Beam pythonSDK连接NOSQL数据库,并运行SQL从表中提取数据。然后使用另一个Pardo将输出写入单独的文件。

class Connector(beam.DoFn):
    def __init__(self,username,seeds,keyspace,password,datacenter=None):
    self.username = username
    self.password = password
    self.seeds = seeds
    self.keyspace = keyspace
    self.datacenter = datacenter
    super(self.__class__, self).__init__()

    def process(self, element):

    if datacenter:
        load_balancing_policy = DCAwareRoundRobinPolicy(local_dc=self.datacenter)
    auth_provider = PlainTextAuthProvider(username=self.username, password=self.password)
    cluster = Cluster(contact_points=self.seeds,
                      load_balancing_policy=load_balancing_policy,
                      auth_provider=auth_provider)
    session=cluster.connect(self.seeds,self.keyspace,self.username, self.password, self.datacenter)
    rows = session.execute(SQL Query)
    yield rows

推荐答案

正好遇到了同样的问题。尝试连接到RDBMS源,但我猜在实现设计方面,NoSQL和SQL数据库没有区别。

与Jayadeep Jayaraman建议的不同,这可以通过使用Pardo来实现。实际上,使用Pardo进行连接是beam documentation建议的如果您的用例可以接受这样做的限制:

对于有限制(批处理)的震源,目前有两种创建波束源的选择:

使用Pardo和GroupByKey。

使用Source接口并扩展BoundedSource抽象子类。

推荐使用Pardo,因为实现源代码可能很棘手。有关您可能希望使用源代码的一些用例的列表,请参阅何时使用>源代码>>(例如动态工作重新平衡)。

您没有展示如何使用DoFn。对我来说,记住DoFn作用于已经存在的PCollection的元素是很有帮助的。它本身不能从头开始创建DoFn。因此,要克服您提到的问题,您可能需要从内存创建一个PCollection,它包含一个用于查询的元素,用于从源检索数据。然后将从您的源代码读取的Pardo应用于此PCollection。

BTW:我为每个分区想出一个元素,希望从我的PCollection中的RDBMS中读取--这样就可以从我的SQL数据库中并行读取数据。

解决方案可能如下所示:

p | beam.Create(["Your Query / source object qualifier goes here"]) 
  | "Read from Database" >> beam.ParDo(YourConnector())

我还要提一下使用DoFn的Start_Bundle和Finish_Bundle方法来设置/断开连接可能是个好主意

这篇关于运行束流管道时,对象没有窗口属性的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆