PubSub到Spanner流传输管道 [英] PubSub to Spanner Streaming Pipeline

查看:77
本文介绍了PubSub到Spanner流传输管道的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在尝试将JSON类型的PubSub消息流式传输到扳手数据库,并且insert_update效果很好.Spanner表具有复合主键,因此需要在从PubSub插入新数据之前删除现有数据(因此仅提供最新数据).扳手替换或插入/更新突变在这种情况下不起作用.我添加了管道

I am trying to stream PubSub message of type JSON to spanner database and the insert_update works very well. Spanner table has composite primary key, so need to delete the existing data before inserting new data from PubSub (so only latest data is present). Spanner replace or insert/update mutations does not work in this case. I added pipeline


import org.apache.beam.* ;

public class PubSubToSpannerPipeline {

  // JSON to TableData Object
  public static class PubSubToTableDataFn extends DoFn<String, TableData> {

    @ProcessElement
    public void processElement(ProcessContext c) {
      .
      .
      .
    }
  }

  public interface PubSubToSpannerOptions extends PipelineOptions, StreamingOptions {
    .
    .
    .
  }

  public static void main(String[] args) {
    PubSubToSpannerOptions options = PipelineOptionsFactory
        .fromArgs(args)
        .withValidation()
        .as(PubSubToSpannerOptions.class);
    options.setStreaming(true);

    SpannerConfig spannerConfig =
        SpannerConfig.create()
        .withProjectId(options.getProjectId())
        .withInstanceId(options.getInstanceId())
        .withDatabaseId(options.getDatabaseId());

    Pipeline pipeLine = Pipeline.create(options);

    PCollection<TableData> tableDataMsgs = pipeLine.apply(PubsubIO.readStrings()
        .fromSubscription(options.getInputSubscription()))
        .apply("ParsePubSubMessage", ParDo.of(new PubSubToTableDataFn ()));

    // Window function
    PCollection<TableData> tableDataJson = tableDataMsgs
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));

    PCollection<MutationGroup> upsertMutationGroup = tableDataJson.apply("TableDataMutation",
        MapElements.via(new SimpleFunction<TableData, MutationGroup>() {

          public MutationGroup apply(TableData input) {

            String object_id = input.objectId;

            pipeLine.apply("ReadExistingData", SpannerIO.read()
                .withSpannerConfig(spannerConfig)
                .withQuery("SELECT object_id, mapped_object_id, mapped_object_name from TableName where object_id ='" + object_id + "'")
            .apply("MutationForExistingTableData", 
                    ParDo.of(new DoFn<Struct, Mutation>(){
                      @ProcessElement
                      public void processElement(ProcessContext c) {
                        Struct str = c.element();
                        c.output(Mutation.delete("TableName", KeySet.newBuilder()
                            .addKey(Key.newBuilder()
                                .append(str.getString("object_id"))
                                .append(str.getString("mapped_object_id"))
                                .append(str.getString("mapped_object_name")).build()).build()));
                      }
                    } ))
            .apply("DeleteExistingTableData", SpannerIO.write().withSpannerConfig(spannerConfig));

              Mutation dataMutation = Mutation.newReplaceBuilder("TableName",
                  .
                  .
                  .

                  );
              List<Mutation> list = new ArrayList<Mutation>();


              List<Map<String, String>> mappingList = input.listOfObjectRows;

              for (Map<String, String> objectMap : mappingList ) {
                list.add(Mutation.newReplaceBuilder("TableName",
                    .
                    .
                    .);
              }     

              return MutationGroup.create(dataMutation, list);


          }
        } )));


        upsertMutationGroup.apply("WriteDataToSpanner", SpannerIO.write()
            .withSpannerConfig(spannerConfig)
            .grouped());

        // Run the pipeline.
        pipeLine.run().waitUntilFinish();
  }

}

class TableData implements Serializable {
  String objectId;
  List<Map<String, String>> listOfObjectRows;

}

期望是现有的映射数据必须在插入或更新数据之前从表中删除.

Expectation is existing mapping data must be deleted from table before insert or updating the data.

推荐答案

我不确定您在做什么,但是您似乎想要:

I am not entirely sure what you are doing, but it looks like you want to:

  • 使用与pubsub消息匹配的键(或部分键)读取一些现有数据
  • 删除此数据
  • 从pubsub消息中插入新数据

一个选择是创建一个 DoFn ,该代码在读写事务中执行此读取/删除/插入(或读取/更新)操作.这样可以保持数据库的一致性...

One option is to create a DoFn that performs this read/delete/insert, (or a read/update) within a read-write transaction. This will preserve the DB consistency...

使用

Use the SpannerIO.WriteFn as a model - you need to set the SpannerAccessor as transient and create/delete it in the @Setup and @Teardown event handlers

DoFn @ProcessElement 处理程序将创建

The @ProcessElement handler of your DoFn would create a Read-write Transaction, inside which you would reads the rows for the key, update or delete them and then inserts the new element(s).

此方法的缺点是,每个Spanner事务将仅处理一条Pub/Sub消息(除非您在上一步中进行了巧妙的操作(例如将它们分组)),这是一个复杂的读写事务.如果您的消息/秒速率相对较低,则可以,但是,如果不这样,则此方法将给数据库增加更多的负载.

The disadvantage of this method is that only one Pub/Sub message will be processed per Spanner transaction (unless you do something clever in a previous step such as grouping them), and this is a complex read-write transaction. If your messages/sec rate is relatively low this would be fine, but if not, this method would be putting a lot more load on your DB.

第二种选择是对键范围进行盲删除.仅当object_id是复合键的第一部分(它似乎来自您的代码)时,此方法才有效.

A second option is to use blind deletes of a key-range. This can only work if the object_id is the first part of the composite key (which it appears to be from your code).

您将创建一个包含删除突变的 MutationGroup ,该突变将使用具有键范围的Delete突变来盲删除键以object_id开头的所有现有行,然后使用插入突变来替换已删除的行行.

You would create a MutationGroup containing a delete mutation which blind-deletes any existing rows whose keys start with the object_id using a Delete mutation with a key-range, followed by insert mutations to replace the deleted rows.

MutationGroup.create(
    // Delete rows with key starting with object_id.
    Mutation.delete("TableName", KeySet.newBuilder()
        .addRange(
            KeyRange.closedClosed(
                Key.of(str.getString("object_id")),
                Key.of(str.getString("object_id"))))
        .build()),
    // Insert replacement rows.
    Mutation.newInsertBuilder("TableName")
        .set("column").to("value"),
        ...
        .build(),
    Mutation.newInsertBuilder("TableName")
        ...);

然后将其像以前一样传递给SpannerIO.write().grouped(),以便可以批量处理它们以提高效率.

This would then be passed to SpannerIO.write().grouped() as before so that they can be batched for efficiency.

这篇关于PubSub到Spanner流传输管道的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆