Streaming MutationGroups into Spanner

Question

I'm trying to stream MutationGroups into Spanner with SpannerIO. The goal is to write new MutationGroups every 10 seconds, as we will use Spanner to query near-real-time KPIs.

When I don't use any windows, I get the following error:

Exception in thread "main" java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.
    at org.apache.beam.sdk.transforms.GroupByKey.applicableTo(GroupByKey.java:173)
    at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:204)
    at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:120)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:286)
    at org.apache.beam.sdk.transforms.Combine$PerKey.expand(Combine.java:1585)
    at org.apache.beam.sdk.transforms.Combine$PerKey.expand(Combine.java:1470)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:491)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:299)
    at org.apache.beam.sdk.io.gcp.spanner.SpannerIO$WriteGrouped.expand(SpannerIO.java:868)
    at org.apache.beam.sdk.io.gcp.spanner.SpannerIO$WriteGrouped.expand(SpannerIO.java:823)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:286)
    at quantum.base.transform.entity.spanner.SpannerProtoWrite.expand(SpannerProtoWrite.java:52)
    at quantum.base.transform.entity.spanner.SpannerProtoWrite.expand(SpannerProtoWrite.java:20)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:491)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:299)
    at quantum.entitybuilder.pipeline.EntityBuilderPipeline$Write$SpannerWrite.expand(EntityBuilderPipeline.java:388)
    at quantum.entitybuilder.pipeline.EntityBuilderPipeline$Write$SpannerWrite.expand(EntityBuilderPipeline.java:372)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:491)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:299)
    at quantum.entitybuilder.pipeline.EntityBuilderPipeline.main(EntityBuilderPipeline.java:122)
:entityBuilder FAILED

Because of the error above I assume the input collection needs to be windowed and triggered, as SpannerIO uses a GroupByKey (this is also what I need for my use case):

        ...
        .apply("1-minute windows", Window.<MutationGroup>into(FixedWindows.of(Duration.standardMinutes(1)))
            .triggering(Repeatedly.forever(AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(10))
            ).orFinally(AfterWatermark.pastEndOfWindow()))
            .discardingFiredPanes()
            .withAllowedLateness(Duration.ZERO))
        .apply(SpannerIO.write()
                    .withProjectId(entityConfig.getSpannerProject())
                    .withInstanceId(entityConfig.getSpannerInstance())
                    .withDatabaseId(entityConfig.getSpannerDb())
                    .grouped());
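
For readers less familiar with this trigger, the following is a toy model in plain Java, not Beam code, of how a `Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane().plusDelayOf(...))` trigger with `discardingFiredPanes()` is expected to behave: the first element of a pane starts a timer, the pane fires once the delay has passed, and the buffer is then emptied. The class name `ProcessingTimePaneModel` and its methods are hypothetical and exist only for illustration; the watermark-based `orFinally` part is left out.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a repeated processing-time trigger with discarded panes. Not Beam code. */
final class ProcessingTimePaneModel<T> {
    private final long delayMillis;
    private final List<T> currentPane = new ArrayList<>();
    private long fireAtMillis = -1; // -1 means the pane is empty and no timer is set

    ProcessingTimePaneModel(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    /** Buffers an element; the first element of a pane sets the firing time. */
    void add(T element, long nowMillis) {
        if (currentPane.isEmpty()) {
            fireAtMillis = nowMillis + delayMillis;
        }
        currentPane.add(element);
    }

    /** Returns the fired pane if the delay has passed, otherwise an empty list. */
    List<T> advanceTo(long nowMillis) {
        if (currentPane.isEmpty() || nowMillis < fireAtMillis) {
            return new ArrayList<>();
        }
        List<T> fired = new ArrayList<>(currentPane);
        currentPane.clear(); // discarding mode: fired elements are not kept for later panes
        fireAtMillis = -1;
        return fired;
    }

    public static void main(String[] args) {
        ProcessingTimePaneModel<String> pane = new ProcessingTimePaneModel<>(10_000);
        pane.add("group-a", 0);
        pane.add("group-b", 4_000);
        System.out.println(pane.advanceTo(9_999));  // nothing fires before the delay has passed
        System.out.println(pane.advanceTo(10_000)); // both buffered groups fire together
        pane.add("group-c", 12_000);
        System.out.println(pane.advanceTo(22_000)); // next pane fires 10 s after its first element
    }
}
```

In this model a batch of MutationGroups is emitted roughly every 10 seconds while data keeps arriving, which is the write cadence the pipeline is aiming for.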

When I do this, I get the following exceptions during runtime:

java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
        org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
        com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.issueSideInputFetch(StreamingModeExecutionContext.java:631)
        com.google.cloud.dataflow.worker.StreamingModeExecutionContext$UserStepContext.issueSideInputFetch(StreamingModeExecutionContext.java:683)
        com.google.cloud.dataflow.worker.StreamingSideInputFetcher.storeIfBlocked(StreamingSideInputFetcher.java:182)
        com.google.cloud.dataflow.worker.StreamingSideInputDoFnRunner.processElement(StreamingSideInputDoFnRunner.java:71)
        com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:323)
        com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
        com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
        com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:271)
        org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:219)
        org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:69)
        org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:517)
        org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:505)
        org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn.processElement(ValueWithRecordId.java:145)

After investigating further it appears to be due to the .apply(Wait.on(input)) in SpannerIO: It has a global side input which does not seem to work with my fixed windows, as the docs of Wait.java state:

    If signal is globally windowed, main input must also be. This typically would be useful
    only in a batch pipeline, because the global window of an infinite PCollection never
    closes, so the wait signal will never be ready.
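
The quoted contract can be illustrated with a toy model in plain Java (not Beam code). It assumes, as the quote suggests, that a window's main input is released only once the matching signal window has closed, and that the global window closes only when the watermark reaches the end of time, for example on drain. `WaitOnModel` and its methods are hypothetical names. The model covers only this gate, not SpannerIO's side inputs, which is where the fixed-window attempt above failed.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a "wait until the signal window closes" gate. Not Beam code. */
final class WaitOnModel {
    /** Conceptual end of the global window; assumed reachable only on completion or drain. */
    static final long END_OF_GLOBAL_WINDOW = Long.MAX_VALUE;

    /** End of the window containing a non-negative timestamp; size <= 0 selects the global window. */
    static long windowEnd(long timestampMillis, long windowSizeMillis) {
        if (windowSizeMillis <= 0) {
            return END_OF_GLOBAL_WINDOW;
        }
        return (timestampMillis / windowSizeMillis + 1) * windowSizeMillis;
    }

    /** Returns the element timestamps whose window has closed at the given watermark. */
    static List<Long> released(List<Long> timestamps, long windowSizeMillis, long watermarkMillis) {
        List<Long> out = new ArrayList<>();
        for (long ts : timestamps) {
            if (windowEnd(ts, windowSizeMillis) <= watermarkMillis) {
                out.add(ts);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Long> timestamps = List.of(5_000L, 30_000L, 65_000L);
        // One-minute windows: the first window's elements pass once the watermark reaches its end.
        System.out.println(released(timestamps, 60_000, 60_000));
        // Global window: nothing passes while the pipeline is still running.
        System.out.println(released(timestamps, 0, 60_000));
        // Global window after the watermark jumps to the end of time: everything passes at once.
        System.out.println(released(timestamps, 0, END_OF_GLOBAL_WINDOW));
    }
}
```

Under these assumptions, a globally windowed unbounded input stays behind the gate until the pipeline stops, regardless of any trigger applied upstream.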

As a temporary workaround I tried the following:

  • add a GlobalWindow with triggers instead of fixed windows:

    .apply("globalwindow", Window.<MutationGroup>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterProcessingTime
                    .pastFirstElementInPane()
                    .plusDelayOf(Duration.standardSeconds(10))
            ).orFinally(AfterWatermark.pastEndOfWindow()))
            .discardingFiredPanes()
            .withAllowedLateness(Duration.ZERO))

This results in writes to Spanner only when I drain my pipeline. I have the impression that the Wait.on() signal is only triggered when the global window closes, and that it doesn't work with triggers.

  • disable the .apply(Wait.on(input)) in SpannerIO:

This results in the pipeline getting stuck on the view creation which is described in this SO post: SpannerIO Dataflow 2.3.0 stuck in CreateDataflowView.

When I check the worker logs for clues, I do get the following warnings:

logger:  "org.apache.beam.sdk.coders.SerializableCoder"
message:  "Can't verify serialized elements of type SpannerSchema have well defined equals method. This may produce incorrect results on some PipelineRunner"
logger:  "org.apache.beam.sdk.coders.SerializableCoder"
message:  "Can't verify serialized elements of type BoundedSource have well defined equals method. This may produce incorrect results on some PipelineRunner"

Note that everything works with the DirectRunner and that I'm trying to use the DataflowRunner.

Does anyone have any other suggestions for things I can try to get this running? I can hardly imagine that I'm the only one trying to stream MutationGroups into spanner.

Thanks in advance!

Recommended answer

Currently, the SpannerIO connector is not supported in Beam streaming pipelines. Please follow this pull request, which adds streaming support for the SpannerIO connector.
