在Qpid Broker中创建Exchange和队列 [英] Create Exchange and queues in Qpid broker

查看:109
本文介绍了在Qpid Broker中创建Exchange和队列的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!

问题描述

我正在使用嵌入式代理Qpid测试Spring集成项目.但是问题是我如何在qpid中进行队列和交换.我以为Rabbit-config.xml会在qpid代理中进行队列和交换,但无济于事.我的流程是创建队列,并在qpid代理中进行交换,将消息传递给它们,绑定到这些队列的入站amqp适配器将获得消息,我可以继续进行测试

Hi I am testing spring integration project using an embedded broker Qpid. But the problem is that HOW CAN i make queues and exchanges in qpid. I thought that rabbit-config.xml would make the queues and exchanges in qpid broker but to no avail. My flow is create queues and exchanges in qpid broker pass messages to them and inbound amqp adapters bounded to these queues would get messages and i can proceed with the test

错误:队列:在VirtualHost默认"上找不到"push.customer.arkona.controller.search".

Error : Queue: 'push.customer.arkona.controller.search' not found on VirtualHost 'default'.

    qpid-config.json:

    {   "name": "EmbeddedBroker",   "modelVersion": "2.0",   "storeVersion" : 1,   "authenticationproviders" : [ {
        "name" : "noPassword",
        "type" : "Anonymous",
        "secureOnlyMechanisms": []
            },
        {
          "name" : "passwordFile",
          "type" : "PlainPasswordFile",
          "path" : "${qpid.home_dir}${file.separator}src${file.separator}main${file.separator}resources${file.separator}password.properties",
          "secureOnlyMechanisms": []
        }    ],   "ports" : [
        {
          "name": "AMQP",
          "port": "${qpid.amqp_port}",
          "authenticationProvider": "passwordFile",
          "protocols": [
            "AMQP_0_10",
            "AMQP_0_8",
            "AMQP_0_9",
            "AMQP_0_9_1"
          ]
        }],
        "virtualhostnodes" : [ {
        "name" : "default",
        "type" : "JSON",
        "defaultVirtualHostNode" : "true",
        "virtualHostInitialConfiguration" : "${qpid.initial_config_virtualhost_config}",
         "storeType" : "DERBY"   
} 
] 
}

password.properties具有

password.properties has

guest:guest

guest:guest

我已经创建了用于运行测试的单独配置文件.这是rabbitmq配置.除此之外,我还有一个兔子上下文的xml文件,其中定义了所有队列,交换.

I have created a separate profile for running my tests. this is the rabbitmq configuration. apart from this i have a rabbit-context xml file where all the queues, exchanges are defined.

@Configuration
@Profile("qpid")
public class QpidConfig {

    String amqpPort = "5672";

    //String qpidHomeDir = "complete";
    String configFileName = "src/main/resources/qpid-config.json";

    @Bean
    BrokerOptions brokerOptions() {

        File tmpFolder= Files.createTempDir();

        //small hack, because userDir is not same when running Application and ApplicationTest
        //it leads to some issue locating the files after, so hacking it here
        String userDir=System.getProperty("user.dir").toString();

        File file = new File(userDir);
        String homePath = file.getAbsolutePath();

        BrokerOptions brokerOptions=new BrokerOptions();

        brokerOptions.setConfigProperty("qpid.work_dir", tmpFolder.getAbsolutePath());
        brokerOptions.setConfigProperty("qpid.amqp_port",amqpPort);
        brokerOptions.setConfigProperty("qpid.home_dir", homePath);
        brokerOptions.setInitialConfigurationLocation(homePath + "/"+configFileName);

        return brokerOptions;
    }

    @SuppressWarnings("rawtypes")
    @Bean
    Broker broker() throws Exception {

            org.apache.qpid.server.Broker broker = new org.apache.qpid.server.Broker();
        broker.startup(brokerOptions());
        return (Broker) broker;
    }

    private ConnectionFactory connectionFactory() {
        ConnectionFactory factory = new ConnectionFactory();

        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setHost("127.0.0.1");
        factory.setPort(5672);
        return factory;
    }

    @Bean(name ="rabbitConnectionFactory")
    public CachingConnectionFactory rabbitConnectionFactory(){
        return new CachingConnectionFactory(connectionFactory());
    }

    @Bean(name="rabbitTemplate")
    public RabbitTemplate rabbitTemplate(){
        return new RabbitTemplate(rabbitConnectionFactory());
    }

    @Bean(name ="arkonaHeaderMapper")
    public DefaultAmqpHeaderMapper syncerHeaderMapper() {
        DefaultAmqpHeaderMapper amqpHeaderMapper = DefaultAmqpHeaderMapper.inboundMapper();
        amqpHeaderMapper.setRequestHeaderNames("*");
        amqpHeaderMapper.setReplyHeaderNames("*");
        return amqpHeaderMapper;
    }


}

编辑

MY Rabbit-context.xml

MY rabbit-context.xml

    <?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <rabbit:queue name="pull.appt.arkona.scheduler.adapter" />
    <rabbit:queue name="pull.appt.arkona.adapter.processor" />

    <rabbit:queue name="pull.customer.arkona.to.lookup" />
    <rabbit:queue name="pull.customer.arkona.lookup.processor" />

    <rabbit:queue name="pull.customer.arkona.scheduler.adapter" />

    <rabbit:queue name="pull.ro.arkona.to.lookup" />
    <rabbit:queue name="pull.ro.arkona.adapter.processor" />

    <rabbit:queue name="pull.ro.arkona.scheduler.adapter" />

    <rabbit:queue name="pull.closed.arkona.scheduler.adapter" />
    <rabbit:queue name="pull.parts.arkona.scheduler.adapter" />

    <rabbit:queue name="pull.closed.arkona.adapter.processor" />
    <rabbit:queue name="pull.parts.arkona.adapter.processor" />

    <rabbit:queue name="pull.vehicle.arkona.to.lookup" />
    <rabbit:queue name="pull.vehicle.arkona.lookup.processor" />

    <rabbit:direct-exchange name="dms.arkona.exchange" durable="true">
        <rabbit:bindings>
            <rabbit:binding queue="pull.appt.arkona.scheduler.adapter" key="pull.appt.arkona.scheduler.adapter.key"></rabbit:binding>
            <rabbit:binding queue="pull.appt.arkona.adapter.processor" key="pull.appt.arkona.adapter.processor.key"></rabbit:binding>
            <rabbit:binding queue="pull.customer.arkona.to.lookup" key="pull.customer.arkona.to.lookup.key"></rabbit:binding>
            <rabbit:binding queue="pull.customer.arkona.lookup.processor" key="pull.customer.arkona.lookup.processor.key"></rabbit:binding>
            <rabbit:binding queue="pull.customer.arkona.scheduler.adapter" key="pull.customer.arkona.scheduler.adapter.key"></rabbit:binding>
            <rabbit:binding queue="pull.ro.arkona.to.lookup" key="pull.ro.arkona.to.lookup.key"></rabbit:binding>
            <rabbit:binding queue="pull.ro.arkona.adapter.processor" key="pull.ro.arkona.adapter.processor.key"></rabbit:binding>
            <rabbit:binding queue="pull.ro.arkona.scheduler.adapter" key="pull.ro.arkona.scheduler.adapter.key"></rabbit:binding>
            <rabbit:binding queue="pull.vehicle.arkona.to.lookup" key="pull.vehicle.arkona.to.lookup.key"></rabbit:binding>
            <rabbit:binding queue="pull.vehicle.arkona.lookup.processor" key="pull.vehicle.arkona.lookup.processor.key"></rabbit:binding>
            <rabbit:binding queue="pull.closed.arkona.scheduler.adapter" key="pull.closed.arkona.scheduler.adapter.key"></rabbit:binding>
            <rabbit:binding queue="pull.closed.arkona.adapter.processor" key="pull.closed.arkona.adapter.processor.key"></rabbit:binding>
            <rabbit:binding queue="pull.parts.arkona.scheduler.adapter" key="pull.parts.arkona.scheduler.adapter.key"></rabbit:binding>
            <rabbit:binding queue="pull.parts.arkona.adapter.processor" key="pull.parts.arkona.adapter.processor.key"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:direct-exchange>
</beans>

推荐答案

您的应用程序上下文中是否有RabbitAdmin? (它检测队列/交换/绑定,并在建立连接时声明它们).

Do you have a RabbitAdmin in your application context? (It detects the queues/exchanges/bindings and declares them when the connection is established).

我刚刚测试了 Spring Integration AMQP示例使用QPID 6.1.2,它可以正常创建所有内容...

I just tested the Spring Integration AMQP Sample with QPID 6.1.2 and it created everything ok...

<!-- Infrastructure -->

<rabbit:connection-factory id="connectionFactory" host="xx.xx.xx.xx" virtual-host="default" />

<rabbit:template id="amqpTemplate" connection-factory="connectionFactory" />

<rabbit:admin connection-factory="connectionFactory" />

<rabbit:queue name="si.test.queue" />

<rabbit:direct-exchange name="si.test.exchange">
    <rabbit:bindings>
        <rabbit:binding queue="si.test.queue" key="si.test.binding" />
    </rabbit:bindings>
</rabbit:direct-exchange>

编辑

启动应用程序对我来说也很好...

Boot app works fine for me too...

@SpringBootApplication
public class So50364236Application {

    public static void main(String[] args) {
        SpringApplication.run(So50364236Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> template.convertAndSend("so50364236", "foo");
    }

    @Bean
    public Queue queue() {
        return new Queue("so50364236");
    }

    @RabbitListener(queues = "so50364236")
    public void listen(String in) {
        System.out.println(in);
    }

}

spring.rabbitmq.addresses=xx.x.x.x
spring.rabbitmq.virtual-host=default

2018-05-16 13:17:25.013  INFO 34714 --- [           main] com.example.So50364236Application        : Started So50364236Application in 1.151 seconds (JVM running for 1.579)
foo

我在经纪人的管理页面上看到了队列.

And I see the queue on the broker's admin page.

EDIT2

这是另一个启动应用程序,其中队列在XML文件中声明;使用嵌入的QPID 6.1.6 ...

Here's another boot app where the queue is declared in an XML file; using QPID 6.1.6 embedded...

qpid-config.json

{
    "name": "EmbeddedBroker",
    "modelVersion": "2.0",
    "storeVersion": 1,
    "authenticationproviders": [
        {
            "name": "noPassword",
            "type": "Anonymous",
            "secureOnlyMechanisms": []
        },
        {
            "name": "passwordFile",
            "type": "PlainPasswordFile",
            "path": "${qpid.home_dir}${file.separator}etc${file.separator}passwd",
            "secureOnlyMechanisms": []
        }
    ],
    "ports": [
        {
            "name": "AMQP",
            "port": "${qpid.amqp_port}",
            "authenticationProvider": "passwordFile",
            "protocols": [
                "AMQP_0_10",
                "AMQP_0_8",
                "AMQP_0_9",
                "AMQP_0_9_1"
            ]
        }
    ],
    "virtualhostnodes": [
        {
            "name": "default",
            "type": "JSON",
            "defaultVirtualHostNode": "true",
            "virtualHostInitialConfiguration": "${qpid.initial_config_virtualhost_config}",
            "storeType": "DERBY"
        }
    ]
}

config.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <rabbit:queue name="so50364236b" />

</beans>

application.properties

spring.rabbitmq.addresses=localhost:8888

启动应用程序

@SpringBootApplication
@ImportResource("config.xml")
public class So50364236Application {

    public static void main(String[] args) {
        new SpringApplicationBuilder(So50364236Application.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> template.convertAndSend("so50364236b", "foo");
    }

    @Bean
    BrokerOptions brokerOptions() throws Exception {

        Path tmpFolder = Files.createTempDirectory("qpidWork");
        Path homeFolder = Files.createTempDirectory("qpidHome");
        File etc = new File(homeFolder.toFile(), "etc");
        etc.mkdir();
        FileOutputStream fos = new FileOutputStream(new File(etc, "passwd"));
        fos.write("guest:guest\n".getBytes());
        fos.close();

        BrokerOptions brokerOptions = new BrokerOptions();

        brokerOptions.setConfigProperty("qpid.work_dir", tmpFolder.toAbsolutePath().toString());
        brokerOptions.setConfigProperty("qpid.amqp_port", "8888");
        brokerOptions.setConfigProperty("qpid.home_dir", homeFolder.toAbsolutePath().toString());
        Resource config = new ClassPathResource("qpid-config.json");
        brokerOptions.setInitialConfigurationLocation(config.getFile().getAbsolutePath());

        return brokerOptions;
    }

    @Bean
    Broker broker() throws Exception {
        org.apache.qpid.server.Broker broker = new org.apache.qpid.server.Broker();
        broker.startup(brokerOptions());
        return broker;
    }

    @RabbitListener(queues = "so50364236b")
    public void listen(String in) {
        System.out.println(in);
    }

}

[Broker] BRK-1004 : Qpid Broker Ready
received: foo

也许您正在执行某些操作,导致未声明引导程序的Admin.目前尚不清楚为什么要添加自己的连接工厂和模板.您是否也尝试过添加自己的RabbitAdmin?

Perhaps you are doing something that causes boot's Admin to be not declared. It's not clear why you are adding your own connection factory and template; have you tried adding your own RabbitAdmin too?

这篇关于在Qpid Broker中创建Exchange和队列的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持IT屋!

查看全文
登录 关闭
扫码关注1秒登录
发送“验证码”获取 | 15天全站免登陆