GCP Dataflow 2.0 PubSub to GCS


Question


I'm having a difficult time understanding the concepts of .withFilenamePolicy of TextIO.write(). The requirements for supplying a FilenamePolicy seem incredibly complex for something as simple as specifying a GCS bucket to write streamed files to.

At a high level, I have JSON messages being streamed to a PubSub topic, and I'd like to write those raw messages to files in GCS for permanent storage (I'll also be doing other processing on the messages). I initially started with this Pipeline, thinking it would be pretty simple:

public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

        Pipeline p = Pipeline.create(options); 

        p.apply("Read From PubSub", PubsubIO.readStrings().fromTopic(topic))
            .apply("Write to GCS", TextIO.write().to(gcs_bucket));

        p.run();

    }

I got the error about needing WindowedWrites, which I applied, and then needing a FileNamePolicy. This is where things get hairy.

I went to the Beam docs and checked out FilenamePolicy. It looks like I would need to extend this class, which then also requires extending other abstract classes to make this work. Unfortunately the documentation on Apache is a bit scant and I can't find any examples for Dataflow 2.0 doing this, except for the WordCount example, and even that implements these details in a helper class.

So I could probably make this work just by copying much of the WordCount example, but I'm trying to better understand the details of this. A few questions I have:

1) Is there any roadmap item to abstract away a lot of this complexity? It seems like I should be able to supply a GCS bucket, as I would in a non-windowed write, and then just supply a few basic options like the timing and file-naming rule. I know writing streaming windowed data to files is more complex than just opening a file pointer (or the object-storage equivalent).

2) It looks like, to make this work, I need to create a WindowedContext object, which requires supplying a BoundedWindow abstract class and a PaneInfo object, and then some shard info. The information available for these is pretty bare and I'm having a hard time knowing what is actually needed for all of these, especially given my simple use case. Are there any good examples available that implement these? In addition, it also looks like I need to set the number of shards as part of TextIO.write, but then also supply the number of shards as part of the FilenamePolicy?

Thanks for any help in understanding the details behind this; hoping to learn a few things!

Edit 7/20/17: So I finally got this pipeline to run by extending FilenamePolicy. My challenge was needing to define the window of the streaming data from PubSub. Here is a pretty close representation of the code:

public class ReadData {
    public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

        Pipeline p = Pipeline.create(options);

        p.apply("Read From PubSub", PubsubIO.readStrings().fromTopic(topic))
            .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
            .apply("Write to GCS", TextIO.write().to("gcs_bucket")
                .withWindowedWrites()
                .withFilenamePolicy(new TestPolicy())
                .withNumShards(10));

        p.run();

    }
}

class TestPolicy extends FileBasedSink.FilenamePolicy {
    @Override
    public ResourceId windowedFilename(
        ResourceId outputDirectory, WindowedContext context, String extension) {
        IntervalWindow window = (IntervalWindow) context.getWindow();
        String filename = String.format(
            "%s-%s-%s-%s-of-%s.json",
            "test",
            window.start().toString(),
            window.end().toString(),
            context.getShardNumber(),
            context.getNumShards()  // the "of" slot takes the total shard count, not the shard number
        );
        return outputDirectory.resolve(filename, ResolveOptions.StandardResolveOptions.RESOLVE_FILE);
    }

    @Override
    public ResourceId unwindowedFilename(
        ResourceId outputDirectory, Context context, String extension) {
        throw new UnsupportedOperationException("Unsupported.");
    }
}
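For reference, the filename format in TestPolicy boils down to a plain String.format call. A minimal standalone sketch, with hypothetical values standing in for a real IntervalWindow and shard context, shows the kind of name it produces:

```java
public class FilenameDemo {
    public static void main(String[] args) {
        // Hypothetical stand-ins for window.start(), window.end(), and the shard context
        String windowStart = "2017-07-20T10:00:00.000Z";
        String windowEnd = "2017-07-20T10:01:00.000Z";
        int shardNumber = 3;
        int numShards = 10;

        // Same format string as TestPolicy.windowedFilename above
        String filename = String.format(
            "%s-%s-%s-%s-of-%s.json",
            "test", windowStart, windowEnd, shardNumber, numShards);

        // test-2017-07-20T10:00:00.000Z-2017-07-20T10:01:00.000Z-3-of-10.json
        System.out.println(filename);
    }
}
```

Each one-minute window then yields ten such files (one per shard), distinguishable by the shard number in the name.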

Solution

In Beam 2.0, below is an example of writing the raw messages from PubSub out into windowed files on GCS. The pipeline is fairly configurable, allowing you to specify the window duration via a parameter, and a sub-directory policy if you want logical subsections of your data for ease of reprocessing/archiving. Note that this has an additional dependency on Apache Commons Lang 3.

PubSubToGcs

/**
 * This pipeline ingests incoming data from a Cloud Pub/Sub topic and
 * outputs the raw data into windowed files at the specified output
 * directory.
 */
public class PubsubToGcs {

  /**
   * Options supported by the pipeline.
   * 
   * <p>Inherits standard configuration options.</p>
   */
  public static interface Options extends DataflowPipelineOptions, StreamingOptions {
    @Description("The Cloud Pub/Sub topic to read from.")
    @Required
    ValueProvider<String> getTopic();
    void setTopic(ValueProvider<String> value);

    @Description("The directory to output files to. Must end with a slash.")
    @Required
    ValueProvider<String> getOutputDirectory();
    void setOutputDirectory(ValueProvider<String> value);

    @Description("The filename prefix of the files to write to.")
    @Default.String("output")
    @Required
    ValueProvider<String> getOutputFilenamePrefix();
    void setOutputFilenamePrefix(ValueProvider<String> value);

    @Description("The shard template of the output file. Specified as repeating sequences "
        + "of the letters 'S' or 'N' (example: SSS-NNN). These are replaced with the "
        + "shard number, or number of shards respectively")
    @Default.String("")
    ValueProvider<String> getShardTemplate();
    void setShardTemplate(ValueProvider<String> value);

    @Description("The suffix of the files to write.")
    @Default.String("")
    ValueProvider<String> getOutputFilenameSuffix();
    void setOutputFilenameSuffix(ValueProvider<String> value);

    @Description("The sub-directory policy which files will use when output per window.")
    @Default.Enum("NONE")
    SubDirectoryPolicy getSubDirectoryPolicy();
    void setSubDirectoryPolicy(SubDirectoryPolicy value);

    @Description("The window duration in which data will be written. Defaults to 5m. "
        + "Allowed formats are: "
        + "Ns (for seconds, example: 5s), "
        + "Nm (for minutes, example: 12m), "
        + "Nh (for hours, example: 2h).")
    @Default.String("5m")
    String getWindowDuration();
    void setWindowDuration(String value);

    @Description("The maximum number of output shards produced when writing.")
    @Default.Integer(10)
    Integer getNumShards();
    void setNumShards(Integer value);
  }

  /**
   * Main entry point for executing the pipeline.
   * @param args  The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {

    Options options = PipelineOptionsFactory
        .fromArgs(args)
        .withValidation()
        .as(Options.class);

    run(options);
  }

  /**
   * Runs the pipeline with the supplied options.
   * 
   * @param options The execution parameters to the pipeline.
   * @return  The result of the pipeline execution.
   */
  public static PipelineResult run(Options options) {
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    /**
     * Steps:
     *   1) Read string messages from PubSub
     *   2) Window the messages into minute intervals specified by the executor.
     *   3) Output the windowed files to GCS
     */
    pipeline
      .apply("Read PubSub Events",
        PubsubIO
          .readStrings()
          .fromTopic(options.getTopic()))
      .apply(options.getWindowDuration() + " Window", 
          Window
            .into(FixedWindows.of(parseDuration(options.getWindowDuration()))))
      .apply("Write File(s)",
          TextIO
            .write()
            .withWindowedWrites()
            .withNumShards(options.getNumShards())
            .to(options.getOutputDirectory())
            .withFilenamePolicy(
                new WindowedFilenamePolicy(
                    options.getOutputFilenamePrefix(),
                    options.getShardTemplate(),
                    options.getOutputFilenameSuffix())
                .withSubDirectoryPolicy(options.getSubDirectoryPolicy())));

    // Execute the pipeline and return the result.
    PipelineResult result = pipeline.run();

    return result;
  }

  /**
   * Parses a duration from a period formatted string. Values
   * are accepted in the following formats:
   * <p>
   * Ns - Seconds. Example: 5s<br>
   * Nm - Minutes. Example: 13m<br>
   * Nh - Hours. Example: 2h
   * 
   * <pre>
   * parseDuration(null) = NullPointerException()
   * parseDuration("")   = Duration.standardSeconds(0)
   * parseDuration("2s") = Duration.standardSeconds(2)
   * parseDuration("5m") = Duration.standardMinutes(5)
   * parseDuration("3h") = Duration.standardHours(3)
   * </pre>
   * 
   * @param value The period value to parse.
   * @return  The {@link Duration} parsed from the supplied period string.
   */
  private static Duration parseDuration(String value) {
    Preconditions.checkNotNull(value, "The specified duration must be a non-null value!");

    PeriodParser parser = new PeriodFormatterBuilder()
      .appendSeconds().appendSuffix("s")
      .appendMinutes().appendSuffix("m")
      .appendHours().appendSuffix("h")
      .toParser();

    MutablePeriod period = new MutablePeriod();
    parser.parseInto(period, value, 0, Locale.getDefault());

    Duration duration = period.toDurationFrom(new DateTime(0));
    return duration;
  }
}
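The parseDuration helper leans on Joda-Time's PeriodFormatterBuilder. The same Ns/Nm/Nh format can be sketched with only the JDK's java.time and regex, which may help clarify what the Joda version is doing (this is an illustration, not the code the pipeline uses):

```java
import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DurationDemo {
    // Stdlib-only sketch of the Ns/Nm/Nh format handled by parseDuration above
    static Duration parse(String value) {
        Matcher m = Pattern.compile("(\\d+)([smh])").matcher(value);
        Duration d = Duration.ZERO;
        while (m.find()) {
            long n = Long.parseLong(m.group(1));
            switch (m.group(2)) {
                case "s": d = d.plusSeconds(n); break;
                case "m": d = d.plusMinutes(n); break;
                case "h": d = d.plusHours(n); break;
            }
        }
        return d;
    }

    public static void main(String[] args) {
        System.out.println(parse("5m").getSeconds()); // 300
        System.out.println(parse("2h").getSeconds()); // 7200
    }
}
```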


WindowedFilenamePolicy

/**
 * The {@link WindowedFilenamePolicy} class will output files
 * to the specified location with a format of output-yyyyMMdd'T'HHmmssZ-001-of-100.txt.
 */
@SuppressWarnings("serial")
public class WindowedFilenamePolicy extends FilenamePolicy {

    /**
     * Possible sub-directory creation modes.
     */
    public static enum SubDirectoryPolicy {
        NONE("."),
        PER_HOUR("yyyy-MM-dd/HH"),
        PER_DAY("yyyy-MM-dd");

        private final String subDirectoryPattern;

        private SubDirectoryPolicy(String subDirectoryPattern) {
            this.subDirectoryPattern = subDirectoryPattern;
        }

        public String getSubDirectoryPattern() {
            return subDirectoryPattern;
        }

        public String format(Instant instant) {
            DateTimeFormatter formatter = DateTimeFormat.forPattern(subDirectoryPattern);
            return formatter.print(instant);
        }
    }

    /**
     * The formatter used to format the window timestamp for outputting to the filename.
     */
    private static final DateTimeFormatter formatter = ISODateTimeFormat
            .basicDateTimeNoMillis()
            .withZone(DateTimeZone.getDefault());

    /**
     * The filename prefix.
     */
    private final ValueProvider<String> prefix;

    /**
     * The filename suffix.
     */
    private final ValueProvider<String> suffix;

    /**
     * The shard template used during file formatting.
     */
    private final ValueProvider<String> shardTemplate;

    /**
     * The policy which dictates when or if sub-directories are created
     * for the windowed file output.
     */
    private ValueProvider<SubDirectoryPolicy> subDirectoryPolicy = StaticValueProvider.of(SubDirectoryPolicy.NONE);

    /**
     * Constructs a new {@link WindowedFilenamePolicy} with the
     * supplied prefix used for output files.
     * 
     * @param prefix    The prefix to append to all files output by the policy.
     * @param shardTemplate The template used to create uniquely named sharded files.
     * @param suffix    The suffix to append to all files output by the policy.
     */
    public WindowedFilenamePolicy(String prefix, String shardTemplate, String suffix) {
        this(StaticValueProvider.of(prefix), 
                StaticValueProvider.of(shardTemplate),
                StaticValueProvider.of(suffix));
    }

    /**
     * Constructs a new {@link WindowedFilenamePolicy} with the
     * supplied prefix used for output files.
     * 
     * @param prefix    The prefix to append to all files output by the policy.
     * @param shardTemplate The template used to create uniquely named sharded files.
     * @param suffix    The suffix to append to all files output by the policy.
     */
    public WindowedFilenamePolicy(
            ValueProvider<String> prefix, 
            ValueProvider<String> shardTemplate, 
            ValueProvider<String> suffix) {
        this.prefix = prefix;
        this.shardTemplate = shardTemplate;
        this.suffix = suffix; 
    }

    /**
     * The subdirectory policy will create sub-directories on the
     * filesystem based on the window which has fired.
     * 
     * @param policy    The subdirectory policy to apply.
     * @return The filename policy instance.
     */
    public WindowedFilenamePolicy withSubDirectoryPolicy(SubDirectoryPolicy policy) {
        return withSubDirectoryPolicy(StaticValueProvider.of(policy));
    }

    /**
     * The subdirectory policy will create sub-directories on the
     * filesystem based on the window which has fired.
     * 
     * @param policy    The subdirectory policy to apply.
     * @return The filename policy instance.
     */
    public WindowedFilenamePolicy withSubDirectoryPolicy(ValueProvider<SubDirectoryPolicy> policy) {
        this.subDirectoryPolicy = policy;
        return this;
    }

    /**
     * The windowed filename method will construct filenames per window in the
     * format of output-yyyyMMdd'T'HHmmss-001-of-100.txt.
     */
    @Override
    public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext c, String extension) {
        Instant windowInstant = c.getWindow().maxTimestamp();
        String datetimeStr = formatter.print(windowInstant.toDateTime());

        // Remove the prefix when it is null so we don't append the literal 'null'
        // to the start of the filename
        String filenamePrefix = prefix.get() == null ? datetimeStr : prefix.get() + "-" + datetimeStr;
        String filename = DefaultFilenamePolicy.constructName(
                filenamePrefix, 
                shardTemplate.get(), 
                StringUtils.defaultIfBlank(suffix.get(), extension),  // Ignore the extension in favor of the suffix.
                c.getShardNumber(), 
                c.getNumShards());

        String subDirectory = subDirectoryPolicy.get().format(windowInstant);
        return outputDirectory
                .resolve(subDirectory, StandardResolveOptions.RESOLVE_DIRECTORY)
                .resolve(filename, StandardResolveOptions.RESOLVE_FILE);
    }

    /**
     * Unwindowed writes are unsupported by this filename policy so an {@link UnsupportedOperationException}
     * will be thrown if invoked.
     */
    @Override
    public ResourceId unwindowedFilename(ResourceId outputDirectory, Context c, String extension) {
        throw new UnsupportedOperationException("There is no windowed filename policy for unwindowed file"
            + " output. Please use the WindowedFilenamePolicy with windowed writes or switch filename policies.");
    }
}
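The SubDirectoryPolicy patterns are ordinary date-format patterns applied to the window's timestamp. A small stdlib sketch (using java.time rather than the Joda formatter the class above uses, and a hypothetical window timestamp) shows what directory names PER_DAY and PER_HOUR produce:

```java
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class SubDirDemo {
    public static void main(String[] args) {
        // Hypothetical window max timestamp
        ZonedDateTime windowEnd = ZonedDateTime.of(2017, 7, 20, 14, 59, 59, 0, ZoneOffset.UTC);

        // PER_DAY pattern: yyyy-MM-dd
        System.out.println(DateTimeFormatter.ofPattern("yyyy-MM-dd").format(windowEnd));    // 2017-07-20
        // PER_HOUR pattern: yyyy-MM-dd/HH
        System.out.println(DateTimeFormatter.ofPattern("yyyy-MM-dd/HH").format(windowEnd)); // 2017-07-20/14
    }
}
```

With PER_HOUR, each hour's windows land under their own gs://bucket/2017-07-20/14/-style prefix, which makes replaying or archiving a single hour straightforward.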
