Write each row received over PubSub to its own file on Cloud Storage


Problem description

I am receiving messages via PubSub. Each message should be stored as raw data in its own file in GCS; some processing is then executed on the data, and the result is saved to BigQuery with the file name included in the data.

The data should be visible in BigQuery immediately after it is received.

Example:

data published to pubsub : {a:1, b:2} 
data saved to GCS file UUID: A1F432 
data processing :  {a:1, b:2} -> 
                   {a:11, b: 22} -> 
                   {fileName: A1F432, data: {a:11, b: 22}} 
data in BQ : {fileName: A1F432, data: {a:11, b: 22}} 
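The wrapping step in the example above can be sketched in plain Java, independent of the pipeline. This is only an illustration of the intended record shape; `newFileName` and `wrap` are hypothetical helper names, not part of any SDK:

```java
import java.util.UUID;

public class RecordWrapper {

    // Generate a unique per-element file name, as in "data saved to GCS file UUID: A1F432".
    static String newFileName() {
        return UUID.randomUUID().toString();
    }

    // Wrap the processed JSON with the name of the file it was stored under,
    // producing the {fileName: ..., data: ...} shape that goes to BQ.
    static String wrap(String fileName, String processedJson) {
        return "{\"fileName\": \"" + fileName + "\", \"data\": " + processedJson + "}";
    }

    public static void main(String[] args) {
        String name = newFileName();
        System.out.println(wrap(name, "{\"a\": 11, \"b\": 22}"));
    }
}
```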

The idea is that the processed data stored in BQ carries a link to the raw data stored in GCS.

This is my code:

public class BotPipline {

    public static void main(String[] args) {

        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setRunner(BlockingDataflowPipelineRunner.class);
        options.setProject(MY_PROJECT);
        options.setStagingLocation(MY_STAGING_LOCATION);
        options.setStreaming(true);

        Pipeline pipeline = Pipeline.create(options);

        PCollection<String> input = pipeline.apply(PubsubIO.Read.subscription(MY_SUBSCRIBTION));

        String uuid = ...;
        input.apply(TextIO.Write.to(MY_STORAGE_LOCATION + uuid));

        input
            .apply(ParDo.of(new DoFn<String, String>(){..}).named("updateJsonAndInsertUUID"))
            .apply(convertToTableRow(...).named("convertJsonStringToTableRow"))
            .apply(BigQueryIO.Write.to(MY_BQ_TABLE).withSchema(tableSchema));

        pipeline.run();
    }
}

My code doesn't run, because writing unbounded collections with TextIO.Write is not supported. After some research I found a few options to work around this issue:

  1. Create a custom sink in Dataflow
  2. Write to GCS from my own DoFn
  3. Access the data's window using the optional BoundedWindow

I have no idea how to start. Can anyone provide code for one of these solutions, or suggest a different solution that matches my case (with code)?

Answer

The best option is #2 - a simple DoFn that creates the files according to your data. Something like this:

import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;

class CreateFileFn extends DoFn<String, Void> {
  @ProcessElement
  public void process(ProcessContext c) throws IOException {
    // Derive a unique file name for this element (e.g. from a UUID in the data).
    String filename = ...generate filename from element...;
    // Create the GCS object and write the element's bytes through the channel.
    try (WritableByteChannel channel = FileSystems.create(
            FileSystems.matchNewResource(filename, false),
            "application/text-plain")) {
      OutputStream out = Channels.newOutputStream(channel);
      ...write the element to out...
    }
  }
}
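The channel-based write pattern in that DoFn can be exercised locally with plain `java.nio`, swapping Beam's `FileSystems` for a local `FileChannel`. This is a self-contained sketch of the same idea, not Dataflow code; the class and method names are illustrative:

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ChannelWriteDemo {

    // Write one element to its own file through a byte channel,
    // mirroring the FileSystems.create(...) / Channels.newOutputStream(...) pattern.
    static void writeElement(Path file, String element) throws IOException {
        try (FileChannel channel = FileChannel.open(
                file, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            OutputStream out = Channels.newOutputStream(channel);
            out.write(element.getBytes(StandardCharsets.UTF_8));
            out.flush();
        }
    }

    public static void main(String[] args) throws IOException {
        // One file per element, named independently of any other element.
        Path file = Files.createTempFile("element-", ".json");
        writeElement(file, "{\"a\": 1, \"b\": 2}");
        System.out.println(Files.readString(file));
    }
}
```

In the real pipeline the `ResourceId` returned by `FileSystems.matchNewResource` plays the role of the local `Path` here, so each element ends up in its own GCS object.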

