使用kafka流绑定器测试Spring云流:使用TopologyTestDriver,我收到“类不在受信任包中"的错误. [英] Testing Spring cloud stream with kafka stream binder: using TopologyTestDriver I get the error of "The class is not in the trusted packages"

查看:125
本文介绍了使用kafka流绑定器测试Spring云流:使用TopologyTestDriver,我收到“类不在受信任包中"的错误.的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我有一个使用kafka stream活页夹的简单流处理器(不是消费者/生产者).

I have this simple stream processor (not a consumer/producer) using kafka streams binder.

@Bean
fun processFoo():Function<KStream<FooName, FooAddress>, KStream<FooName, FooAddressPlus>> {
    return Function { input-> input.map { key, value ->
        println("\nPAYLOAD KEY: ${key.name}\n");
        println("\nPAYLOAD value: ${value.address}\n");
        val output = FooAddressPlus()
        output.address = value.address
        output.name = value.name
        output.plus = "$value.name-$value.address"
        KeyValue(key, output)
    }}
}

我正在尝试使用TopologyTestDriver对其进行测试:

I'm trying to test it using the TopologyTestDriver:

@SpringBootTest(
        webEnvironment = SpringBootTest.WebEnvironment.NONE,
        classes = [Application::class, FooProcessor::class]
)
class FooProcessorTests {
    var testDriver: TopologyTestDriver? = null
    val INPUT_TOPIC = "input"
    val OUTPUT_TOPIC = "output"

    val inputKeySerde: Serde<FooName> = JsonSerde<FooName>()
    val inputValueSerde: Serde<FooAddress> = JsonSerde<FooAddress>()
    val outputKeySerde: Serde<FooName> = JsonSerde<FooName>()
    val outputValueSerde: Serde<FooAddressPlus> = JsonSerde<FooAddressPlus>()

    fun getStreamsConfiguration(): Properties? {
        val streamsConfiguration = Properties()
        streamsConfiguration[StreamsConfig.APPLICATION_ID_CONFIG] = "TopologyTestDriver"
        streamsConfiguration[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "dummy:1234"
        streamsConfiguration[JsonDeserializer.TRUSTED_PACKAGES] = "*"
        streamsConfiguration["spring.kafka.consumer.properties.spring.json.trusted.packages"] = "*"
        return streamsConfiguration
    }

    @Before
    fun setup() {
        val builder = StreamsBuilder()
        val input: KStream<FooName, FooAddress> = builder.stream(INPUT_TOPIC, Consumed.with(inputKeySerde, inputValueSerde))
        val processor = FooProcessor()
        val output: KStream<FooName, FooAddressPlus> = processor.processFoo().apply(input)
        output.to(OUTPUT_TOPIC, Produced.with(outputKeySerde, outputValueSerde))
        testDriver = TopologyTestDriver(builder.build(), getStreamsConfiguration())
    }

    @After
    fun tearDown() {
        try {
            testDriver!!.close()
        } catch (e: RuntimeException) {
            // https://issues.apache.org/jira/browse/KAFKA-6647 causes exception when executed in Windows, ignoring it
            // Logged stacktrace cannot be avoided
            println("Ignoring exception, test failing in Windows due this exception:" + e.localizedMessage)
        }
    }

    @org.junit.Test
    fun testOne() {
        val inputTopic: TestInputTopic<FooName, FooAddress> =
                testDriver!!.createInputTopic(INPUT_TOPIC, inputKeySerde.serializer(), inputValueSerde.serializer())
        val key = FooName()
        key.name = "sherlock"
        val value = FooAddress()
        value.name = "sherlock"
        value.address = "Baker street"
        inputTopic.pipeInput(key, value)
        val outputTopic: TestOutputTopic<FooName, FooAddressPlus> =
                testDriver!!.createOutputTopic(OUTPUT_TOPIC, outputKeySerde.deserializer(), outputValueSerde.deserializer())
        val message = outputTopic.readValue()

        assertThat(message.name).isEqualTo(key.name)
        assertThat(message.address).isEqualTo(value.address)
    }
}

运行它时,在第inputTopic.pipeInput(key, value)

类' package .FooAddress'不在受信任的软件包中:[java.util,java.lang].如果您认为该类别可以安全地反序列化,请提供其名称.如果仅由可信源进行序列化,则还可以启用全部信任().*

The class 'package.FooAddress' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all ().*

关于如何解决此问题的任何想法?在getStreamsConfiguration()中设置这些属性无济于事.请注意,这是一个流处理器,而不是消费者/生产者.

Any ideas on how to solve this? Setting those properties in getStreamsConfiguration() is not helping. Please note that this is a stream processor, not a consumer/producer.

非常感谢!

推荐答案

当Kafka自己创建Serde时,它将通过调用configure()来应用属性.

When Kafka creates the Serde itself, it applies the properties by calling configure().

由于您自己实例化Serde,因此需要在其上调用configure()并将其传递给属性映射.

Since you are instantiating the Serde yourself, you need to call configure() on it passing in the map of properties.

这就是可信包属性如何传播到反序列化器的方式.

That's how the trusted packages property gets propagated to the deserializer.

或者,您可以在反序列化器上调用setTrustedPackages().

Or, you can call setTrustedPackages() on the deserializer.

这篇关于使用kafka流绑定器测试Spring云流:使用TopologyTestDriver,我收到“类不在受信任包中"的错误.的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆