Dataflow/apache beam - how to access current filename when passing in pattern?

Problem description

I have seen this question answered before on Stack Overflow (https://stackoverflow.com/questions/29983621/how-to-get-filename-when-using-file-pattern-match-in-google-cloud-dataflow), but not since Apache Beam added splittable DoFn functionality for Python. How would I access the filename of the current file being processed when passing a file pattern for a GCS bucket?

I want to pass the filename into my transform function:

with beam.Pipeline(options=pipeline_options) as p:
    lines = p | ReadFromText('gs://url to file')

    data = (
        lines
        | 'Jsonify' >> beam.Map(jsonify)
        | 'Unnest' >> beam.FlatMap(unnest)
        | 'Write to BQ' >> beam.io.Write(beam.io.BigQuerySink(
            'project_id:dataset_id.table_name', schema=schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
    )

Ultimately, what I want to do is pass the filename into my transform function when I transform each row of the JSON (see this), and then use the filename to do a lookup in a different BQ table to get a value. I think once I manage to know how to get the filename, I will be able to figure out the side input part in order to do the lookup in the BQ table and get the unique value.

Answer

I tried to implement a solution based on the previously cited case. There, as well as in other approaches such as this one, they also get a list of file names, but they load the whole file into a single element, which might not scale well with large files. Therefore, I looked into adding the filename to each record.

As input I used two csv files:

$ gsutil cat gs://$BUCKET/countries1.csv
id,country
1,sweden
2,spain

$ gsutil cat gs://$BUCKET/countries2.csv
id,country
3,italy
4,france

Using GCSFileSystem.match we can access metadata_list to retrieve FileMetadata containing the file path and size in bytes. In my example:

[FileMetadata(gs://BUCKET_NAME/countries1.csv, 29),
 FileMetadata(gs://BUCKET_NAME/countries2.csv, 29)]

The code is:

result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])]
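
For context, gcs here is a GCSFileSystem built from the pipeline options, and match returns one result per pattern, so metadata_list comes back as a nested list that needs flattening (this also appears in the full code at the end):

gcs = GCSFileSystem(PipelineOptions(pipeline_args))
result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])]
result = reduce(add, result)  # flatten the per-pattern lists into one list of FileMetadata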

We will read each of the matching files into a different PCollection. Since we don't know the number of files a priori, we need to programmatically create a list of names for each PCollection (p0, p1, ..., pN-1) and ensure that we have unique labels for each step ('Read file 0', 'Read file 1', etc.):

variables = ['p{}'.format(i) for i in range(len(result))]
read_labels = ['Read file {}'.format(i) for i in range(len(result))]
add_filename_labels = ['Add filename {}'.format(i) for i in range(len(result))]

Then we proceed to read each different file into its corresponding PCollection with ReadFromText and then we call the AddFilenamesFn ParDo to associate each record with the filename.

for i in range(len(result)):   
  globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), result[i].path)

where AddFilenamesFn is:

class AddFilenamesFn(beam.DoFn):
    """ParDo to output a dict with filename and row"""
    def process(self, element, file_path):
        file_name = file_path.split("/")[-1]
        yield {'filename':file_name, 'row':element}
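
Note that the extra positional argument passed to beam.ParDo(AddFilenamesFn(), result[i].path) is forwarded by Beam to the process method as file_path, which is how each element gets paired with the path of the file it was read from.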

My first approach was to use a Map function directly, which results in simpler code. However, result[i].path is resolved at the end of the loop, so each record was incorrectly mapped to the last file of the list:

globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.Map(lambda elem: (result[i].path, elem))
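
If you preferred the Map version, a common Python workaround for that late-binding issue is to capture the path as a default argument of the lambda (a general Python fix, not something tested in this answer):

globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.Map(lambda elem, path=result[i].path: (path, elem))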

Finally, we flatten all the PCollections into one:

merged = [globals()[variables[i]] for i in range(len(result))] | 'Flatten PCollections' >> beam.Flatten()

and we check the results by logging the elements:

INFO:root:{'filename': u'countries2.csv', 'row': u'id,country'}
INFO:root:{'filename': u'countries2.csv', 'row': u'3,italy'}
INFO:root:{'filename': u'countries2.csv', 'row': u'4,france'}
INFO:root:{'filename': u'countries1.csv', 'row': u'id,country'}
INFO:root:{'filename': u'countries1.csv', 'row': u'1,sweden'}
INFO:root:{'filename': u'countries1.csv', 'row': u'2,spain'}

I tested this with both DirectRunner and DataflowRunner for Python SDK 2.8.0.

I hope this addresses the main issue here and you can continue by integrating BigQuery into your full use case now. You might need to use the Python Client Library for that; I wrote a similar example in Java.
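
As a rough illustration of that last step, and only a sketch rather than something tested here, the lookup table could be loaded once with the BigQuery Python client and passed around as a side input; the dataset, table and column names below are placeholders:

from google.cloud import bigquery

def build_lookup(project_id):
    # Hypothetical lookup table mapping filenames to a value.
    client = bigquery.Client(project=project_id)
    query = 'SELECT filename, value FROM `dataset_id.lookup_table`'
    return {row['filename']: row['value'] for row in client.query(query).result()}

# Inside the pipeline, after building `merged` (see the full code below):
# lookup = p | 'Create lookup' >> beam.Create([build_lookup('project_id')])
# enriched = merged | 'Enrich' >> beam.Map(
#     lambda elem, lookup: dict(elem, value=lookup[elem['filename']]),
#     lookup=beam.pvalue.AsSingleton(lookup))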

Full code:

import argparse, logging
from operator import add
from functools import reduce  # needed on Python 3; also available on Python 2.6+

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import ReadFromText
from apache_beam.io.filesystem import FileMetadata
from apache_beam.io.filesystem import FileSystem
from apache_beam.io.gcp.gcsfilesystem import GCSFileSystem

class GCSFileReader:
  """Helper class to read gcs files"""
  def __init__(self, gcs):
      self.gcs = gcs

class AddFilenamesFn(beam.DoFn):
    """ParDo to output a dict with filename and row"""
    def process(self, element, file_path):
        file_name = file_path.split("/")[-1]
        # yield (file_name, element) # use this to return a tuple instead
        yield {'filename':file_name, 'row':element}

# just logging output to visualize results
def write_res(element):
  logging.info(element)
  return element

def run(argv=None):
  parser = argparse.ArgumentParser()
  known_args, pipeline_args = parser.parse_known_args(argv)

  p = beam.Pipeline(options=PipelineOptions(pipeline_args))
  gcs = GCSFileSystem(PipelineOptions(pipeline_args))
  gcs_reader = GCSFileReader(gcs)

  # in my case I am looking for files that start with 'countries'
  BUCKET='BUCKET_NAME'
  result = [m.metadata_list for m in gcs.match(['gs://{}/countries*'.format(BUCKET)])]
  result = reduce(add, result)

  # create each input PCollection name and unique step labels
  variables = ['p{}'.format(i) for i in range(len(result))]
  read_labels = ['Read file {}'.format(i) for i in range(len(result))]
  add_filename_labels = ['Add filename {}'.format(i) for i in range(len(result))]

  # load each input file into a separate PCollection and add filename to each row
  for i in range(len(result)):
    # globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.Map(lambda elem: (result[i].path, elem))
    globals()[variables[i]] = p | read_labels[i] >> ReadFromText(result[i].path) | add_filename_labels[i] >> beam.ParDo(AddFilenamesFn(), result[i].path)

  # flatten all PCollections into a single one
  merged = [globals()[variables[i]] for i in range(len(result))] | 'Flatten PCollections' >> beam.Flatten() | 'Write results' >> beam.Map(write_res)

  p.run()

if __name__ == '__main__':
  run()
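
As a usage sketch (the script name is a placeholder), this can be run locally with the DirectRunner, assuming Google Cloud credentials with access to the bucket are set up; for the DataflowRunner you would additionally pass --project and --temp_location:

python filenames_pipeline.py --runner DirectRunner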
