Dataflow/Apache Beam - how to access the current filename when passing in a file pattern?


Problem description


I have seen this question answered before on Stack Overflow (https://stackoverflow.com/questions/29983621/how-to-get-filename-when-using-file-pattern-match-in-google-cloud-dataflow), but not since Apache Beam added splittable DoFn functionality for Python. How would I access the filename of the current file being processed when passing in a file pattern for a GCS bucket?


I want to pass the filename into my transform function:

with beam.Pipeline(options=pipeline_options) as p:
    lines = p | ReadFromText('gs://url to file')

    data = (
        lines
        | 'Jsonify' >> beam.Map(jsonify)
        | 'Unnest' >> beam.FlatMap(unnest)
        | 'Write to BQ' >> beam.io.Write(beam.io.BigQuerySink(
            'project_id:dataset_id.table_name', schema=schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
    )


Ultimately, what I want to do is pass the filename into my transform function when I transform each row of the JSON (see this), and then use the filename to do a lookup in a different BQ table to get a value. I think once I manage to figure out how to get the filename, I will be able to work out the side input part in order to do the lookup in the BQ table and get the unique value.

Answer


I tried to implement a solution based on the previously cited case. There, as well as in other approaches such as this one, they also get a list of file names but load the whole file into a single element, which might not scale well with large files. Therefore, I looked into adding the filename to each record.


As input I used two csv files:

$ gsutil cat gs://$BUCKET/countries1.csv
id,country
1,sweden
2,spain

$ gsutil cat gs://$BUCKET/countries2.csv
id,country
3,italy
4,france


Using GCSFileSystem.match we can access metadata_list to retrieve FileMetadata containing the file path and size in bytes. In my example:

[FileMetadata(gs://BUCKET_NAME/countries1.csv, 29),
 FileMetadata(gs://BUCKET_NAME/countries2.csv, 29)]

The code is:

result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])]
result = reduce(add, result)  # flatten the per-pattern lists into a single list of FileMetadata (as in the full code below)


We will read each of the matching files into a different PCollection. As we don't know the number of files a priori, we need to programmatically create a list of names for each PCollection (p0, p1, ..., pN-1) and ensure that every step has a unique label ('Read file 0', 'Read file 1', etc.):

variables = ['p{}'.format(i) for i in range(len(result))]
read_labels = ['Read file {}'.format(i) for i in range(len(result))]
add_filename_labels = ['Add filename {}'.format(i) for i in range(len(result))]


Then we read each file into its corresponding PCollection with ReadFromText and apply the AddFilenamesFn ParDo to associate each record with the filename.

for i in range(len(result)):   
  globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), result[i].path)

where AddFilenamesFn is:

class AddFilenamesFn(beam.DoFn):
    """ParDo to output a dict with filename and row"""
    def process(self, element, file_path):
        file_name = file_path.split("/")[-1]
        yield {'filename':file_name, 'row':element}


My first approach was to use a Map function directly, which results in simpler code. However, because the lambda captures the loop variable by reference (late binding), result[i].path was only resolved at the end of the loop, so each record was incorrectly mapped to the last file in the list:

globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.Map(lambda elem: (result[i].path, elem))
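
For reference (this is not part of the original answer), a common way around that late-binding pitfall is to bind the path as a default argument of the lambda, so the value is captured at definition time in each iteration:

# Hypothetical variant: file_path=result[i].path is evaluated when the lambda is
# defined, so each PCollection keeps its own path instead of the last one.
globals()[variables[i]] = (
    p
    | read_labels[i] >> ReadFromText(result[i].path)
    | add_filename_labels[i] >> beam.Map(
        lambda elem, file_path=result[i].path: (file_path, elem)))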


Finally, we flatten all the PCollections into one:

merged = [globals()[variables[i]] for i in range(len(result))] | 'Flatten PCollections' >> beam.Flatten()
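
As a side note (a variation on the same construction, not the code I ran), the per-file PCollections can also be collected in a plain Python list instead of globals(), which keeps the loop and the Flatten step a bit more compact:

# Sketch: build one labelled read + ParDo chain per matched file and flatten them.
file_pcolls = [
    p
    | 'Read file {}'.format(i) >> ReadFromText(m.path)
    | 'Add filename {}'.format(i) >> beam.ParDo(AddFilenamesFn(), m.path)
    for i, m in enumerate(result)
]
merged = file_pcolls | 'Flatten PCollections' >> beam.Flatten()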


Then we check the results by logging the elements:

INFO:root:{'filename': u'countries2.csv', 'row': u'id,country'}
INFO:root:{'filename': u'countries2.csv', 'row': u'3,italy'}
INFO:root:{'filename': u'countries2.csv', 'row': u'4,france'}
INFO:root:{'filename': u'countries1.csv', 'row': u'id,country'}
INFO:root:{'filename': u'countries1.csv', 'row': u'1,sweden'}
INFO:root:{'filename': u'countries1.csv', 'row': u'2,spain'}


I tested this with both DirectRunner and DataflowRunner for Python SDK 2.8.0.


I hope this addresses the main issue here, and you can now continue by integrating BigQuery into your full use case. You might need to use the Python Client Library for that; I wrote a similar Java example.
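
As a rough sketch only (the dataset, table and column names below are placeholders, and a per-element query will not scale well; reading the lookup table once and passing it as a side input would be preferable), a filename-based lookup with the BigQuery Python Client Library could look like this:

from google.cloud import bigquery
import apache_beam as beam

class LookupValueFn(beam.DoFn):
    """Hypothetical DoFn: enrich each record with a value keyed by filename."""
    def start_bundle(self):
        # Create the client per bundle so it is not pickled together with the DoFn.
        self.client = bigquery.Client()

    def process(self, element):
        # 'my_dataset.lookup_table' and its columns are assumptions for illustration.
        query = ("SELECT value FROM `my_dataset.lookup_table` "
                 "WHERE filename = @filename LIMIT 1")
        job_config = bigquery.QueryJobConfig(query_parameters=[
            bigquery.ScalarQueryParameter('filename', 'STRING', element['filename'])])
        rows = list(self.client.query(query, job_config=job_config).result())
        element['lookup_value'] = rows[0].value if rows else None
        yield element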

Full code:

import argparse, logging
from operator import add
from functools import reduce  # built in on Python 2; must be imported on Python 3

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import ReadFromText
from apache_beam.io.filesystem import FileMetadata
from apache_beam.io.filesystem import FileSystem
from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem

class GCSFileReader:
  """Helper class to read gcs files"""
  def __init__(self, gcs):
      self.gcs = gcs

class AddFilenamesFn(beam.DoFn):
    """ParDo to output a dict with filename and row"""
    def process(self, element, file_path):
        file_name = file_path.split("/")[-1]
        # yield (file_name, element) # use this to return a tuple instead
        yield {'filename':file_name, 'row':element}

# just logging output to visualize results
def write_res(element):
  logging.info(element)
  return element

def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)

  p = beam.Pipeline(options=PipelineOptions(pipeline_args))
  gcs = GCSFileSystem(PipelineOptions(pipeline_args))
  gcs_reader = GCSFileReader(gcs)

  # in my case I am looking for files that start with 'countries'
  BUCKET='BUCKET_NAME'
  result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])]
  result = reduce(add, result)

  # create each input PCollection name and unique step labels
  variables = ['p{}'.format(i) for i in range(len(result))]
  read_labels = ['Read file {}'.format(i) for i in range(len(result))]
  add_filename_labels = ['Add filename {}'.format(i) for i in range(len(result))]

  # load each input file into a separate PCollection and add filename to each row
  for i in range(len(result)):
    # globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.Map(lambda elem: (result[i].path, elem))
    globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), result[i].path)

  # flatten all PCollections into a single one
  merged = [globals()[variables[i]] for i in range(len(result))] | 'Flatten PCollections' >> beam.Flatten() | 'Write results' >> beam.Map(write_res)

  p.run()

if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)  # make the logged elements visible when running locally
  run()
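
For completeness, an example invocation might look like the following (project, bucket and script name are placeholders; with DirectRunner only the options your code actually uses are needed):

$ python filenames_pipeline.py \
    --runner DataflowRunner \
    --project my-project \
    --temp_location gs://my-bucket/tmp \
    --region us-central1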

