How to solve Duplicate values exception when I create PCollectionView&lt;Map&lt;String,String&gt;&gt;


Problem description

I'm setting up a slow-changing lookup map in my Apache Beam pipeline. It continuously updates the lookup map. For each key in the lookup map, I retrieve the latest value in the global window with accumulating mode. But it always hits this exception:

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Duplicate values for mykey

Is anything wrong with this code snippet?

If I use .discardingFiredPanes() instead, I will lose information in the last emit.

pipeline
  .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(1L)))
  .apply(
      Window.<Long>into(new GlobalWindows())
         .triggering(Repeatedly.forever(
             AfterProcessingTime.pastFirstElementInPane()))
         .accumulatingFiredPanes())
  .apply(new ReadSlowChangingTable())
  .apply(Latest.perKey())
  .apply(View.asMap());

Example input triggers:

t1 : KV<k1,v1> KV<k2,v2>
t2 : KV<k1,v1>

accumulatingFiredPanes => expected result at t2 => KV(k1,v1), KV(k2,v2), but fails with the duplicate values exception

discardingFiredPanes => expected result at t2 => KV(k1,v1) Success

Answer

Specifically, with regard to View.asMap and the accumulating-panes discussion in the comments:

If you would like to make use of the View.asMap side input (for example, when the source of the map elements is itself distributed, often because you are creating a side input from the output of a previous transform), there are some other factors that will need to be taken into consideration: View.asMap is itself an aggregation; it will inherit triggering and accumulate its input. In this specific pattern, setting the pipeline to accumulating-panes mode before this transform will result in duplicate key errors, even if a transform such as Latest.perKey is used before the View.asMap transform.
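The mechanism can be illustrated without Beam at all: with accumulating panes, earlier pane contents are re-emitted alongside later ones, so the strict unique-key map construction inside the view sees the same key more than once. Below is a minimal plain-Java simulation of that behavior; the class and method names are hypothetical stand-ins, not Beam APIs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class AccumulatingPanesDemo {

  // Build a map the way a strict unique-keys view would:
  // throw if the same key appears more than once in the input.
  static Map<String, String> strictMap(List<String[]> kvs) {
    Map<String, String> m = new HashMap<>();
    for (String[] kv : kvs) {
      if (m.putIfAbsent(kv[0], kv[1]) != null) {
        throw new IllegalArgumentException("Duplicate values for " + kv[0]);
      }
    }
    return m;
  }

  public static void main(String[] args) {
    // Pane 1 emitted KV(k1,v1) and KV(k2,v2); pane 2 emits KV(k1,v1) again.
    // Accumulating mode replays pane 1's output together with pane 2's.
    List<String[]> accumulated = new ArrayList<>();
    accumulated.add(new String[] {"k1", "v1"}); // from pane 1
    accumulated.add(new String[] {"k2", "v2"}); // from pane 1
    accumulated.add(new String[] {"k1", "v1"}); // replayed in pane 2

    try {
      strictMap(accumulated);
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // Duplicate values for k1
    }
  }
}
```

With discarding mode only pane 2's single element would be present, so the map builds successfully but no longer contains k2, which matches the behavior described in the question.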

Given that the read updates the whole map, I think View.asSingleton would be a better approach for this use case.

Some general notes around this pattern, which will hopefully be useful for others as well:

For this pattern we can use the GenerateSequence source transform to emit a value periodically, for example once a day. Pass this value into a global window via a data-driven trigger that activates on each element. In a DoFn, use this as a trigger to pull data from your bounded source, and create your SideInput for use in downstream transforms.

It's important to note that because this pattern uses a global-window side input triggered on processing time, matching against elements being processed in event time will be nondeterministic. For example, if we have a main pipeline windowed on event time, the version of the SideInput view that those windows see will depend on the latest trigger that fired in processing time, not on event time.

It's also important to note that, in general, a side input should be small enough to fit into memory.

Java (SDK 2.9.0):

In the sample below the side input is updated at very short intervals so that the effects can be easily seen. The expectation is that in practice the side input updates slowly, for example every few hours or once a day.

In the example code below we make use of a Map that we create in a DoFn and that becomes the View.asSingleton; this is the recommended approach for this pattern.

The sample below illustrates the pattern; note that the View.asSingleton is rebuilt on every counter update.

public static void main(String[] args) {

 // Create pipeline
 PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
     .as(PipelineOptions.class);

 // Using View.asSingleton, this pipeline uses a dummy external service as illustration.
 // Run in debug mode to see the output
 Pipeline p = Pipeline.create(options);

 // Create slowly updating sideinput

 PCollectionView<Map<String, String>> map = p
     .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))

     .apply(Window.<Long>into(new GlobalWindows())
         .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
         .discardingFiredPanes())

     .apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
       @ProcessElement public void process(@Element Long input,
           OutputReceiver<Map<String, String>> o) {
         // Do any external reads needed here...
         // We will make use of our dummy external service.
         // Every time this triggers, the complete map will be replaced with that read from 
         // the service.
         o.output(DummyExternalService.readDummyData());
       }

     })).apply(View.asSingleton());

 // ---- Consume slowly updating sideinput

 // GenerateSequence is only used here to generate dummy data for this illustration.
 // You would use your real source for example PubSubIO, KafkaIO etc...
 p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
     .apply(Window.<Long>into(FixedWindows.of(Duration.standardSeconds(1))))
     .apply(Sum.longsGlobally().withoutDefaults())
     .apply(ParDo.of(new DoFn<Long, KV<Long, Long>>() {

       @ProcessElement public void process(ProcessContext c) {
         Map<String, String> keyMap = c.sideInput(map);
         c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());

         LOG.debug("Value is {} key A is {} and key B is {}",
             c.element(), keyMap.get("Key_A"), keyMap.get("Key_B"));

       }
     }).withSideInputs(map));

 p.run();
}

public static class DummyExternalService {

 public static Map<String, String> readDummyData() {

   Map<String, String> map = new HashMap<>();
   Instant now = Instant.now();

   // Pattern letters are case-sensitive: mm = minute, ss = second.
   // ("HH:MM:SS" would mean month-of-year and fraction-of-second.)
   DateTimeFormatter dtf = DateTimeFormat.forPattern("HH:mm:ss");

   map.put("Key_A", now.minus(Duration.standardSeconds(30)).toString(dtf));
   map.put("Key_B", now.minus(Duration.standardSeconds(30)).toString());

   return map;

 }
}
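One detail worth calling out in the formatter used by DummyExternalService: date-time pattern letters are case-sensitive in both Joda-Time and java.time. A clock-time pattern should be "HH:mm:ss"; the uppercase variant "HH:MM:SS" would select month-of-year and fraction-of-second instead of minute and second. A quick check using java.time (used here only because it is in the standard library; the sample itself uses Joda-Time, whose pattern letters behave the same way for these fields):

```java
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

public class PatternCaseDemo {
  public static void main(String[] args) {
    LocalTime t = LocalTime.of(14, 5, 9);
    // Lowercase mm/ss are minute-of-hour and second-of-minute;
    // uppercase MM/SS would mean month-of-year and fraction-of-second,
    // which is not what a clock-time formatter wants.
    DateTimeFormatter fmt = DateTimeFormatter.ofPattern("HH:mm:ss");
    System.out.println(t.format(fmt)); // 14:05:09
  }
}
```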

