How to log incoming messages in an Apache Beam pipeline


Problem description

I am writing a simple Apache Beam streaming pipeline, taking input from a Pub/Sub topic and storing it into BigQuery. For hours I thought I was not even able to read a message, as I was simply trying to log the input to the console:

events = p | 'Read PubSub' >> ReadFromPubSub(subscription=SUBSCRIPTION)
logging.info(events)

When I write this to text it works fine! However, my call to the logger never happens.

How do people develop / debug these streaming pipelines?

I tried adding the following line:

events | 'Log' >> logging.info(events)

Using print() also yields no results in the console.

Answer

This is because events is a PCollection, so you need to apply a PTransform to it.

The simplest way would be to apply a ParDo to events:

events | 'Log results' >> beam.ParDo(LogResults())

which is defined as:

class LogResults(beam.DoFn):
  """Just log the results"""
  def process(self, element):
    logging.info("Pub/Sub event: %s", element)
    yield element

Notice that I also yield the element in case you want to apply further steps downstream, such as writing to a sink after logging the elements. See the issue here, for example.
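To show how this fits into the full pipeline described in the question, here is a minimal end-to-end sketch that reads from Pub/Sub, logs each element with LogResults, and writes to BigQuery. The subscription, table, schema, and the decode step are placeholder assumptions for illustration, not part of the original question or answer:

import logging

import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions


class LogResults(beam.DoFn):
  """Just log the results"""
  def process(self, element):
    logging.info("Pub/Sub event: %s", element)
    yield element


# Placeholder resources -- replace with your own project, subscription and table.
SUBSCRIPTION = 'projects/YOUR_PROJECT/subscriptions/YOUR_SUBSCRIPTION'
TABLE = 'YOUR_PROJECT:YOUR_DATASET.YOUR_TABLE'
SCHEMA = 'data:STRING'  # assumed single-column schema for illustration


def run():
  options = PipelineOptions(streaming=True)
  with beam.Pipeline(options=options) as p:
    (p
     | 'Read PubSub' >> ReadFromPubSub(subscription=SUBSCRIPTION)
     | 'Log results' >> beam.ParDo(LogResults())
     # ReadFromPubSub emits bytes by default; decode into a dict matching the schema.
     | 'To row' >> beam.Map(lambda msg: {'data': msg.decode('utf-8')})
     | 'Write to BigQuery' >> beam.io.WriteToBigQuery(
         TABLE,
         schema=SCHEMA,
         write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

With the logger level set to INFO, the log calls should show up in the local console when running with the DirectRunner, or in the worker logs when running on Dataflow.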
