如何限制使用FileIO写入的每个文件的行数 [英] How to limit number of lines per file written using FileIO
问题描述
是否可以使用TextIO限制每个书面分片中的行数,或者可以是FileIO?
Is there a possible way to limit number of lines in each written shard using TextIO or may be FileIO?
示例:
- 从Big Query-Batch Job中读取行(例如,结果为19500行).
- 进行一些转换.
- 将文件写入Google Cloud存储(19个文件,每个文件限制为1000条记录,一个文件有500条记录).
- 触发了Cloud Function,以针对GCS中的每个文件向外部API发出POST请求.
到目前为止,这是我要尝试执行的操作,但是不起作用(尝试限制每个文件1000行):
Here is what I'm trying to do so far but doesn't work (Trying to limit 1000 rows per file):
BQ_DATA = p | 'read_bq_view' >> beam.io.Read(
beam.io.BigQuerySource(query=query,
use_standard_sql=True)) | beam.Map(json.dumps)
BQ_DATA | beam.WindowInto(GlobalWindows(), Repeatedly(trigger=AfterCount(1000)),
accumulation_mode=AccumulationMode.DISCARDING)
| WriteToFiles(path='fileio', destination="csv")
我在概念上是错误的,还是有其他方法可以实现此目的?
Am I conceptually wrong or is there any other way to implement this?
推荐答案
您可以在ParDo中实现对GCS的写入步骤,并限制要包含在批处理"元素中的元素数量.像这样:
You can implement the write to GCS step inside ParDo and limit the number of elements to include in a "batch" like this:
from apache_beam.io import filesystems
class WriteToGcsWithRowLimit(beam.DoFn):
def __init__(self, row_size=1000):
self.row_size = row_size
self.rows = []
def finish_bundle(self):
if len(self.rows) > 0:
self._write_file()
def process(self, element):
self.rows.append(element)
if len(self.rows) >= self.row_size:
self._write_file()
def _write_file(self):
from time import time
new_file = 'gs://bucket/file-{}.csv'.format(time())
writer = filesystems.FileSystems.create(path=new_file)
writer.write(self.rows) # may need to format
self.rows = []
writer.close()
BQ_DATA | beam.ParDo(WriteToGcsWithRowLimit())
请注意,这不会创建少于1000行的任何文件,但是您可以更改process
中的逻辑来做到这一点.
Note that this will not create any files with less than 1000 rows, but you can change the logic in process
to do that.
(编辑1以处理余数)
(编辑2以停止使用计数器,因为文件将被覆盖)
(Edit 2 to stop using counters, as files will be overridden)
这篇关于如何限制使用FileIO写入的每个文件的行数的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!