How do I write to multiple files in Apache Beam?
Problem Description
Let me simplify my case. I'm using Apache Beam 0.6.0. My final processed result is a PCollection<KV<String, String>>, and I want to write the values to different files corresponding to their keys.
For example, suppose the result includes:
(key1, value1)
(key2, value2)
(key1, value3)
(key1, value4)
Then I want to write value1, value3 and value4 to key1.txt, and write value2 to key2.txt.
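(For a concrete picture, a result shaped like this could be built in a test pipeline with Beam's Create transform; the snippet below is illustrative and not part of the original question.)

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class Example {
    public static void main(final String[] args) {
        final Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // The processed result described above, recreated literally.
        final PCollection<KV<String, String>> result = pipeline.apply(Create.of(
            KV.of("key1", "value1"),
            KV.of("key2", "value2"),
            KV.of("key1", "value3"),
            KV.of("key1", "value4")));

        // ... writing `result` out per key is exactly what the question asks about.
        pipeline.run();
    }
}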
In my case:
- The set of keys is determined when the pipeline is running, not when the pipeline is being constructed.
- The key set may be quite small, but the number of values corresponding to each key may be very, very large.
Any ideas?
Recommended Answer
Handily, I wrote a sample of this case just the other day.
This example is in the Dataflow 1.x style.
Basically you group by each key, and then you can do this with a custom transform that connects to cloud storage. The caveat is that the list of lines per file shouldn't be massive (it has to fit into memory on a single instance, but given that you can run high-memory instances, that limit is pretty high).
...
PCollection<KV<String, List<String>>> readyToWrite = groupedByFirstLetter
    .apply(Combine.perKey(AccumulatorOfWords.getCombineFn()));

readyToWrite.apply(
    new PTransformWriteToGCS("dataflow-experiment", TonyWordGrouper::derivePath));
...
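The snippet above references two helpers the answer doesn't show: AccumulatorOfWords.getCombineFn() and TonyWordGrouper::derivePath. As a hypothetical reconstruction (these are not the author's actual implementations, just the minimum the snippet's types require), the combine step needs a CombineFn that collects every value for a key into a List<String>, and derivePath just needs to map a key to an object path, e.g. "key1" to "output/key1.txt":

import java.util.ArrayList;
import java.util.List;

import com.google.cloud.dataflow.sdk.transforms.Combine;

public class AccumulatorOfWords {

    // Hypothetical: a CombineFn that gathers all values for a key into one list.
    public static Combine.CombineFn<String, List<String>, List<String>> getCombineFn() {
        return new Combine.CombineFn<String, List<String>, List<String>>() {
            @Override
            public List<String> createAccumulator() {
                return new ArrayList<>();
            }

            @Override
            public List<String> addInput(final List<String> accumulator, final String input) {
                accumulator.add(input);
                return accumulator;
            }

            @Override
            public List<String> mergeAccumulators(final Iterable<List<String>> accumulators) {
                final List<String> merged = new ArrayList<>();
                for (final List<String> accumulator : accumulators) {
                    merged.addAll(accumulator);
                }
                return merged;
            }

            @Override
            public List<String> extractOutput(final List<String> accumulator) {
                return accumulator;
            }
        };
    }
}

// In its own file; a hypothetical stand-in for the grouper class used above.
public class TonyWordGrouper {
    public static String derivePath(final String key) {
        return "output/" + key + ".txt";
    }
}

With a CombineFn like this, Combine.perKey turns the PCollection<KV<String, String>> into the PCollection<KV<String, List<String>>> that the writer below consumes.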
The transform that does most of the work is then:
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.SerializableFunction;
import com.google.cloud.dataflow.sdk.util.MimeTypes;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageOptions;

public class PTransformWriteToGCS
        extends PTransform<PCollection<KV<String, List<String>>>, PCollection<Void>> {

    private static final Logger LOG = LoggerFactory.getLogger(PTransformWriteToGCS.class);
    private static final Storage STORAGE = StorageOptions.getDefaultInstance().getService();

    private final String bucketName;
    private final SerializableFunction<String, String> pathCreator;

    public PTransformWriteToGCS(final String bucketName,
            final SerializableFunction<String, String> pathCreator) {
        this.bucketName = bucketName;
        this.pathCreator = pathCreator;
    }

    @Override
    public PCollection<Void> apply(final PCollection<KV<String, List<String>>> input) {
        return input.apply(ParDo.of(new DoFn<KV<String, List<String>>, Void>() {
            @Override
            public void processElement(
                    final DoFn<KV<String, List<String>>, Void>.ProcessContext context)
                    throws Exception {
                final String key = context.element().getKey();
                final List<String> values = context.element().getValue();

                // Join every line for this key into a single string.
                final String toWrite = values.stream().collect(Collectors.joining("\n"));

                // Derive the object path from the key and upload the bytes to GCS.
                final String path = pathCreator.apply(key);
                final BlobInfo blobInfo = BlobInfo.newBuilder(bucketName, path)
                        .setContentType(MimeTypes.TEXT)
                        .build();
                LOG.info("blob writing to: {}", blobInfo);
                STORAGE.create(blobInfo, toWrite.getBytes(StandardCharsets.UTF_8));
            }
        }));
    }
}
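As an aside, on current Beam releases (2.x and later) this fan-out no longer needs a hand-rolled transform: FileIO.writeDynamic() routes each element to a destination computed from the element and names the output files per destination. A minimal sketch, assuming Beam 2.x; the output path is illustrative, and defaultNaming produces shard-style names such as key1-00000-of-00001.txt rather than exactly key1.txt:

import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

public class WritePerKey {

    // Writes each element's value as a line in a text file named after its key.
    public static void writePerKey(final PCollection<KV<String, String>> results) {
        results.apply(FileIO.<String, KV<String, String>>writeDynamic()
            .by(KV::getKey)                                   // one destination per key
            .withDestinationCoder(StringUtf8Coder.of())
            .via(Contextful.fn(KV::getValue), TextIO.sink())  // write only the value
            .to("gs://dataflow-experiment/output")            // illustrative path
            .withNaming(key -> FileIO.Write.defaultNaming(key, ".txt")));
    }
}

Unlike the Dataflow 1.x sample, this streams values to files instead of materializing the whole list for a key in memory, so it also lifts the per-key size caveat mentioned above.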