How to log incoming messages in an Apache Beam pipeline


Question

I am writing a simple Apache Beam streaming pipeline that takes input from a Pub/Sub topic and stores it in BigQuery. For hours I thought I could not even read a message, because I was simply trying to log the input to the console:

events = p | 'Read PubSub' >> ReadFromPubSub(subscription=SUBSCRIPTION)
logging.info(events)

When I write this to text, it works fine! However, my call to the logger never happens.

How do people develop and debug these streaming pipelines?

I also tried adding the following line: events | 'Log' >> logging.info(events)

Using print() also yields no results in the console.

Recommended answer

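This is because events is a PCollection, so you need to apply a PTransform to it. Calling logging.info(events) runs once, while the pipeline is being constructed, and receives the PCollection object itself rather than the messages that will flow through it.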

The simplest way would be to apply a ParDo to events:

events | 'Log results' >> beam.ParDo(LogResults())

where LogResults is defined as:

class LogResults(beam.DoFn):
  """Just log the results"""
  def process(self, element):
    logging.info("Pub/Sub event: %s", element)
    yield element

Note that I also yield the element so that further steps can be applied downstream, such as writing to a sink after logging the elements. See the issue here, for example.
