How to solve Duplicate values exception when I create PCollectionView<Map<String,String>>


Question

I'm setting up a slowly-changing lookup Map in my Apache Beam pipeline. It continuously updates the lookup map. For each key in the lookup map, I retrieve the latest value in the global window with accumulating mode, but the pipeline always hits the following exception:

org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: Duplicate values for mykey

Is anything wrong with this code snippet?

If I use .discardingFiredPanes() instead, I will lose information in the last emit.

pipeline
  .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(1L)))
  .apply(
      Window.<Long>into(new GlobalWindows())
         .triggering(Repeatedly.forever(
             AfterProcessingTime.pastFirstElementInPane()))
         .accumulatingFiredPanes())
  .apply(new ReadSlowChangingTable())
  .apply(Latest.perKey())
  .apply(View.asMap());

Example input triggers:

t1 : KV<k1,v1> KV<k2,v2>
t2 : KV<k1,v1>

accumulatingFiredPanes => expected result at t2 => KV(k1,v1), KV(k2,v2), but it fails due to the duplicate values exception

discardingFiredPanes => expected result at t2 => KV(k1,v1), which succeeds

Answer

Specifically with regard to View.asMap and the accumulating-panes discussion in the comments:

If you would like to make use of the View.asMap side input (for example, when the source of the map elements is itself distributed, often because you are creating a side input from the output of a previous transform), there are some other factors to take into consideration: View.asMap is itself an aggregation, so it inherits the upstream triggering and accumulates its input. In this specific pattern, setting the pipeline to accumulating-panes mode before this transform will result in duplicate key errors, even if a transform such as Latest.perKey is used before View.asMap.

Given that the read updates the whole map, I think the use of View.asSingleton would be a better approach for this use case.
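
Applied to the pipeline in the question, a minimal sketch of that change could look like the following. ReadWholeTableAsMapFn is a hypothetical stand-in for ReadSlowChangingTable that reads the entire table and emits it as a single Map<String, String> on every trigger firing, so Latest.perKey() and View.asMap() are no longer needed:

PCollectionView<Map<String, String>> lookup = pipeline
    .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(1L)))
    .apply(Window.<Long>into(new GlobalWindows())
        .triggering(Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane()))
        // Discarding is fine here: every firing carries the complete, freshly read map,
        // so nothing from earlier panes needs to be kept.
        .discardingFiredPanes())
    // Hypothetical DoFn<Long, Map<String, String>> replacing ReadSlowChangingTable.
    .apply(ParDo.of(new ReadWholeTableAsMapFn()))
    .apply(View.asSingleton());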

Some general notes around this pattern, which will hopefully be useful for others as well:

For this pattern we can use the GenerateSequence source transform to emit a value periodically, for example once a day. Pass this value into a global window via a data-driven trigger that activates on each element. In a DoFn, use this element as a trigger to pull data from your bounded source, then create your side input for use in downstream transforms.

It's important to note that because this pattern uses a global-window side input that triggers on processing time, matching against elements being processed in event time will be nondeterministic. For example, if we have a main pipeline windowed on event time, the version of the side-input view that those windows see will depend on the latest trigger that has fired in processing time rather than on event time.

It is also important to note that, in general, the side input should be something that fits into memory.

Java (SDK 2.9.0):

In the sample below the side input is updated at very short intervals so that the effects can be seen easily. The expectation is that the side input updates slowly, for example every few hours or once a day.

In the example code below we make use of a Map that is created in a DoFn and then becomes the View.asSingleton; this is the recommended approach for this pattern.

The sample below illustrates the pattern; please note that the View.asSingleton is rebuilt on every counter update.

// Excerpt: assumes an enclosing class with the usual Apache Beam, Joda-Time and
// SLF4J imports; LOG is an org.slf4j.Logger declared on that class.
public static void main(String[] args) {

 // Create pipeline
 PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
     .as(PipelineOptions.class);

 // Using View.asSingleton, this pipeline uses a dummy external service as illustration.
 // Run in debug mode to see the output
 Pipeline p = Pipeline.create(options);

 // Create slowly updating sideinput

 PCollectionView<Map<String, String>> map = p
     .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))

     .apply(Window.<Long>into(new GlobalWindows())
         .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
         .discardingFiredPanes())

     .apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
       @ProcessElement public void process(@Element Long input,
           OutputReceiver<Map<String, String>> o) {
         // Do any external reads needed here...
         // We will make use of our dummy external service.
         // Every time this triggers, the complete map will be replaced with that read from 
         // the service.
         o.output(DummyExternalService.readDummyData());
       }

     })).apply(View.asSingleton());

 // ---- Consume slowly updating sideinput

 // GenerateSequence is only used here to generate dummy data for this illustration.
 // You would use your real source for example PubSubIO, KafkaIO etc...
 p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
      .apply(Window.<Long>into(FixedWindows.of(Duration.standardSeconds(1))))
     .apply(Sum.longsGlobally().withoutDefaults())
      .apply(ParDo.of(new DoFn<Long, KV<Long, Long>>() {

        @ProcessElement public void process(ProcessContext c) {
          // Read the latest version of the slowly-updating map from the side input.
          Map<String, String> keyMap = c.sideInput(map);
          c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());

          LOG.debug("Value is {}, key A is {} and key B is {}",
              c.element(), keyMap.get("Key_A"), keyMap.get("Key_B"));
        }
      }).withSideInputs(map));

 p.run();
}

public static class DummyExternalService {

 public static Map<String, String> readDummyData() {

   Map<String, String> map = new HashMap<>();
   Instant now = Instant.now();

    // Joda-Time pattern: lowercase mm/ss are minute/second ("HH:MM:SS" would format month and fraction-of-second).
    DateTimeFormatter dtf = DateTimeFormat.forPattern("HH:mm:ss");

   map.put("Key_A", now.minus(Duration.standardSeconds(30)).toString(dtf));
   map.put("Key_B", now.minus(Duration.standardSeconds(30)).toString());

   return map;

 }
}
