Problems joining 2 Kafka streams (using a custom TimestampExtractor)

Question

I'm having problems joining two Kafka streams when the record timestamp is extracted from a date field in my events. The join works fine when I don't define a custom TimestampExtractor, but when I do, the join stops producing results. My topology is quite simple:

val builder = new StreamsBuilder()

val couponConsumedWith = Consumed.`with`(Serdes.String(),
  getAvroCouponSerde(schemaRegistryHost, schemaRegistryPort))
val couponStream: KStream[String, Coupon] = builder.stream(couponInputTopic, couponConsumedWith)

val purchaseConsumedWith = Consumed.`with`(Serdes.String(),
  getAvroPurchaseSerde(schemaRegistryHost, schemaRegistryPort))
val purchaseStream: KStream[String, Purchase] = builder.stream(purchaseInputTopic, purchaseConsumedWith)

val couponStreamKeyedByProductId: KStream[String, Coupon] = couponStream.selectKey(couponProductIdValueMapper)
val purchaseStreamKeyedByProductId: KStream[String, Purchase] = purchaseStream.selectKey(purchaseProductIdValueMapper)

val couponPurchaseValueJoiner = new ValueJoiner[Coupon, Purchase, Purchase]() {

  // Applies the coupon's percentage discount to the purchase amount
  override def apply(coupon: Coupon, purchase: Purchase): Purchase = {
      val discount = (purchase.getAmount * coupon.getDiscount) / 100
      new Purchase(purchase.getTimestamp, purchase.getProductid, purchase.getProductdescription, purchase.getAmount - discount)
  }
}

val fiveMinuteWindow = JoinWindows.of(TimeUnit.MINUTES.toMillis(10))
val outputStream: KStream[String, Purchase] = couponStreamKeyedByProductId.join(purchaseStreamKeyedByProductId,
  couponPurchaseValueJoiner,
  fiveMinuteWindow
  )

outputStream.to(outputTopic)

builder.build()
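Note that `TimeUnit.MINUTES.toMillis(10)` is 600 000 ms, so despite its name `fiveMinuteWindow` is a ten-minute window. The sketch below is a simplified model, not Kafka code, of the time check behind a symmetric `JoinWindows.of(size)`: two records that already share a key are joined when their timestamps are at most `size` milliseconds apart in either direction. It assumes equal keys and ignores grace periods, retention, and stream-time advancement; `JoinWindowModel` and `withinWindow` are hypothetical names.

```scala
import java.util.concurrent.TimeUnit

// Simplified model of a symmetric JoinWindows.of(size) check.
// Assumes both records already have the same key; grace period and
// window-store retention are deliberately left out.
object JoinWindowModel {
  // Same value the topology passes to JoinWindows.of(...): 10 minutes
  val windowSizeMs: Long = TimeUnit.MINUTES.toMillis(10)

  // True when rightTs lies in [leftTs - sizeMs, leftTs + sizeMs] (bounds inclusive)
  def withinWindow(leftTs: Long, rightTs: Long, sizeMs: Long = windowSizeMs): Boolean =
    rightTs >= leftTs - sizeMs && rightTs <= leftTs + sizeMs
}
```

For the test data used below, the coupon (09:10) and the purchase (09:12) are 120 000 ms apart, which this check accepts.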

As I said, this code works like a charm without a custom TimestampExtractor. But when I set StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG to my custom extractor class (I've double-checked that the class extracts the date correctly), the join no longer works.
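The extractor class itself isn't shown. A Kafka Streams `TimestampExtractor` implements an `extract` method that receives the `ConsumerRecord` and returns the record's timestamp as epoch milliseconds. Below is a minimal sketch, assuming the event field uses the format from the test data and always ends in a literal ` UTC`, of only the parsing step such an extractor might perform. `EventTime` and `toEpochMillis` are hypothetical names, and this is plain `java.time` code rather than the Kafka interface.

```scala
import java.time.{LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter
import java.util.Locale

// Hypothetical helper: converts an event's date string into epoch millis,
// the value a custom TimestampExtractor would return from extract(...).
object EventTime {
  // Assumption: every value looks like "Dec 05 2018 09:10:00.000 UTC".
  // English locale so that "Dec" parses regardless of the JVM default.
  private val format: DateTimeFormatter =
    DateTimeFormatter.ofPattern("MMM dd yyyy HH:mm:ss.SSS 'UTC'", Locale.ENGLISH)

  def toEpochMillis(text: String): Long =
    LocalDateTime.parse(text, format).toInstant(ZoneOffset.UTC).toEpochMilli
}
```

DateTimeFormatter is immutable and thread-safe, so sharing one instance across stream threads is fine.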

I'm testing the topology by running a unit test and passing the following events to it:

    val coupon1 = new Coupon("Dec 05 2018 09:10:00.000 UTC", "1234", 10F)
    // Purchase within the five minutes after the coupon - The discount should be applied
    val purchase1 = new Purchase("Dec 05 2018 09:12:00.000 UTC", "1234", "Green Glass", 25.00F)
    val purchase1WithDiscount = new Purchase("Dec 05 2018 09:12:00.000 UTC", "1234", "Green Glass", 22.50F)
    val couponRecordFactory1 = couponRecordFactory.create(couponInputTopic, "c1", coupon1)
    val purchaseRecordFactory1 = purchaseRecordFactory.create(purchaseInputTopic, "p1", purchase1)

    testDriver.pipeInput(couponRecordFactory1)
    testDriver.pipeInput(purchaseRecordFactory1)
    val outputRecord1 = testDriver.readOutput(outputTopic,
      new StringDeserializer(),
      JoinTopologyBuilder.getAvroPurchaseSerde(
        schemaRegistryHost,
        schemaRegistryPort).deserializer())
    OutputVerifier.compareKeyValue(outputRecord1, "1234", purchase1WithDiscount)
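The expected amount of 22.50 follows from the joiner's arithmetic: a 10% coupon on 25.00 gives a discount of 25.00 * 10 / 100 = 2.50, and 25.00 - 2.50 = 22.50. The sketch below reproduces that calculation outside Kafka so it can be checked on its own; `SimplePurchase` is a hypothetical stand-in for the Avro-generated `Purchase` class, keeping only the fields the joiner uses.

```scala
// Mirrors the arithmetic in couponPurchaseValueJoiner without Kafka or Avro.
object DiscountSketch {
  // Hypothetical stand-in for the Avro Purchase record
  final case class SimplePurchase(timestamp: String, productId: String, description: String, amount: Float)

  // Same formula as the ValueJoiner: amount - (amount * discount) / 100
  def applyDiscount(discountPercent: Float, purchase: SimplePurchase): SimplePurchase = {
    val discount = (purchase.amount * discountPercent) / 100
    purchase.copy(amount = purchase.amount - discount)
  }
}
```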

I'm not sure whether the step that selects a new key is losing the extracted date. I have tried a lot of combinations with no luck :(

Any help would be greatly appreciated!

Recommended answer

I'm not sure, since I don't know how thoroughly you've tested your code, but my guess is this:

1) Your code works with the default timestamp extractor because it uses the time at which each record is piped into the topology as the record timestamp. In your test you send the records one right after another without a pause, so their timestamps are close together and the join matches.

2) You are using the TopologyTestDriver for your tests. It is very useful for testing your business logic and topology as a unit (given these inputs, what are the correct outputs?), but no actual Kafka Streams application is running in those tests.

In your case, you can use the TopologyTestDriver method advanceWallClockTime(long) to simulate the passage of system time.

If you want to run the topology for real, you will have to write an integration test against an embedded Kafka cluster (the Kafka libraries include one that works just fine).

Let me know if that helps :-)
