如何对基于 KinesisRecord 的 DoFn 进行单元测试? [英] How to unit test a KinesisRecord-based DoFn?

查看:16
本文介绍了如何对基于 KinesisRecord 的 DoFn 进行单元测试?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在着手一个从 AWS Kinesis 读取数据的 Beam 项目,所以我有一个简单的 DoFn,它接受 KinesisRecord 并记录内容.我想编写一个单元测试来运行这个 DoFn 并证明它有效.不过,事实证明,使用 KinesisRecord 进行单元测试具有挑战性.

I’m getting started on a Beam project that reads from AWS Kinesis, so I have a simple DoFn that accepts a KinesisRecord and logs the contents. I want to write a unit test to run this DoFn and prove that it works. Unit testing with a KinesisRecord has proven to be challenging, though.

当我尝试仅使用 Create.of(testKinesisRecord) 时出现此错误:

I get this error when I try to just use Create.of(testKinesisRecord):

java.lang.IllegalArgumentException: Unable to infer a coder and no Coder was specified. Please set a coder by invoking Create.withCoder() explicitly  or a schema by invoking Create.withSchema().

我曾尝试按照错误提示使用withCoder"显式提供 KinesisRecordCoder,但它是一个私有类.也许还有另一种方法可以对 DoFn 进行单元测试?

I have tried providing the KinesisRecordCoder explicitly using "withCoder" as the error suggests, but it’s a private class. Perhaps there's another way to unit test a DoFn?

测试代码:

public class MyProjectTests {
    @Rule
    public TestPipeline p = TestPipeline.create();

    @Test
    public void testPoC() {
        var testKinesisRecord = new KinesisRecord(
                ByteBuffer.wrap("SomeData".getBytes()),
                "seq01",
                12,
                "pKey",
                Instant.now().minus(Duration.standardHours(4)),
                Instant.now(),
                "MyStream",
                "shard-001"
        );


        PCollection<Void> output =
                p.apply(Create.of(testKinesisRecord))
                        .apply(ParDo.of(new MyProject.PrintRecordFn()));

        var result = p.run();
        result.waitUntilFinish();
        result.metrics().allMetrics().getCounters().forEach(longMetricResult -> {
            Assertions.assertEquals(1, longMetricResult.getCommitted().intValue());
        });
    }
}

DoFn 代码:

  static class PrintRecordFn extends DoFn<KinesisRecord, Void> {
    private static final Logger LOG = LoggerFactory.getLogger(PrintRecordFn.class);
    private final Counter items = Metrics.counter(PrintRecordFn.class, "itemsProcessed");

    @ProcessElement
    public void processElement(@Element KinesisRecord element) {
      items.inc();

      LOG.info("Stream: `{}` Shard: `{}` Arrived at `{}`\nData: {}",
              element.getStreamName(),
              element.getShardId(),
              element.getApproximateArrivalTimestamp(),
              element.getDataAsBytes());
    }
  }

推荐答案

KinesisRecordCoder 应该用于内部用途,因此将其设为包私有.同时,您可以提供自定义的 AWSClientsProvider 并使用它来生成测试数据.例如,请查看 KinesisMockReadTest 和自定义 提供者

KinesisRecordCoder is supposed to be used for internal purposes, so it is made package private. In the same time, you can provide custom AWSClientsProvider and use it to generate test data. As an example, please, take a look on KinesisMockReadTest and custom Provider

这篇关于如何对基于 KinesisRecord 的 DoFn 进行单元测试?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆