How to deserialize a BigDecimal value received from a Kafka broker through the Debezium CDC mechanism?


Problem description

I have a couple of microservices developed with Spring Boot, each with its own Postgres database. These microservices exchange data through the CDC mechanism provided by the Debezium platform, using a Kafka broker and Kafka Connect. Microservice A stores some entities that have a BigDecimal attribute. Microservice B depends on the data stored by A, so it receives that data through Kafka topics as messages like the following:

"after":{"id":"267e8ba0-4986-447d-8328-315c839875c3","coefficient":"AZA=","created_at":1559950327672000,"label":"External Agent","updated_at":1559950327672000}

The coefficient attribute is a BigDecimal, and it is stored in microservice A's database with the value 4.00.

How come 4.00 is converted to "AZA="? Is "AZA=" some encoding format to preserve the BigDecimal precision? How to go from "AZA=" to 4.0 again?

Note that Jackson fails to deserialize the string value "AZA=" into a BigDecimal and throws the following exception:

com.fasterxml.jackson.databind.exc.InvalidFormatException: Cannot deserialize value of type `java.math.BigDecimal` from String "AZA=": not a valid representation
at [Source: (String)"{"id":"267e8ba0-4986-447d-8328-315c839875c3","coefficient":"AZA=","created_at":1559950327672000,"label":"External Agent","updated_at":1559950327672000}"; line: 1, column: 60] (through reference chain: org.perfometer.performanceservice.entities.ActorTypeEntity["coefficient"])
at com.fasterxml.jackson.databind.exc.InvalidFormatException.from(InvalidFormatException.java:67)
at com.fasterxml.jackson.databind.DeserializationContext.weirdStringException(DeserializationContext.java:1549)
at com.fasterxml.jackson.databind.DeserializationContext.handleWeirdStringValue(DeserializationContext.java:911)
at com.fasterxml.jackson.databind.deser.std.NumberDeserializers$BigDecimalDeserializer.deserialize(NumberDeserializers.java:955)
at com.fasterxml.jackson.databind.deser.std.NumberDeserializers$BigDecimalDeserializer.deserialize(NumberDeserializers.java:922)
at com.fasterxml.jackson.databind.deser.impl.MethodProperty.deserializeAndSet(MethodProperty.java:127)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.vanillaDeserialize(BeanDeserializer.java:288)
at com.fasterxml.jackson.databind.deser.BeanDeserializer.deserialize(BeanDeserializer.java:151)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4013)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3004)
at org.perfometer.performanceservice.services.impl.ActorTypeServiceImpl.consumeActorTypeMessages(ActorTypeServiceImpl.java:123)
at org.perfometer.performanceservice.services.impl.ActorTypeServiceImpl$$FastClassBySpringCGLIB$$944d568c.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:749)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:295)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:688)
at org.perfometer.performanceservice.services.impl.ActorTypeServiceImpl$$EnhancerBySpringCGLIB$$167173df.consumeActorTypeMessages(<generated>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:283)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:79)
at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:50)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1263)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1256)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1217)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1198)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1118)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:933)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:749)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:698)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)

Any hint or help would be appreciated. Thanks!

Solution

Here is a solution for Java: https://debezium.io/docs/faq/#how_to_retrieve_decimal_field_from_binary_representation

Also check the connector's decimal.handling.mode option, which controls how decimal values are encoded in the message. Besides the default binary representation, it can emit them as a double or as a string.
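To make the binary representation concrete, here is a minimal sketch of decoding the value from the question. It assumes the field uses the default binary encoding, in which the string is the Base64 form of the big-endian two's-complement unscaled value and the scale is carried separately in the field's schema parameters. The class name `DebeziumDecimalDecoder`, the method `decode`, and the scale of 2 are illustrative assumptions, not Debezium API and not confirmed by the question.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Base64;

// Hypothetical helper: the class and method names are illustrative only.
final class DebeziumDecimalDecoder {

    private DebeziumDecimalDecoder() {
    }

    /**
     * Decodes a Base64 string holding the big-endian two's-complement
     * unscaled value of a decimal, then applies the given scale.
     */
    static BigDecimal decode(String encoded, int scale) {
        byte[] unscaledBytes = Base64.getDecoder().decode(encoded);
        return new BigDecimal(new BigInteger(unscaledBytes), scale);
    }

    public static void main(String[] args) {
        // Assumption: the column has scale 2. In practice, read the scale
        // from the schema of the field rather than hard-coding it.
        // "AZA=" decodes to the bytes 0x01 0x90, i.e. the unscaled value 400.
        System.out.println(decode("AZA=", 2)); // prints 4.00
    }
}
```

With Jackson, one way to use this would be to map the field as a String and convert it afterwards, or to call the helper from a custom deserializer registered for that field. Either way the scale has to come from the schema or be known to the consumer in advance.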
