如何将此弹簧集成配置从 XML 转换为 Java? [英] How do I convert this spring-integration configuration from XML to Java?

查看:29
本文介绍了如何将此弹簧集成配置从 XML 转换为 Java?的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

在应用程序中而不是在 XML 中实现这一特定部分是有意义的,因为它是整个集群中的常量,而不是本地化到单个作业.

This particular piece makes sense to implement in the application rather than XML because it is a constant across the entire cluster, not localized to a single job.

从剖析 XSD 来看,在我看来,int-kafka:outbound-channel-adapter 的 xml 构造了一个 KafkaProducerMessageHandler.

From dissecting the XSD, it looks to me like the xml for int-kafka:outbound-channel-adapter constructs a KafkaProducerMessageHandler.

没有可见的方式来设置频道、主题或大多数其他属性.

There is no visible way to set the channel, the topic, or most of the other attributes.

请注意潜在的downvoters -(咆哮)我已经进行了一周的RTFM,并且比我开始时更加困惑.我对语言的选择已经从形容词到副词演变而来,我开始从其他语言借词.答案可能就在那里.但如果是这样,那么凡人是无法找到它的.(咆哮)

Note to potential downvoters - (rant on) I have been RTFM'ing for a week and am more confused than when I started. My choice of language has graduated from adjectives through adverbs, and I'm starting to borrow words from other languages. The answer may be in there. But if it is, it is not locatable by mere mortals. (rant off)

XML 配置:

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-template="kafkaTemplate"
                                    auto-startup="false"
                                    channel="outbound-staging"
                                    topic="foo"
                                    sync="false"
                                    message-key-expression="'bar'"
                                    send-failure-channel="failures"
                                    send-success-channel="successes"
                                    partition-id-expression="2">
</int-kafka:outbound-channel-adapter>

如果是这样,那么我希望 java 配置看起来像这样:

If so, then I would expect the java config to look something like this:

@Bean
public KafkaProducerMessageHandler kafkaOutboundChannelAdapter () {
    KafkaProducerMessageHandler result = new KafkaProducerMessageHandler(kafkaTemplate());

    result.set????? ();    // WTH?? No methods for most of the attributes?!!!

    return result;
}

有关正在解决的高级问题的其他信息

作为更大项目的一部分,我正在尝试实现 https://docs.spring.io/spring-batch/4.0.x/reference/html/spring-batch-integration.html#remote-partitioning ,使用 Kafka 支持而不是 JMS 支持.

As a part of a larger project, I am trying to implement the textbook example from https://docs.spring.io/spring-batch/4.0.x/reference/html/spring-batch-integration.html#remote-partitioning , with Kafka backing instead of JMS backing.

我相信最终的集成流程应该是这样的:

I believe the final integration flow should be something like this:

partitionHandler -> messagesTemplate -> outbound-requests (DirectChannel) -> outbound-staging (KafkaProducerMessageHandler) -> kafka

partitionHandler -> messagingTemplate -> outbound-requests (DirectChannel) -> outbound-staging (KafkaProducerMessageHandler) -> kafka

kafka -> executionContainer (KafkaMessageListenerContainer) -> inboundKafkaRequests (KafkaMessageDrivenChannelAdapter) -> inbound-requests (DirectChannel) -> serviceActivator (StepExecutionRequestHandler)

kafka -> executionContainer (KafkaMessageListenerContainer) -> inboundKafkaRequests (KafkaMessageDrivenChannelAdapter) -> inbound-requests (DirectChannel) -> serviceActivator (StepExecutionRequestHandler)

serviceActivator(StepExecutionRequestHandler)->reply-staging(KafkaProducerMessageHandler)->kafka

serviceActivator (StepExecutionRequestHandler) -> reply-staging (KafkaProducerMessageHandler) -> kafka

kafka -> replyContainer (KafkaMessageListenerContainer) -> inboundKafkaReplies (KafkaMessageDrivenChannelAdapter) -> inbound-replies (DirectChannel) -> partitionhandler

kafka -> replyContainer (KafkaMessageListenerContainer) -> inboundKafkaReplies (KafkaMessageDrivenChannelAdapter) -> inbound-replies (DirectChannel) -> partitionhandler

推荐答案

不确定你的意思是他们被遗漏了,但这是我在 KafkaProducerMessageHandler 的源代码中看到的:

Not sure what you mean that they are missed, but this is what I see in the source code of that KafkaProducerMessageHandler:

public void setTopicExpression(Expression topicExpression) {
    this.topicExpression = topicExpression;
}

public void setMessageKeyExpression(Expression messageKeyExpression) {
    this.messageKeyExpression = messageKeyExpression;
}

public void setPartitionIdExpression(Expression partitionIdExpression) {
    this.partitionIdExpression = partitionIdExpression;
}

/**
 * Specify a SpEL expression to evaluate a timestamp that will be added in the Kafka record.
 * The resulting value should be a {@link Long} type representing epoch time in milliseconds.
 * @param timestampExpression the {@link Expression} for timestamp to wait for result
 * fo send operation.
 * @since 2.3
 */
public void setTimestampExpression(Expression timestampExpression) {
    this.timestampExpression = timestampExpression;
}

等等.

您还可以访问超类 setter,例如用于您的 XML 变体的 setSync().

You also have access to the super class setters, for example a setSync() for your XML variant.

input-channel 不是 MessageHandler 的责任.它转到 Endpoint 并且可以通过 @ServiceActivator@Bean 一起配置.

The input-channel is not a MessageHandler responsibility. It goes to the Endpoint and can be confgigured via @ServiceActivator alongside with that @Bean.

在核心 Spring 集成参考手册中查看更多信息:https://docs.spring.io/spring-integration/reference/html/#annotations_on_beans

See more info in the Core Spring Integration Reference Manual: https://docs.spring.io/spring-integration/reference/html/#annotations_on_beans

开头还有很重要的一章:https://docs.spring.io/spring-integration/reference/html/#programming-tips

Also there is very important chapter in the beginning: https://docs.spring.io/spring-integration/reference/html/#programming-tips

此外,最好考虑使用 Java DSL 而不是直接的 MessageHandler 用法:

In addition it might be better to consider to use Java DSL instead of direct MessageHandler usage:

             Kafka
                .outboundChannelAdapter(producerFactory)
                .sync(true)
                .messageKey(m -> m
                        .getHeaders()
                        .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
                .headerMapper(mapper())
                .partitionId(m -> 0)
                .topicExpression("headers[kafka_topic] ?: '" + topic + "'")
                .configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic))
                .get();

在提到的 Spring Integration Docs 中查看有关 Java DSL 的更多信息:https://docs.spring.io/spring-integration/reference/html/#java-dsl

See more info about Java DSL in the mentioned Spring Integration Docs: https://docs.spring.io/spring-integration/reference/html/#java-dsl

这篇关于如何将此弹簧集成配置从 XML 转换为 Java?的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆