Google DataFlow: attaching filename to the message

Question

I'm trying to build a Google DataFlow pipeline that has these steps:

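  • Read a message containing a filename from a Pub/Sub topic.
  • Look up the file in a Google Cloud Storage bucket by that filename.
  • Read each line from the file.
  • Send each line, together with the filename, as a single message to another topic.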

My problem is that I can't add the filename to the final output message. Current implementation:

ConnectorOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ConnectorOptions.class);

Pipeline p = Pipeline.create(options);

p.apply("ReadFromTopic", PubsubIO.readMessages().fromTopic(options.getInputTopic()))
    .apply("CollectFiles", ParDo.of(new DoFn<PubsubMessage, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String fileName = new String(c.element().getPayload());
            c.output("gs://bucket-name/" + fileName);
        }
    }))
    .apply("ReadLines", TextIO.readAll())
    .apply("WriteItemsToTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));

p.run().waitUntilFinish();

I saw a similar question asked here before, but it is not really a working solution for me, because I have to attach the filename to each output message, not just parse each line. Could anyone please suggest a possible solution?

Update

Thanks @jkff, I followed your advice. My current solution code:

ConnectorOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ConnectorOptions.class);
    Pipeline p = Pipeline.create(options);
    p.apply("ReadFromTopic", PubsubIO.readMessages().fromSubscription(options.getInputSubscription()))
            .apply("PrintMessages", ParDo.of(new DoFn<PubsubMessage, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    String message = new String(c.element().getPayload());
                    c.output("gs://bucket/" + message);
                }
            }))
            .apply(FileIO.matchAll())
            .apply(FileIO.readMatches())
            .apply("ReadFile", ParDo.of(new DoFn<FileIO.ReadableFile, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws IOException {
                    FileIO.ReadableFile f = c.element();

                    String filePath = f.getMetadata().resourceId().toString();
                    String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);

                    // Decode the file as UTF-8 (an assumption about the input) and let
                    // BufferedReader split on \n, \r, or \r\n, so the last line is
                    // emitted even when it has no trailing line terminator.
                    try (BufferedReader reader = new BufferedReader(
                            Channels.newReader(f.open(), StandardCharsets.UTF_8.name()))) {
                        String line;
                        while ((line = reader.readLine()) != null) {
                            c.output(line + " " + fileName);
                        }
                    }
                }
            }))
            .apply("WriteItemsToTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));
    p.run().waitUntilFinish();

Recommended answer

You can use FileIO: apply FileIO.matchAll() followed by FileIO.readMatches() to get a PCollection<ReadableFile>, where each ReadableFile can be used both to get the filename and to read the file. Follow it with a DoFn that does what you want. To read the file, use standard Java library facilities on the channel returned by ReadableFile.open().
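
As a rough illustration of the "standard Java library facilities" part, here is a minimal sketch that uses only the JDK. The class name LineTagger, the method tagLines, and the "<line> <fileName>" output format are hypothetical choices made for this example and are not part of Beam. It also assumes the file is UTF-8 text. Any ReadableByteChannel works as input, so the channel returned by ReadableFile.open() should fit.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Beam: the name LineTagger and the
// "<line> <fileName>" output format are illustrative assumptions.
final class LineTagger {

    // Reads every line from the channel (assumed to be UTF-8 text) and
    // appends the file name to it. Closes the channel when done.
    static List<String> tagLines(ReadableByteChannel channel, String fileName)
            throws IOException {
        List<String> tagged = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                Channels.newReader(channel, StandardCharsets.UTF_8.name()))) {
            String line;
            // readLine() treats \n, \r, and \r\n as line terminators and
            // also returns a final line that has no terminator.
            while ((line = reader.readLine()) != null) {
                tagged.add(line + " " + fileName);
            }
        }
        return tagged;
    }

    public static void main(String[] args) throws IOException {
        // An in-memory channel stands in for ReadableFile.open() here.
        byte[] content = "a\r\nb\nc".getBytes(StandardCharsets.UTF_8);
        ReadableByteChannel channel =
                Channels.newChannel(new ByteArrayInputStream(content));
        for (String message : tagLines(channel, "data.txt")) {
            System.out.println(message);
        }
    }
}
```

Inside the DoFn, the same loop could call c.output(...) for each line instead of collecting into a list, which avoids holding a large file in memory.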
