Is there a way to write one file for each record with Apache Beam FileIO?

Problem Description

I am learning Apache Beam and trying to implement something similar to distcp. I use FileIO.match().filepattern() to get the input files, but when writing with FileIO.write, the files sometimes get coalesced.

Knowing the partition count before job execution is not possible.

PCollection<MatchResult.Metadata> pCollection =
    pipeline.apply(this.name(), FileIO.match().filepattern(path()));

pCollection
    .apply(FileIO.readMatches())
    .apply(name(), FileIO.<FileIO.ReadableFile>write()
        .via(FileSink.create())
        .to(path()));

Sink code

@AutoValue
public abstract static class FileSink implements FileIO.Sink<FileIO.ReadableFile> {

    private OutputStream outputStream;

    public static FileSink create() {
      return new AutoValue_FileIOOperator_FileSink();
    }

    @Override
    public void open(WritableByteChannel channel) throws IOException {
      // Called once per output file; every element routed to that file
      // shares this stream, which is why outputs can end up coalesced.
      outputStream = Channels.newOutputStream(channel);
    }

    @Override
    public void write(FileIO.ReadableFile element) throws IOException {
      // Copy the bytes of one matched input file into the output file.
      try (final InputStream inputStream = Channels.newInputStream(element.open())) {
        IOUtils.copy(inputStream, outputStream);
      }
    }

    @Override
    public void flush() throws IOException {
      // Flush any buffered bytes before Beam closes the channel.
      outputStream.flush();
    }
  }

Answer

You can use FileIO.writeDynamic and specify in .by how you want to group records into files. For example, if you have unique keys you can use .by(KV::getKey) and the elements of each key will be written to a separate file. Otherwise, the criterion can be a hash of the row, etc. You can also tune .withNaming as needed. As a demo:

p.apply("Create Data", Create.of(KV.of("one", "this is row 1"), KV.of("two", "this is row 2"), KV.of("three", "this is row 3"), KV.of("four", "this is row 4")))
 .apply(FileIO.<String, KV<String, String>>writeDynamic()
    .by(KV::getKey)
    .withDestinationCoder(StringUtf8Coder.of())
    .via(Contextful.fn(KV::getValue), TextIO.sink())
    .to(output)
    .withNaming(key -> FileIO.Write.defaultNaming("file-" + key, ".txt")));

This will write the four elements into four files:

$ mvn compile -e exec:java \
 -Dexec.mainClass=com.dataflow.samples.OneRowOneFile \
      -Dexec.args="--project=$PROJECT \
      --output="output/" \
      --runner=DirectRunner"

$ ls output/
file-four-00001-of-00003.txt  file-one-00002-of-00003.txt  file-three-00002-of-00003.txt  file-two-00002-of-00003.txt
$ cat output/file-four-00001-of-00003.txt 
this is row 4

Complete code:

package com.dataflow.samples;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.Contextful;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;


public class OneRowOneFile {

    public interface Options extends PipelineOptions {
        @Validation.Required
        @Description("Output Path i.e. gs://BUCKET/path/to/output/folder")
        String getOutput();
        void setOutput(String s);
    }

    public static void main(String[] args) {

        OneRowOneFile.Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(OneRowOneFile.Options.class);

        Pipeline p = Pipeline.create(options);

        String output = options.getOutput();

        p.apply("Create Data", Create.of(KV.of("one", "this is row 1"), KV.of("two", "this is row 2"), KV.of("three", "this is row 3"), KV.of("four", "this is row 4")))
         .apply(FileIO.<String, KV<String, String>>writeDynamic()
            .by(KV::getKey)
            .withDestinationCoder(StringUtf8Coder.of())
            .via(Contextful.fn(KV::getValue), TextIO.sink())
            .to(output)
            .withNaming(key -> FileIO.Write.defaultNaming("file-" + key, ".txt")));

        p.run().waitUntilFinish();
    }
}
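
If the rows have no natural key, a hash-based destination gives a similar one-file-per-group effect while keeping the number of output files bounded. Below is a minimal, untested sketch of that variant (the bucket count of 10 is an arbitrary illustration, and output is the same option as above):

// Hypothetical variant: route each row to one of 10 hash buckets
// instead of to a per-key destination.
p.apply("Create Rows", Create.of("this is row 1", "this is row 2", "this is row 3"))
 .apply(FileIO.<String, String>writeDynamic()
    .by(row -> String.valueOf(Math.floorMod(row.hashCode(), 10)))
    .withDestinationCoder(StringUtf8Coder.of())
    .via(TextIO.sink())
    .to(output)
    .withNaming(bucket -> FileIO.Write.defaultNaming("bucket-" + bucket, ".txt")));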

Let me know if that works with your custom sink, too.
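
For the distcp-style copy in the question, one hypothetical way to get exactly one output file per input file is to key each ReadableFile by its source file name and reuse the custom FileSink as the writeDynamic sink. A sketch under that assumption (untested; path() and FileSink come from the question's code):

// Hypothetical adaptation of the question's pipeline: one destination
// per source file name, so files are never merged.
pipeline
    .apply(FileIO.match().filepattern(path()))
    .apply(FileIO.readMatches())
    .apply(FileIO.<String, FileIO.ReadableFile>writeDynamic()
        .by(file -> file.getMetadata().resourceId().getFilename())
        .withDestinationCoder(StringUtf8Coder.of())
        .via(FileSink.create())
        .to(path())
        .withNaming(name -> FileIO.Write.defaultNaming(name, "")));

Note that defaultNaming still appends shard suffixes such as -00000-of-00001; if the output names must match the input names exactly, a custom FileNaming can drop them.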
