How to log incoming messages in apache beam pipeline
Question
I am writing a simple Apache Beam streaming pipeline, taking input from a Pub/Sub topic and storing it into BigQuery. For hours I thought I was not even able to read a message, as I was simply trying to log the input to the console:
events = p | 'Read PubSub' >> ReadFromPubSub(subscription=SUBSCRIPTION)
logging.info(events)
When I write this to text it works fine! However, my call to the logger never happens.
How do people develop / debug these streaming pipelines?
I tried adding the following line:

events | 'Log' >> logging.info(events)
Using print() also yields no results in the console.
Answer
This is because events is a PCollection, so you need to apply a PTransform to it.
The simplest way would be to apply a ParDo to events:
events | 'Log results' >> beam.ParDo(LogResults())
defined as:
class LogResults(beam.DoFn):
    """Just log the results"""
    def process(self, element):
        logging.info("Pub/Sub event: %s", element)
        yield element
Notice that I also yield the element in case you want to apply further steps downstream, such as writing to a sink after logging the elements. See the issue here, for example.