How To Filter None Values Out Of PCollection
Question
My Pub/Sub pull subscription is sending over each message along with a None value. I need to find a way to filter out the None values as part of my pipeline processing.
Of course, some help preventing the None values from arriving from the pull subscription would be nice. But I feel like I'm missing something about the general workflow of defining and applying functions via ParDo.
I've set up a function to filter out None values, which seems to work based on a print-to-console check. However, when I apply a lambda function that crashes on None types, I still receive errors.
I've found the documentation for the Python Apache Beam SDK a little sparse, but I have been looking all through it for answers without much luck.
```python
from __future__ import absolute_import

import argparse
import logging

from past.builtins import unicode

import apache_beam as beam
import apache_beam.transforms.window as window
from apache_beam.examples.wordcount import WordExtractingDoFn
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions


def print_row(row):
    print row
    print type(row)


def filter_out_nones(row):
    if row is not None:
        yield row
    else:
        print 'we found a none! get it out'


def run(argv=None):
    pipeline_options = PipelineOptions()
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming = True
    p = beam.Pipeline(options=pipeline_options)

    data = ['test1 message', 'test2 message', None, 'test3 please work']

    ## this does seem to return only the values I would hope for based on the console log
    testlogOnly = (p | "makeData" >> beam.Create(data)
                     | "filter" >> beam.ParDo(filter_out_nones)
                     | "printtesting" >> beam.Map(print_row))
                     # | 'encoding' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes)
                     # | "writing" >> beam.io.WriteToPubSub("projects/??/topics/??"))

    ## testlogAndWrite = (p | "MakeWriteData" >> beam.Create(data)
    #                      | "filterHere" >> beam.ParDo(filter_out_nones)
    #                      | "printHere" >> beam.Map(print_row)
    ## below here does not work due to the following message
    ## AttributeError: 'NoneType' object has no attribute 'encode' [while running 'encodeHere']
    #                      | 'encodeHere' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes)
    #                      | "writeTest" >> beam.io.WriteToPubSub("projects/??/topics/??"))

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()
```
If I could log the byte-string-encoded messages without the None results, I'd be where I need to be.
Recommended answer
Your approach to filter out None values looks good to me.
However, if I understand correctly, when you run testlogAndWrite and get the AttributeError, you are keeping the "printHere" >> beam.Map(print_row) step in the pipeline.
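print_row reads the messages and prints them, but it has no return statement, so it implicitly returns None. beam.Map emits the function's return value for every input element, so the next step, encodeHere, receives None for every message and fails when it calls None.encode('utf-8'). That is exactly the AttributeError you see.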
To solve this, you can either comment out that step or make sure that each element is returned:
```python
def print_row(row):
    print row
    print type(row)
    return row
```
Output:
```
test1 message
<type 'str'>
test2 message
<type 'str'>
we found a none! get it out
test3 please work
<type 'str'>
```