Google DataFlow: attaching filename to the message
Question
I'm trying to build a Google DataFlow pipeline that has these steps:
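- Read a message containing a filename from a Pub/Sub topic.
- Look up the file in a Google Cloud Storage bucket by that filename.
- Read each line from the file.
- Send each line, together with the filename, as a single message to another topic.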
My problem is that I can't add the filename to the final output message. Current implementation:
ConnectorOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ConnectorOptions.class);
Pipeline p = Pipeline.create(options);
p.apply("ReadFromTopic", PubsubIO.readMessages().fromTopic(options.getInputTopic()))
    .apply("CollectFiles", ParDo.of(new DoFn<PubsubMessage, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String fileName = new String(c.element().getPayload());
            c.output("gs://bucket-name/" + fileName);
        }
    }))
    .apply("ReadLines", TextIO.readAll())
    .apply("WriteItemsToTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));
p.run().waitUntilFinish();
I saw a similar question asked here before, but its answer doesn't really work for me: I have to attach the filename to each output message, not just parse each line. Could anyone suggest possible solutions?
Update
Thanks @jkff, I followed your advice. Here is my current solution:
ConnectorOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(ConnectorOptions.class);
Pipeline p = Pipeline.create(options);
p.apply("ReadFromTopic", PubsubIO.readMessages().fromSubscription(options.getInputSubscription()))
    .apply("PrintMessages", ParDo.of(new DoFn<PubsubMessage, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            String message = new String(c.element().getPayload());
            c.output("gs://bucket/" + message);
        }
    }))
    .apply(FileIO.matchAll())
    .apply(FileIO.readMatches())
    .apply("ReadFile", ParDo.of(new DoFn<FileIO.ReadableFile, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) throws IOException {
            FileIO.ReadableFile f = c.element();
            String filePath = f.getMetadata().resourceId().toString();
            String fileName = filePath.substring(filePath.lastIndexOf("/") + 1);
            ReadableByteChannel inChannel = f.open();
            ByteBuffer buffer = ByteBuffer.allocate(1);
            StringBuffer line = new StringBuffer();
            while (inChannel.read(buffer) > 0) {
                buffer.flip();
                for (int i = 0; i < buffer.limit(); i++) {
                    char ch = ((char) buffer.get());
                    if (ch == '\r') {
                        c.output(line.toString() + " " + fileName);
                        line = new StringBuffer();
                    } else {
                        line.append(ch);
                    }
                }
                buffer.clear();
            }
            inChannel.close();
        }
    }))
    .apply("WriteItemsToTopic", PubsubIO.writeStrings().to(options.getOutputTopic()));
p.run().waitUntilFinish();
Recommended answer
You can use FileIO: use FileIO.matchAll() followed by FileIO.readMatches() to get a PCollection<ReadableFile>, where each ReadableFile can be used to get the filename and to read the file. Follow it with a DoFn that does what you want. To read the file, use standard Java library facilities on the channel returned by the ReadableFile's .open().
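As a rough illustration of "standard Java library facilities", the sketch below wraps a ReadableByteChannel in a BufferedReader and appends the filename to every line. It deliberately uses only the JDK, so it runs without Beam. The class name FileLineTagger and the helpers baseName and tagLines are hypothetical names chosen for this example, and UTF-8 is an assumed encoding. Unlike the loop in the update, which emits a line only when it sees '\r' and so would skip files that end lines with '\n' alone as well as a final line with no terminator, readLine() accepts '\n', '\r' and "\r\n" and also returns the last unterminated line.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class FileLineTagger {

    // Hypothetical helper: keeps only the part after the last '/',
    // mirroring the substring logic used in the update above.
    public static String baseName(String filePath) {
        return filePath.substring(filePath.lastIndexOf('/') + 1);
    }

    // Hypothetical helper: decodes the channel as UTF-8 (an assumption) and
    // returns "<line> <fileName>" for every line. Closing the reader also
    // closes the underlying channel.
    public static List<String> tagLines(ReadableByteChannel channel, String fileName)
            throws IOException {
        List<String> tagged = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(
                Channels.newReader(channel, StandardCharsets.UTF_8.name()))) {
            String line;
            while ((line = reader.readLine()) != null) {
                tagged.add(line + " " + fileName);
            }
        }
        return tagged;
    }

    public static void main(String[] args) throws IOException {
        // In-memory stand-in for a file, with mixed line endings and no final newline.
        byte[] content = "first\r\nsecond\nthird".getBytes(StandardCharsets.UTF_8);
        ReadableByteChannel channel = Channels.newChannel(new ByteArrayInputStream(content));
        for (String message : tagLines(channel, baseName("gs://bucket/dir/data.txt"))) {
            System.out.println(message);
        }
    }
}
```

Inside the DoFn, the same idea would presumably be applied by passing the channel from f.open() together with the filename derived from f.getMetadata().resourceId(), and calling c.output(...) for each returned string. For very large files, emitting each line as it is read rather than collecting a list would keep memory use flat.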