Streaming MutationGroups into Spanner


Question

I'm trying to stream MutationGroups into Spanner with SpannerIO. The goal is to write new MutationGroups every 10 seconds, as we will use Spanner to query near-time KPIs.
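
For context, a MutationGroup just bundles Spanner mutations that must be applied in the same transaction; a minimal sketch of how one could be built (the table and column names below are made up for illustration):

    import com.google.cloud.spanner.Mutation;
    import org.apache.beam.sdk.io.gcp.spanner.MutationGroup;

    // Hypothetical KPI tables: the primary mutation and any attached mutations
    // in the group are committed together.
    Mutation primary = Mutation.newInsertOrUpdateBuilder("Kpi")
        .set("KpiId").to("kpi-123")
        .set("Value").to(42.0)
        .build();

    Mutation detail = Mutation.newInsertOrUpdateBuilder("KpiDetail")
        .set("KpiId").to("kpi-123")
        .set("Source").to("stream")
        .build();

    MutationGroup group = MutationGroup.create(primary, detail);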

When I don't use any windowing, I get the following error:

Exception in thread "main" java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.
    at org.apache.beam.sdk.transforms.GroupByKey.applicableTo(GroupByKey.java:173)
    at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:204)
    at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:120)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:286)
    at org.apache.beam.sdk.transforms.Combine$PerKey.expand(Combine.java:1585)
    at org.apache.beam.sdk.transforms.Combine$PerKey.expand(Combine.java:1470)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:491)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:299)
    at org.apache.beam.sdk.io.gcp.spanner.SpannerIO$WriteGrouped.expand(SpannerIO.java:868)
    at org.apache.beam.sdk.io.gcp.spanner.SpannerIO$WriteGrouped.expand(SpannerIO.java:823)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:286)
    at quantum.base.transform.entity.spanner.SpannerProtoWrite.expand(SpannerProtoWrite.java:52)
    at quantum.base.transform.entity.spanner.SpannerProtoWrite.expand(SpannerProtoWrite.java:20)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:491)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:299)
    at quantum.entitybuilder.pipeline.EntityBuilderPipeline$Write$SpannerWrite.expand(EntityBuilderPipeline.java:388)
    at quantum.entitybuilder.pipeline.EntityBuilderPipeline$Write$SpannerWrite.expand(EntityBuilderPipeline.java:372)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:491)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:299)
    at quantum.entitybuilder.pipeline.EntityBuilderPipeline.main(EntityBuilderPipeline.java:122)
:entityBuilder FAILED

Because of the error above, I assume the input collection needs to be windowed and triggered, as SpannerIO uses a GroupByKey internally (this is also what I need for my use case):

        ...
        .apply("1-minute windows", Window.<MutationGroup>into(FixedWindows.of(Duration.standardMinutes(1)))
            .triggering(Repeatedly.forever(AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(10))
            ).orFinally(AfterWatermark.pastEndOfWindow()))
            .discardingFiredPanes()
            .withAllowedLateness(Duration.ZERO))
        .apply(SpannerIO.write()
                    .withProjectId(entityConfig.getSpannerProject())
                    .withInstanceId(entityConfig.getSpannerInstance())
                    .withDatabaseId(entityConfig.getSpannerDb())
                    .grouped());

When I do this, I get the following exception at runtime:

java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
        org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
        com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.issueSideInputFetch(StreamingModeExecutionContext.java:631)
        com.google.cloud.dataflow.worker.StreamingModeExecutionContext$UserStepContext.issueSideInputFetch(StreamingModeExecutionContext.java:683)
        com.google.cloud.dataflow.worker.StreamingSideInputFetcher.storeIfBlocked(StreamingSideInputFetcher.java:182)
        com.google.cloud.dataflow.worker.StreamingSideInputDoFnRunner.processElement(StreamingSideInputDoFnRunner.java:71)
        com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:323)
        com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
        com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
        com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:271)
        org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:219)
        org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:69)
        org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:517)
        org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:505)
        org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn.processElement(ValueWithRecordId.java:145)

After investigating further, it appears to be due to the .apply(Wait.on(input)) in SpannerIO: it has a global side input which does not seem to work with my fixed windows, as the docs of Wait.java state:

If signal is globally windowed, main input must also be. This typically would be useful only in a batch pipeline, because the global window of an infinite PCollection never closes, so the wait signal will never be ready.
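
For readers unfamiliar with it, Wait.on() simply gates one PCollection on the readiness of another; a minimal sketch of its general usage (the names below are illustrative and not taken from SpannerIO's internals):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.Wait;
    import org.apache.beam.sdk.values.PCollection;

    Pipeline p = Pipeline.create();

    // "signal" can be any PCollection whose windows must fire before "main" is emitted.
    PCollection<String> signal = p.apply("Signal", Create.of("ready"));
    PCollection<String> main = p.apply("Main", Create.of("a", "b", "c"));

    // Elements of "main" are held back until the corresponding window of "signal" is done.
    PCollection<String> gated = main.apply(Wait.on(signal));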

As a temporary workaround I tried the following:


  • Add a GlobalWindow with triggers instead of fixed windows:

    .apply("globalwindow", Window.<MutationGroup>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(10))
            ).orFinally(AfterWatermark.pastEndOfWindow()))
            .discardingFiredPanes()
            .withAllowedLateness(Duration.ZERO))

This results in writes to Spanner only when I drain my pipeline. I have the impression the Wait.on() signal is only triggered when the global window closes, and doesn't work with triggers.

  • Disable the .apply(Wait.on(input)) in SpannerIO:

This results in the pipeline getting stuck on the view creation, which is described in this SO post: SpannerIO Dataflow 2.3.0 stuck in CreateDataflowView.

When I check the worker logs for clues, I do get the following warnings:

logger:  "org.apache.beam.sdk.coders.SerializableCoder"
message:  "Can't verify serialized elements of type SpannerSchema have well defined equals method. This may produce incorrect results on some PipelineRunner"
logger:  "org.apache.beam.sdk.coders.SerializableCoder"   
message:  "Can't verify serialized elements of type BoundedSource have well defined equals method. This may produce incorrect results on some PipelineRunner"


Note that everything works with the DirectRunner, and that I'm trying to use the DataflowRunner.
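
For reference, a minimal sketch of how the Dataflow runner is selected in streaming mode (the project id is a placeholder; the actual options in my pipeline may differ):

    import org.apache.beam.runners.dataflow.DataflowRunner;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    DataflowPipelineOptions options =
        PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
    options.setRunner(DataflowRunner.class);   // switch from DirectRunner to Dataflow
    options.setStreaming(true);                // run the pipeline in streaming mode
    options.setProject("my-gcp-project");      // placeholder project id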

Does anyone have any other suggestions for things I can try to get this running? I can hardly imagine that I'm the only one trying to stream MutationGroups into Spanner.

Thanks in advance!

Answer

Currently, the SpannerIO connector is not supported in Beam streaming mode. Please follow this Pull Request, which adds streaming support for the SpannerIO connector.
