Streaming data to Google Cloud Storage from PubSub using Cloud Dataflow


Problem description

I am reading streaming data from Pub/Sub in Dataflow. I then need to upload the data to Storage, process it, and upload it to BigQuery.

Here is my code:

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.values.PCollection;

public class BotPipline {

    public static void main(String[] args) {

        DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
        options.setRunner(BlockingDataflowPipelineRunner.class);
        options.setProject(MY_PROJECT);
        options.setStagingLocation(MY_STAGING_LOCATION);
        options.setStreaming(true);

        Pipeline pipeline = Pipeline.create(options);

        // Read messages from the Pub/Sub subscription.
        PCollection<String> input = pipeline.apply(
                PubsubIO.Read.maxNumRecords(1).subscription(MY_SUBSCRIBTION));

        // Write the raw messages to Cloud Storage.
        input.apply(TextIO.Write.to(MY_STORAGE_LOCATION));

        // Process the messages and write the result to BigQuery.
        input
                .apply(someDataProcessing(...))    // "update json"
                .apply(convertToTableRow(...))     // "convert json to table row"
                .apply(BigQueryIO.Write.to(MY_BQ_TABLE).withSchema(tableSchema));

        pipeline.run();
    }
}

When I run the code with the write-to-storage step commented out, it works well, but when I try uploading to BigQuery I get this error (which is expected):

Write can only be applied to a Bounded PCollection

I am not using a bounded collection since I need this to run all the time, and I need the data to be uploaded immediately. Is there any solution?

This is my desired behavior:

I am receiving messages via Pub/Sub. Each message should be stored in its own file in GCS as raw data; some processing should then be executed on the data, and the result saved to BigQuery with the file name included in the data.

Data should be visible in BQ immediately after it is received. Example:

data published to pubsub : {a:1, b:2} 
data saved to GCS file UUID: A1F432 
data processing :  {a:1, b:2} -> 
                   {a:11, b: 22} -> 
                   {fileName: A1F432, data: {a:11, b: 22}} 
data in BQ : {fileName: A1F432, data: {a:11, b: 22}} 

The idea is that the processed data is stored in BQ with a link to the raw data stored in GCS.
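For reference, a BigQuery schema matching that final record might look like the sketch below. The field names and STRING types are assumptions drawn from the example above (the nested data object is flattened to a string for simplicity); this would be the tableSchema passed to withSchema(...) in the pipeline code.

// Sketch of a tableSchema for rows shaped like {fileName: ..., data: ...}.
// Field names and types are assumed from the example; adjust to the real data.
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;

import java.util.Arrays;

public class SchemaSketch {
    static TableSchema buildTableSchema() {
        return new TableSchema().setFields(Arrays.asList(
                new TableFieldSchema().setName("fileName").setType("STRING"),
                new TableFieldSchema().setName("data").setType("STRING")));
    }
}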

Recommended answer

Currently we don't support writing unbounded collections in TextIO.Write. See the related question.

Could you clarify what you would like the behavior of unbounded TextIO.Write to be? E.g., would you like one constantly growing file, or one file per window that is closed when the window closes, or something else? Or does it only matter to you that the total contents of the files written will eventually contain all the Pub/Sub messages, regardless of how the files are structured?

As a workaround, you can implement writing to GCS as your own DoFn, using IOChannelFactory to interact with GCS (in fact, TextIO.Write is, under the hood, just a composite transform that a user could have written from scratch).
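A minimal sketch of such a DoFn is shown below, assuming the pre-Beam Dataflow SDK classes (IOChannelUtils/IOChannelFactory). The class name and GCS_PREFIX are made up for illustration; file names are generated with a random UUID, matching the desired behavior from the question, and the element is emitted together with its file name so a downstream step can build the {fileName, data} record.

// Rough sketch (not the actual TextIO implementation): write each element to its
// own GCS object, then emit the element tagged with the generated file name.
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.util.IOChannelFactory;
import com.google.cloud.dataflow.sdk.util.IOChannelUtils;

import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class WriteEachElementToGcsFn extends DoFn<String, String> {

    private static final String GCS_PREFIX = "gs://my-bucket/raw/"; // assumed bucket path

    @Override
    public void processElement(ProcessContext c) throws Exception {
        String fileName = UUID.randomUUID().toString();
        String spec = GCS_PREFIX + fileName;

        // IOChannelUtils picks the factory registered for the "gs://" scheme;
        // create() opens a writable channel to the new object.
        IOChannelFactory factory = IOChannelUtils.getFactory(spec);
        try (WritableByteChannel channel = factory.create(spec, "text/plain")) {
            channel.write(ByteBuffer.wrap(c.element().getBytes(StandardCharsets.UTF_8)));
        }

        // Emit the element together with the file name so a downstream step can build
        // the {fileName, data} record described in the question.
        c.output("{\"fileName\": \"" + fileName + "\", \"data\": " + c.element() + "}");
    }
}

In the question's pipeline, this would replace the input.apply(TextIO.Write.to(MY_STORAGE_LOCATION)) line with something like input.apply(ParDo.of(new WriteEachElementToGcsFn())), whose output then feeds the existing processing and BigQueryIO steps.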

You can access the window of the data using the optional BoundedWindow parameter on @ProcessElement. I'd be able to provide more advice if you explain the desired behavior.
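If per-window files are what you're after, the window can be read inside the DoFn. Below is a minimal sketch of the annotation-style DoFn the answer refers to, shown with Apache Beam package names as an assumption (older Dataflow SDK versions expose the same idea through DoFn.RequiresWindowAccess and c.window()).

// Sketch: reading the element's window via the optional BoundedWindow parameter
// on an @ProcessElement method.
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;

public class WindowAwareFn extends DoFn<String, String> {

    @ProcessElement
    public void processElement(ProcessContext c, BoundedWindow window) {
        // The window could be used, for example, to derive a per-window file name.
        String suffix = (window instanceof IntervalWindow)
                ? ((IntervalWindow) window).end().toString()
                : window.toString();
        c.output(c.element() + " [window ending " + suffix + "]");
    }
}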
