Filter through files in GCS bucket folder and delete 0 byte files with Dataflow


Problem description

I am currently trying to delete all the files that are 0 Bytes within a Google Cloud Storage bucket. I want to be able to do this with apache beam and a dataflow runner that will run on a google cloud project. What I have right now is this (I have hidden some details with <***>):

import argparse

import apache_beam as beam
import apache_beam.io.gcp.gcsfilesystem as gcs
from apache_beam.io import ReadFromText
from apache_beam.options.pipeline_options import GoogleCloudOptions, PipelineOptions, StandardOptions

class DetectEmpty(beam.DoFn):
    def process(self, file_path):
        # NOTE: `gfs` is not visible in this scope -- see the second question below.
        if gfs.size(file_path) == 0:
            yield file_path

def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', dest='input', default=<***>, help='<***>')
    known_args, pipeline_args = parser.parse_known_args(argv)

    options = PipelineOptions(pipeline_args)
    google_cloud_options = options.view_as(GoogleCloudOptions)
    google_cloud_options.project = '<***>'
    google_cloud_options.job_name = '<***>'
    options.view_as(StandardOptions).runner = 'DataflowRunner'

    gfs = gcs.GCSFileSystem(options)
    p = beam.Pipeline(options=options)

    images = p | 'read directory' >> ReadFromText(known_args.input)
    empty_images = images | 'discover empty files' >> beam.ParDo(DetectEmpty())

    p.run()

Some of the questions I have are:

  • Is this the right way to go about this task?
  • How do I pass an apache_beam.io.gcp.gcsfilesystem.GCSFileSystem to the DoFn?
  • Also, I want to delete every folder that contains only 0-byte files. How do I do that?

Recommended answer

You don't need to actually read the files in order to detect empty ones, you can just use the FileSystem object directly to check the file sizes and delete as needed. The FileMetadata object returned by the match() function includes the size of the files.

Something like:

class DeleteEmpty(beam.DoFn):
  def __init__(self, gfs):
    self.gfs = gfs

  def process(self, file_metadata):
    if file_metadata.size_in_bytes == 0:
      self.gfs.delete([file_metadata.path])

# match() takes a list of patterns and returns one MatchResult per pattern.
files = (p
         | 'Filenames' >> beam.Create(gfs.match([<directory glob pattern>])[0].metadata_list)
         | 'Reshuffle' >> beam.Reshuffle()  # allows the downstream code to be parallelized after the Create
         | 'Delete empty files' >> beam.ParDo(DeleteEmpty(gfs)))
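
For context, a minimal sketch of how the snippet above might be wired into a complete pipeline. It assumes the DeleteEmpty DoFn defined above, takes the runner and project settings from the command line rather than hard-coding them, and keeps the directory glob pattern as a placeholder you would replace with your own gs:// path:

import apache_beam as beam
import apache_beam.io.gcp.gcsfilesystem as gcs
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    # e.g. --runner=DataflowRunner --project=<***> --temp_location=gs://<***>/tmp
    options = PipelineOptions(argv)
    gfs = gcs.GCSFileSystem(options)

    with beam.Pipeline(options=options) as p:
        # match() returns one MatchResult per pattern; each FileMetadata
        # already carries the object size, so nothing needs to be read.
        metadata = gfs.match(['<directory glob pattern>'])[0].metadata_list
        (p
         | 'Filenames' >> beam.Create(metadata)
         | 'Reshuffle' >> beam.Reshuffle()
         | 'Delete empty files' >> beam.ParDo(DeleteEmpty(gfs)))


if __name__ == '__main__':
    run()

Passing the GCSFileSystem into the DoFn's constructor at pipeline-construction time is what the second question above is getting at, and the Reshuffle step lets the per-file deletes be spread across workers after the single-worker Create.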

GCS doesn't really have folders; they are just a convenience added when using the UI or gsutil. When there are no objects in a folder, that folder just doesn't exist. See https://cloud.google.com/storage/docs/gsutil/addlhelp/HowSubdirectoriesWork
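
As a quick illustration of that flat namespace (a sketch using the google-cloud-storage client; the bucket and prefix names are hypothetical): once the 0-byte objects under a prefix have been deleted, listing the prefix returns nothing and the "folder" simply stops existing, so there is no separate folder object left to clean up.

from google.cloud import storage

client = storage.Client()

# Hypothetical bucket and prefix, for illustration only.
remaining = list(client.list_blobs('my-bucket', prefix='images/empty-only/'))

if not remaining:
    # No objects share this prefix any more, so the "folder" no longer
    # exists -- there is nothing extra to delete for the folder itself.
    print('images/empty-only/ is gone')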
