Writing to Google Cloud Storage from PubSub using Cloud Dataflow using DoFn
Problem description
I am trying to write Google PubSub messages to Google Cloud Storage using Google Cloud Dataflow. I know that TextIO/AvroIO do not support streaming pipelines. However, I read in a comment by the author of [1] that it is possible to write to GCS in a streaming pipeline from a ParDo/DoFn. I constructed a pipeline by following their article as closely as I could.
I am aiming for this behaviour:
- Messages written out in batches of up to 100 to objects in GCS (one per window pane) under a path that corresponds to the time the message was published, in dataflow-requests/[isodate-time]/[paneIndex].
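As a rough illustration of the naming scheme above, here is a minimal sketch in plain Java that derives such an object name from an event time. It assumes hourly windows aligned to the epoch and uses java.time rather than the Joda-Time types in the pipeline; BlobPathSketch and its methods are hypothetical helpers, not part of the Dataflow SDK.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper (not part of the Dataflow SDK): builds the object name
// dataflow-requests/[isodate-time]/[paneIndex] for a fixed hourly window.
final class BlobPathSketch {
    static final String BUCKET_PATH = "dataflow-requests";
    static final Duration WINDOW_SIZE = Duration.ofHours(1);

    // Start of the hourly window that contains the given event time.
    static Instant windowStart(Instant eventTime) {
        long sizeMillis = WINDOW_SIZE.toMillis();
        long millis = eventTime.toEpochMilli();
        return Instant.ofEpochMilli(millis - Math.floorMod(millis, sizeMillis));
    }

    // Last timestamp that still belongs to the window: one millisecond before its end.
    static Instant maxTimestamp(Instant eventTime) {
        return windowStart(eventTime).plus(WINDOW_SIZE).minusMillis(1);
    }

    // Object name for one pane of the window that contains the event time.
    static String blobName(Instant eventTime, long paneIndex) {
        return String.format("%s/%s/%d", BUCKET_PATH, maxTimestamp(eventTime), paneIndex);
    }

    public static void main(String[] args) {
        // Prints dataflow-requests/2016-04-08T20:59:59.999Z/0
        System.out.println(blobName(Instant.parse("2016-04-08T20:15:30Z"), 0));
    }
}
```

Every message published within the same hour maps to the same path prefix, so only the pane index distinguishes the objects written for that window.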
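I get different results:
- There is only a single pane in each hourly window. I therefore only get one file in each hourly "bucket" (it is really an object path in GCS). Reducing MAX_EVENTS_IN_FILE to 10 made no difference; there is still only one pane/file.
- There is only a single message in every GCS object that is written out.
- The pipeline occasionally raises a CRC error when writing to GCS.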
How do I fix these problems and get the behaviour I'm expecting?
Sample log output:
21:30:06.977 writing pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:06.977 writing pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:07.773 sucessfully write pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:07.846 sucessfully write pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
21:30:07.847 writing pane 0 to blob dataflow-requests/2016-04-08T20:59:59.999Z/0
Here is my code:
package com.example.dataflow;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.windowing.*;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.gcloud.storage.BlobId;
import com.google.gcloud.storage.BlobInfo;
import com.google.gcloud.storage.Storage;
import com.google.gcloud.storage.StorageOptions;
import org.joda.time.Duration;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;

public class PubSubGcsSSCCEPipepline {
    private static final Logger LOG = LoggerFactory.getLogger(PubSubGcsSSCCEPipepline.class);

    public static final String BUCKET_PATH = "dataflow-requests";
    public static final String BUCKET_NAME = "myBucketName";
    public static final Duration ONE_DAY = Duration.standardDays(1);
    public static final Duration ONE_HOUR = Duration.standardHours(1);
    public static final Duration TEN_SECONDS = Duration.standardSeconds(10);
    public static final int MAX_EVENTS_IN_FILE = 100;
    public static final String PUBSUB_SUBSCRIPTION = "projects/myProjectId/subscriptions/requests-dataflow";

    private static class DoGCSWrite extends DoFn<String, Void>
            implements DoFn.RequiresWindowAccess {

        public transient Storage storage;

        { init(); }

        public void init() { storage = StorageOptions.defaultInstance().service(); }

        private void readObject(java.io.ObjectInputStream in)
                throws IOException, ClassNotFoundException {
            init();
        }

        @Override
        public void processElement(ProcessContext c) throws Exception {
            String isoDate = ISODateTimeFormat.dateTime().print(c.window().maxTimestamp());
            String blobName = String.format("%s/%s/%s", BUCKET_PATH, isoDate, c.pane().getIndex());
            BlobId blobId = BlobId.of(BUCKET_NAME, blobName);
            LOG.info("writing pane {} to blob {}", c.pane().getIndex(), blobName);
            storage.create(BlobInfo.builder(blobId).contentType("text/plain").build(), c.element().getBytes());
            LOG.info("sucessfully write pane {} to blob {}", c.pane().getIndex(), blobName);
        }
    }

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        options.as(DataflowPipelineOptions.class).setStreaming(true);
        Pipeline p = Pipeline.create(options);

        PubsubIO.Read.Bound<String> readFromPubsub = PubsubIO.Read.named("ReadFromPubsub")
                .subscription(PUBSUB_SUBSCRIPTION);

        PCollection<String> streamData = p.apply(readFromPubsub);

        PCollection<String> windows = streamData.apply(Window.<String>into(FixedWindows.of(ONE_HOUR))
                .withAllowedLateness(ONE_DAY)
                .triggering(AfterWatermark.pastEndOfWindow()
                        .withEarlyFirings(AfterPane.elementCountAtLeast(MAX_EVENTS_IN_FILE))
                        .withLateFirings(AfterFirst.of(AfterPane.elementCountAtLeast(MAX_EVENTS_IN_FILE),
                                AfterProcessingTime.pastFirstElementInPane()
                                        .plusDelayOf(TEN_SECONDS))))
                .discardingFiredPanes());

        windows.apply(ParDo.of(new DoGCSWrite()));

        p.run();
    }
}
[1] https://labs.spotify.com/2016/03/10/spotifys-event-delivery-the-road-to-the-cloud-part-iii/
Thanks to Sam McVeety for the solution. Here is the corrected code for anyone reading:
package com.example.dataflow;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.*;
import com.google.cloud.dataflow.sdk.transforms.windowing.*;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.gcloud.WriteChannel;
import com.google.gcloud.storage.BlobId;
import com.google.gcloud.storage.BlobInfo;
import com.google.gcloud.storage.Storage;
import com.google.gcloud.storage.StorageOptions;
import org.joda.time.Duration;
import org.joda.time.format.ISODateTimeFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Iterator;

public class PubSubGcsSSCCEPipepline {
    private static final Logger LOG = LoggerFactory.getLogger(PubSubGcsSSCCEPipepline.class);

    public static final String BUCKET_PATH = "dataflow-requests";
    public static final String BUCKET_NAME = "myBucketName";
    public static final Duration ONE_DAY = Duration.standardDays(1);
    public static final Duration ONE_HOUR = Duration.standardHours(1);
    public static final Duration TEN_SECONDS = Duration.standardSeconds(10);
    public static final int MAX_EVENTS_IN_FILE = 100;
    public static final String PUBSUB_SUBSCRIPTION = "projects/myProjectId/subscriptions/requests-dataflow";

    private static class DoGCSWrite extends DoFn<Iterable<String>, Void>
            implements DoFn.RequiresWindowAccess {

        public transient Storage storage;

        { init(); }

        public void init() { storage = StorageOptions.defaultInstance().service(); }

        private void readObject(java.io.ObjectInputStream in)
                throws IOException, ClassNotFoundException {
            init();
        }

        @Override
        public void processElement(ProcessContext c) throws Exception {
            String isoDate = ISODateTimeFormat.dateTime().print(c.window().maxTimestamp());
            long paneIndex = c.pane().getIndex();
            String blobName = String.format("%s/%s/%s", BUCKET_PATH, isoDate, paneIndex);
            BlobId blobId = BlobId.of(BUCKET_NAME, blobName);

            LOG.info("writing pane {} to blob {}", paneIndex, blobName);
            WriteChannel writer = storage.writer(BlobInfo.builder(blobId).contentType("text/plain").build());
            LOG.info("blob stream opened for pane {} to blob {} ", paneIndex, blobName);
            int i = 0;
            for (Iterator<String> it = c.element().iterator(); it.hasNext();) {
                i++;
                writer.write(ByteBuffer.wrap(it.next().getBytes()));
                LOG.info("wrote {} elements to blob {}", i, blobName);
            }
            writer.close();
            LOG.info("sucessfully write pane {} to blob {}", paneIndex, blobName);
        }
    }

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        options.as(DataflowPipelineOptions.class).setStreaming(true);
        Pipeline p = Pipeline.create(options);

        PubsubIO.Read.Bound<String> readFromPubsub = PubsubIO.Read.named("ReadFromPubsub")
                .subscription(PUBSUB_SUBSCRIPTION);

        PCollection<String> streamData = p.apply(readFromPubsub);

        PCollection<KV<String, String>> keyedStream =
                streamData.apply(WithKeys.of(new SerializableFunction<String, String>() {
                    public String apply(String s) { return "constant"; } }));

        PCollection<KV<String, Iterable<String>>> keyedWindows = keyedStream
                .apply(Window.<KV<String, String>>into(FixedWindows.of(ONE_HOUR))
                        .withAllowedLateness(ONE_DAY)
                        .triggering(AfterWatermark.pastEndOfWindow()
                                .withEarlyFirings(AfterPane.elementCountAtLeast(MAX_EVENTS_IN_FILE))
                                .withLateFirings(AfterFirst.of(AfterPane.elementCountAtLeast(MAX_EVENTS_IN_FILE),
                                        AfterProcessingTime.pastFirstElementInPane()
                                                .plusDelayOf(TEN_SECONDS))))
                        .discardingFiredPanes())
                .apply(GroupByKey.create());

        PCollection<Iterable<String>> windows = keyedWindows
                .apply(Values.<Iterable<String>>create());

        windows.apply(ParDo.of(new DoGCSWrite()));

        p.run();
    }
}
Recommended answer
There's a gotcha here, which is that you'll need a GroupByKey in order for the panes to be aggregated appropriately. The Spotify example references this as "Materialization of panes is done in "Aggregate Events" transform which is nothing else than a GroupByKey transform", but it's a subtle point. You'll need to provide a key in order to do this, and in your case, it appears a constant value will work.
    PCollection<String> streamData = p.apply(readFromPubsub);
    PCollection<KV<String, String>> keyedStream =
        streamData.apply(WithKeys.of(new SerializableFunction<String, String>() {
            // The return type must match the key type declared above (String).
            public String apply(String s) { return "constant"; } }));
At this point, you can apply your windowing function, and then a final GroupByKey to get the desired behavior:
    PCollection<KV<String, Iterable<String>>> keyedWindows = keyedStream.apply(...)
        .apply(GroupByKey.create());
    PCollection<Iterable<String>> windows = keyedWindows
        .apply(Values.<Iterable<String>>create());
Now the elements in processElement will be Iterable<String>, with size 100 or more for panes fired by the element-count trigger.
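To make the effect of the grouping concrete, here is a plain-Java simulation of the two write patterns. It is only a sketch under stated assumptions and does not use the Dataflow SDK or the GCS client: a Map stands in for the bucket, PaneWriteSimulation is a hypothetical class, the batch size is shrunk to 3, and messages are joined with newlines for readability. The ungrouped case assumes every element reports pane index 0, as the log output in the question suggests.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java simulation (not Dataflow code): a Map stands in for the GCS bucket.
final class PaneWriteSimulation {
    static final int MAX_EVENTS_IN_FILE = 3; // small batch size, for illustration only

    // Ungrouped: each element is written on its own under the same object name,
    // so every write replaces the previous one.
    static Map<String, String> writeUngrouped(List<String> messages, String windowPath) {
        Map<String, String> bucket = new LinkedHashMap<>();
        for (String message : messages) {
            bucket.put(windowPath + "/0", message);
        }
        return bucket;
    }

    // Grouped: elements are buffered per pane and each pane is written once.
    static Map<String, String> writeGrouped(List<String> messages, String windowPath) {
        Map<String, String> bucket = new LinkedHashMap<>();
        List<String> pane = new ArrayList<>();
        long paneIndex = 0;
        for (String message : messages) {
            pane.add(message);
            if (pane.size() >= MAX_EVENTS_IN_FILE) { // stand-in for the element-count trigger
                bucket.put(windowPath + "/" + paneIndex++, String.join("\n", pane));
                pane.clear();
            }
        }
        if (!pane.isEmpty()) { // stand-in for the final firing when the window closes
            bucket.put(windowPath + "/" + paneIndex, String.join("\n", pane));
        }
        return bucket;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4", "m5", "m6", "m7");
        String path = "dataflow-requests/2016-04-08T20:59:59.999Z";
        System.out.println(writeUngrouped(messages, path).size() + " object(s) without grouping");
        System.out.println(writeGrouped(messages, path).size() + " object(s) with grouping");
    }
}
```

With seven messages, the ungrouped version leaves a single object holding only the last message, while the grouped version leaves three objects (two full batches and a remainder). This mirrors the single-message objects described in the question.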
We've filed https://issues.apache.org/jira/browse/BEAM-184 to make this behavior clearer.