Reading files and folders in order with Apache Beam

Problem description

I have a folder structure of the type year/month/day/hour/*, and I'd like Beam to read this as an unbounded source in chronological order. Specifically, this means reading in all the files in the first hour on record and adding their contents for processing. Then, adding the file contents of the next hour for processing, up until the current time, where it waits for new files to arrive in the latest year/month/day/hour folder.

Is it possible to do this with Apache Beam?

Recommended answer

So what I would do is add timestamps to each element according to the file path. As a test, I used the following example.

First of all, as explained in this answer, you can use FileIO to continuously match a file pattern. This helps because, per your use case, once you have finished with the backfill you want to keep reading newly arriving files within the same job. In this case I provide gs://BUCKET_NAME/data/** because my files will look like gs://BUCKET_NAME/data/year/month/day/hour/filename.extension:

p
    .apply(FileIO.match()
        .filepattern(inputPath)
        .continuously(
            // Check for new files every minute
            Duration.standardMinutes(1),
            // Never stop checking for new files
            Watch.Growth.<String>never()))
    .apply(FileIO.readMatches())
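
If you want to reproduce the snippets outside the full code, these are the imports they rely on (my assumption of what the complete pipeline pulls in; Beam uses Joda-Time for durations and instants):

import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileIO.ReadableFile;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;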

Watch frequency and timeout can be adjusted at will.
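
For example, a sketch of a variation (hypothetical values, using Beam's Watch.Growth termination conditions) that polls every 5 minutes and stops watching after a full day without new files:

.continuously(
    // Check for new files every 5 minutes instead of every minute
    Duration.standardMinutes(5),
    // Give up if no new files have been matched for one day
    Watch.Growth.<String>afterTimeSinceNewOutput(Duration.standardDays(1)))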

Then, in the next step, we'll receive the matched files. I use ReadableFile.getMetadata().resourceId() to get the full path and split it by "/" to build the corresponding timestamp. I round it down to the hour and do not account for timezone correction here. With readFullyAsUTF8String we'll read the whole file (be careful if the whole file does not fit into memory; it is recommended to shard your input if needed) and split it into lines. With ProcessContext.outputWithTimestamp we'll emit downstream a KV of filename and line (the filename is not needed anymore, but it helps to see where each file comes from) together with the timestamp derived from the path. Note that we're shifting timestamps "back in time", so this can mess with the watermark heuristics and you will get a message such as:

Cannot output with timestamp 2019-03-17T00:00:00.000Z. Output timestamps must be no earlier than the timestamp of the current input (2019-06-05T15:41:29.645Z) minus the allowed skew (0 milliseconds). See the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed skew.

To overcome this, I set getAllowedTimestampSkew to Long.MAX_VALUE, but take into account that this method is deprecated. ParDo code:

.apply("Add Timestamps", ParDo.of(new DoFn<ReadableFile, KV<String, String>>() {

    @Override
    public Duration getAllowedTimestampSkew() {
        return new Duration(Long.MAX_VALUE);
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        ReadableFile file = c.element();
        String fileName = file.getMetadata().resourceId().toString();
        String lines[];

        String[] dateFields = fileName.split("/");
        Integer numElements = dateFields.length;

        String hour = dateFields[numElements - 2];
        String day = dateFields[numElements - 3];
        String month = dateFields[numElements - 4];
        String year = dateFields[numElements - 5];

        String ts = String.format("%s-%s-%s %s:00:00", year, month, day, hour);
        Log.info(ts);

        try{
            lines = file.readFullyAsUTF8String().split("
");

            for (String line : lines) {
                c.outputWithTimestamp(KV.of(fileName, line), new Instant(dateTimeFormat.parseMillis(ts)));
            }
        }

        catch(IOException e){
            Log.info("failed");
        }
    }}))

Finally, I window into 1-hour FixedWindows and log the results:

.apply(Window
    .<KV<String,String>>into(FixedWindows.of(Duration.standardHours(1)))
    .triggering(AfterWatermark.pastEndOfWindow())
    .discardingFiredPanes()
    .withAllowedLateness(Duration.ZERO))
.apply("Log results", ParDo.of(new DoFn<KV<String, String>, Void>() {
    @ProcessElement
    public void processElement(ProcessContext c, BoundedWindow window) {
        String file = c.element().getKey();
        String value = c.element().getValue();
        String eventTime = c.timestamp().toString();

        String logString = String.format("File=%s, Line=%s, Event Time=%s, Window=%s", file, value, eventTime, window.toString());
        Log.info(logString);
    }
}));

For me it worked with .withAllowedLateness(Duration.ZERO), but depending on the order in which your files arrive you might need to set a higher value. Keep in mind that setting it too high will cause windows to stay open longer and use more persistent storage.
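
As a sketch of a more permissive setup (hypothetical values, not the configuration I tested), you could admit late data and re-fire the window whenever a late element arrives:

.apply(Window
    .<KV<String, String>>into(FixedWindows.of(Duration.standardHours(1)))
    .triggering(AfterWatermark.pastEndOfWindow()
        // Fire again for each late element
        .withLateFirings(AfterPane.elementCountAtLeast(1)))
    // Accept elements up to two hours behind the watermark
    .withAllowedLateness(Duration.standardHours(2))
    .accumulatingFiredPanes())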

I set the $BUCKET and $PROJECT variables and uploaded just two files:

gsutil cp file1 gs://$BUCKET/data/2019/03/17/00/
gsutil cp file2 gs://$BUCKET/data/2019/03/18/22/

And run the job with:

mvn -Pdataflow-runner compile -e exec:java \
    -Dexec.mainClass=com.dataflow.samples.ChronologicalOrder \
    -Dexec.args="--project=$PROJECT \
    --path=gs://$BUCKET/data/** \
    --stagingLocation=gs://$BUCKET/staging/ \
    --runner=DataflowRunner"

Results:

Full code

Let me know how this works. This was just an example to get started; you might need to adjust windowing and triggering strategies, lateness, etc. to suit your use case.
