Google Dataflow: attaching filename to the message


Question

I'm trying to build a Google Dataflow pipeline with these steps:

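  • Read messages containing a filename from a Pub/Sub topic.
  • Look up the file with that name in a Google Cloud Storage bucket.
  • Read each line of the file.
  • Send each line, together with the filename, as a single message to another topic.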

My problem is that I can't add the filename to the final output message. Current implementation:

ConnectorOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ConnectorOptions.class);

Pipeline p = Pipeline.create(options);

p.apply("ReadFromTopic", PubsubIO.readMessages().fromTopic(options.getInputTopic()))
    .apply("CollectFiles", ParDo.of(new DoFn<PubsubMessage, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String fileName = new String(c.element().getPayload());
            c.output("gs://bucket-name/" + fileName);
        }
    }))
    .apply("ReadLines", TextIO.readAll())
    .apply("WriteItemsToTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));

p.run().waitUntilFinish();

I saw a similar question asked here before, but it's not really a working solution for me because I have to attach the filename to each output message, not just parse each line. Could anyone suggest possible solutions?

Update

Thanks @jkff, I followed your advice. My current solution:

ConnectorOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ConnectorOptions.class);
    Pipeline p = Pipeline.create(options);
    p.apply("ReadFromTopic", PubsubIO.readMessages().fromSubscription(options.getInputSubscription()))
            .apply("PrintMessages", ParDo.of(new DoFn<PubsubMessage, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    String message = new String(c.element().getPayload());
                    c.output("gs://bucket/" + message);
                }
            }))
            .apply(FileIO.matchAll())
            .apply(FileIO.readMatches())
            .apply("ReadFile", ParDo.of(new DoFn<FileIO.ReadableFile, String>() {
                @ProcessElement
                public void processElement(ProcessContext c) throws IOException {
                    FileIO.ReadableFile f = c.element();

                    // Keep only the last path segment, e.g. "gs://bucket/dir/a.txt" -> "a.txt".
                    String filePath = f.getMetadata().resourceId().toString();
                    String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);

                    // Needs java.io.BufferedReader, java.nio.channels.Channels and
                    // java.nio.charset.StandardCharsets. Assumes the files are UTF-8.
                    // BufferedReader accepts \n, \r and \r\n line endings and also returns
                    // a final line that has no terminator; try-with-resources closes the
                    // underlying channel even if reading fails.
                    try (BufferedReader reader = new BufferedReader(
                            Channels.newReader(f.open(), StandardCharsets.UTF_8.name()))) {
                        String line;
                        while ((line = reader.readLine()) != null) {
                            c.output(line + " " + fileName);
                        }
                    }
                }
            }))
            .apply("WriteItemsToTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));
    p.run().waitUntilFinish();

Recommended answer

You can use FileIO: apply FileIO.matchAll() followed by FileIO.readMatches() to get a PCollection<ReadableFile>, where each ReadableFile can be used both to get the filename and to read the file. Follow it with a DoFn that does what you want. To read the file, use standard Java library facilities on the channel returned by ReadableFile.open().
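
As a rough illustration of the "standard Java library facilities" part, here is a minimal sketch that uses only the JDK, so it can be tried without a Beam runner. The class FileLineTagger and its methods baseName and tagLines are hypothetical names made up for this example, and an in-memory channel stands in for the ReadableByteChannel that ReadableFile.open() would return. It assumes UTF-8 text and the same "line, space, filename" message format as in the question's update.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Beam: tags every line read from a channel
// with the name of the file it came from.
final class FileLineTagger {

    // Returns the last path segment, e.g. "gs://bucket/dir/a.txt" -> "a.txt".
    static String baseName(String path) {
        return path.substring(path.lastIndexOf('/') + 1);
    }

    // Reads the channel as UTF-8 text (an assumption) and returns one
    // "<line> <fileName>" string per line. The channel is closed on return.
    static List<String> tagLines(ReadableByteChannel channel, String fileName)
            throws IOException {
        List<String> tagged = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                Channels.newReader(channel, StandardCharsets.UTF_8.name()))) {
            String line;
            // readLine() treats \n, \r and \r\n as line terminators and also
            // returns a last line that has no terminator.
            while ((line = reader.readLine()) != null) {
                tagged.add(line + " " + fileName);
            }
        }
        return tagged;
    }

    public static void main(String[] args) throws IOException {
        // In-memory stand-in for the channel a real file would provide.
        byte[] content = "first\r\nsecond\nthird".getBytes(StandardCharsets.UTF_8);
        ReadableByteChannel channel =
                Channels.newChannel(new ByteArrayInputStream(content));
        for (String message : tagLines(channel, baseName("gs://bucket/dir/data.txt"))) {
            System.out.println(message);
        }
    }
}
```

Inside a DoFn, the same loop would call c.output(...) for each line instead of collecting into a list, which avoids holding a whole file in memory.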
