How To Filter None Values Out Of PCollection


Question

My Pub/Sub pull subscription is sending over the message and a None value for each message. I need to find a way to filter out the None values as part of my pipeline processing.

Of course, some help preventing the None values from arriving from the pull subscription would be nice. But I feel like I'm missing something about the general workflow of defining and applying functions via ParDo.

I've set up a function to filter out None values, which seems to work based on a print-to-console check. However, when I apply a lambda function that crashes on None, I still receive errors.

I've found the documentation on the Python Apache Beam SDK a little sparse, and I have looked all through it for answers without much luck.

from __future__ import absolute_import

import argparse
import logging

from past.builtins import unicode

import apache_beam as beam
import apache_beam.transforms.window as window

from apache_beam.examples.wordcount import WordExtractingDoFn
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions



def print_row(row):
    print row
    print type(row)

def filter_out_nones(row):
  if row is not None:
    yield row
  else:
    print 'we found a none! get it out'


def run(argv=None):
    pipeline_options = PipelineOptions()
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(StandardOptions).streaming = True


    p = beam.Pipeline(options=pipeline_options)



    data = ['test1 message','test2 message',None,'test3 please work']

## this does seem to return only the values I would hope for based on the console log

    testlogOnly = (p | "makeData" >> beam.Create(data)
               | "filter" >> beam.ParDo(filter_out_nones)
               | "printtesting" >> beam.Map(print_row))
            #  | 'encoding' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes)
            #  | "writing" >> beam.io.WriteToPubSub("projects/??/topics/??"))


##    testlogAndWrite = (p | "MakeWriteData" >> beam.Create(data)
            #  | "filterHere" >> beam.ParDo(filter_out_nones)
            #   | "printHere" >> beam.Map(print_row)
## below here does not work due to the following message
## AttributeError: 'NoneType' object has no attribute 'encode' [while running 'encodeHere']
            #   | 'encodeHere' >> beam.Map(lambda x: x.encode('utf-8')).with_output_types(bytes)
            # | "writeTest" >> beam.io.WriteToPubSub("projects/??/topics/??"))

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

If I could log the byte-string-encoded messages without the None results, I'd be where I need to be.

Answer

Your approach to filtering out None values looks good to me.

However, if I understand correctly, when you use testlogAndWrite and get the AttributeError, you are keeping the "printHere" >> beam.Map(print_row) step in the pipeline.

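print_row reads the messages and prints them, but it has no return statement, so it returns None. beam.Map emits the function's return value for every input element, so the next step, encodeHere, receives None for each message, which is what raises the AttributeError.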
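The same effect can be reproduced without Beam at all. The sketch below is a plain-Python stand-in, not Beam itself. It models `beam.Map(fn)` as "one output per input, equal to fn(element)" using a list comprehension. It also uses Python 3 `print()` instead of the question's Python 2 print statements. The names `print_row_fixed`, `after_print`, and `encoded` are illustrative only.

```python
def print_row(row):
    # Like the question's print_row: prints, but has no return statement,
    # so Python returns None implicitly.
    print(row)
    print(type(row))


def print_row_fixed(row):
    print(row)
    print(type(row))
    return row  # pass the element through unchanged


data = ["test1 message", "test2 message", "test3 please work"]

# Stand-in for beam.Map(fn): each input element is replaced by fn(element).
after_print = [print_row(x) for x in data]
after_print_fixed = [print_row_fixed(x) for x in data]

print(after_print)  # [None, None, None]
# Encoding after_print would fail just like 'encodeHere' does:
# AttributeError: 'NoneType' object has no attribute 'encode'
encoded = [x.encode("utf-8") for x in after_print_fixed]
print(encoded)  # [b'test1 message', b'test2 message', b'test3 please work']
```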

To solve this you can comment out that step or make sure that each element is returned:

def print_row(row):
    print row
    print type(row)
    return row

Output:

test1 message
<type 'str'>
test2 message
<type 'str'>
we found a none! get it out
test3 please work
<type 'str'>
