SpringBoot RabbitMQ配置多vhost/多RabbitMQ实例+解决Exchange/Queue在vhost之间扩散造成交换机队列重复

jupiter
2024-05-15 / 0 评论 / 18 阅读 / 正在检测是否收录...

1.添加maven依赖:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.配置文件配置RabbitMQ连接信息:

server:
  port: 8201
spring:
  application:
    name: RabbitMQ
  rabbitmq:
    vhost1:
      host: 192.168.124.10
      port: 5672
      username: user1
      password: user1
      virtual-host: /vhost1
    vhost2:
      host: 192.168.124.10
      port: 5672
      username: user2
      password: user2
      virtual-host: /vhost2

3.RabbitMQConfig配置

3.1 RabbitConstant

package com.example.rabbitmq.config;

public class RabbitConstant {

    // region vhost1 ConnectionFactory.RabbitTemplate.RabbitAdmin配置
    public static final String VHOST_1_CONNECTION_FACTORY = "vhost1ConnectionFactory";
    public static final String VHOST_1_RABBIT_LISTENER_CONTAINER_FACTORY = "vhost1RabbitListenerContainerFactory";
    public static final String VHOST_1_RABBIT_TEMPLATE = "vhost1RabbitTemplate";
    public static final String VHOST_1_RABBIT_ADMIN = "vhost1RabbitAdmin";
    // endregion

    // region vhost2 ConnectionFactory.RabbitTemplate.RabbitAdmin配置
    public static final String VHOST_2_CONNECTION_FACTORY = "vhost2ConnectionFactory";
    public static final String VHOST_2_RABBIT_LISTENER_CONTAINER_FACTORY = "vhost2RabbitListenerContainerFactory";
    public static final String VHOST_2_RABBIT_TEMPLATE = "vhost2RabbitTemplate";
    public static final String VHOST_2_RABBIT_ADMIN = "vhost2RabbitAdmin";
    // endregion

    // region vhost1 测试交换机.队列.路由配置
    public static final String VHOST_1_TEST_EXCHANGE_1 = "vhost1TestExchange1";
    public static final String VHOST_1_TEST_EXCHANGE_2 = "vhost1TestExchange2";
    public static final String VHOST_1_TEST_EXCHANGE_1_QUEUE_1 = "vhost1TestExchange1Queue1";
    public static final String VHOST_1_TEST_EXCHANGE_1_ROUTING_KEY_1 = "vhost1TestExchange1RoutingKey1";
    public static final String VHOST_1_TEST_EXCHANGE_2_QUEUE_1 = "vhost1TestExchange2Queue1";
    public static final String VHOST_1_TEST_EXCHANGE_2_ROUTING_KEY_1 = "vhost1TestExchange2RoutingKey1";
    // endregion

    // region vhost2 测试交换机.队列.路由配置
    public static final String VHOST_2_TEST_EXCHANGE_1 = "vhost2TestExchange1";
    public static final String VHOST_2_TEST_EXCHANGE_2 = "vhost2TestExchange2";
    public static final String VHOST_2_TEST_EXCHANGE_1_QUEUE_1 = "vhost2TestExchange1Queue1";
    public static final String VHOST_2_TEST_EXCHANGE_1_ROUTING_KEY_1 = "vhost2TestExchange1RoutingKey1";
    public static final String VHOST_2_TEST_EXCHANGE_2_QUEUE_1 = "vhost2TestExchange2Queue1";
    public static final String VHOST_2_TEST_EXCHANGE_2_ROUTING_KEY_1 = "vhost2TestExchange2RoutingKey1";
    // endregion

}

3.2 Vhost1RabbitMQConfig

package com.example.rabbitmq.config;

import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;


@Configuration
public class Vhost1RabbitMQConfig {
    @Value("${spring.rabbitmq.vhost1.host}")
    private String host;

    @Value("${spring.rabbitmq.vhost1.port}")
    private int port;

    @Value("${spring.rabbitmq.vhost1.username}")
    private String username;

    @Value("${spring.rabbitmq.vhost1.password}")
    private String password;

    @Value("${spring.rabbitmq.vhost1.virtual-host}")
    private String vhost;


    // 为vhost1配置ConnectionFactory
    @Bean(name = RabbitConstant.VHOST_1_CONNECTION_FACTORY)
    @Primary
    public ConnectionFactory vhost1ConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(vhost);
        return connectionFactory;
    }

    // 为vhost1配置SimpleRabbitListenerContainerFactory
    @Bean(name = RabbitConstant.VHOST_1_RABBIT_LISTENER_CONTAINER_FACTORY)
    @Primary
    public SimpleRabbitListenerContainerFactory vhost1RabbitListenerContainerFactory(
            @Qualifier(RabbitConstant.VHOST_1_CONNECTION_FACTORY)ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

    // 为vhost1配置RabbitTemplate
    @Bean(name = RabbitConstant.VHOST_1_RABBIT_TEMPLATE)
    @Primary
    public RabbitTemplate vhost1RabbitTemplate(
            @Qualifier(RabbitConstant.VHOST_1_CONNECTION_FACTORY) ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 可以设置其他属性,如消息转换器
        return rabbitTemplate;
    }

    // 为vhost1配置RabbitAdmin
    @Bean(name = RabbitConstant.VHOST_1_RABBIT_ADMIN)
    @Primary
    public RabbitAdmin vhost1RabbitAdmin(
            @Qualifier(RabbitConstant.VHOST_1_CONNECTION_FACTORY) ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}

3.3 Vhost2RabbitMQConfig

package com.example.rabbitmq.config;

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;


@Configuration
public class Vhost2RabbitMQConfig {
    @Value("${spring.rabbitmq.vhost2.host}")
    private String host;

    @Value("${spring.rabbitmq.vhost2.port}")
    private int port;

    @Value("${spring.rabbitmq.vhost2.username}")
    private String username;

    @Value("${spring.rabbitmq.vhost2.password}")
    private String password;

    @Value("${spring.rabbitmq.vhost2.virtual-host}")
    private String vhost;


    // 为vhost2配置ConnectionFactory
    @Bean(name = RabbitConstant.VHOST_2_CONNECTION_FACTORY)
    public ConnectionFactory vhost2ConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(vhost);
        return connectionFactory;
    }

    // 为vhost2配置SimpleRabbitListenerContainerFactory
    @Bean(name = RabbitConstant.VHOST_2_RABBIT_LISTENER_CONTAINER_FACTORY)
    public SimpleRabbitListenerContainerFactory vhost2RabbitListenerContainerFactory(
            @Qualifier(RabbitConstant.VHOST_2_CONNECTION_FACTORY)ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }


    // 为vhost2配置RabbitTemplate
    @Bean(name = RabbitConstant.VHOST_2_RABBIT_TEMPLATE)
    public RabbitTemplate vhost2RabbitTemplate(
            @Qualifier(RabbitConstant.VHOST_2_CONNECTION_FACTORY) ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        // 可以设置其他属性,如消息转换器
        return rabbitTemplate;
    }

    // 为vhost2配置RabbitAdmin
    @Bean(name = RabbitConstant.VHOST_2_RABBIT_ADMIN)
    public RabbitAdmin vhost2RabbitAdmin(
            @Qualifier(RabbitConstant.VHOST_2_CONNECTION_FACTORY) ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}

4.消费者

其中 @QueueBinding下的admins=的配置是交换机和队列数据不会在vhost之间扩散的必要条件

4.1 Vhost1Consumer

package com.example.rabbitmq.consumer;

import com.example.rabbitmq.config.RabbitConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class Vhost1Consumer {
    @RabbitListener(
            containerFactory = RabbitConstant.VHOST_1_RABBIT_LISTENER_CONTAINER_FACTORY,
            bindings = @QueueBinding(
                    value = @Queue(value = RabbitConstant.VHOST_1_TEST_EXCHANGE_1_QUEUE_1,
                            admins = RabbitConstant.VHOST_1_RABBIT_ADMIN),
                    exchange = @Exchange(value = RabbitConstant.VHOST_1_TEST_EXCHANGE_1,
                            type = ExchangeTypes.TOPIC,
                            admins = RabbitConstant.VHOST_1_RABBIT_ADMIN),
                    key = RabbitConstant.VHOST_1_TEST_EXCHANGE_1_ROUTING_KEY_1,
                    admins = RabbitConstant.VHOST_1_RABBIT_ADMIN)
    )
    public void vhost1Exchang1Queue1Consumer(String message) {
        log.info("vhost1 exchange1 queue1 consumer meaasge: {}", message);
    }

    @RabbitListener(
            containerFactory = RabbitConstant.VHOST_1_RABBIT_LISTENER_CONTAINER_FACTORY,
            bindings = @QueueBinding(
                    value = @Queue(value = RabbitConstant.VHOST_1_TEST_EXCHANGE_2_QUEUE_1,
                            admins = RabbitConstant.VHOST_1_RABBIT_ADMIN),
                    exchange = @Exchange(value = RabbitConstant.VHOST_1_TEST_EXCHANGE_2,
                            type = ExchangeTypes.TOPIC,
                            admins = RabbitConstant.VHOST_1_RABBIT_ADMIN),
                    key = RabbitConstant.VHOST_1_TEST_EXCHANGE_2_ROUTING_KEY_1,
                    admins = RabbitConstant.VHOST_1_RABBIT_ADMIN))
    public void vhost1Exchang2Queue1Consumer(String message) {
        log.info("vhost1 exchange2 queue1 consumer meaasge: {}", message);
    }
}

4.2 Vhost2Consumer

package com.example.rabbitmq.consumer;

import com.example.rabbitmq.config.RabbitConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class Vhost2Consumer {
    @RabbitListener(
            containerFactory = RabbitConstant.VHOST_2_RABBIT_LISTENER_CONTAINER_FACTORY,
            bindings = @QueueBinding(
                    value = @Queue(value = RabbitConstant.VHOST_2_TEST_EXCHANGE_1_QUEUE_1,
                            admins = RabbitConstant.VHOST_2_RABBIT_ADMIN),
                    exchange = @Exchange(value = RabbitConstant.VHOST_2_TEST_EXCHANGE_1,
                            type = ExchangeTypes.TOPIC,
                            admins = RabbitConstant.VHOST_2_RABBIT_ADMIN),
                    key = RabbitConstant.VHOST_2_TEST_EXCHANGE_1_ROUTING_KEY_1,
                    admins = RabbitConstant.VHOST_2_RABBIT_ADMIN)
    )
    public void vhost2Exchang1Queue1Consumer(String message) {
        log.info("vhost2 exchange1 queue1 consumer meaasge: {}", message);
    }

    @RabbitListener(
            containerFactory = RabbitConstant.VHOST_2_RABBIT_LISTENER_CONTAINER_FACTORY,
            bindings = @QueueBinding(
                    value = @Queue(value = RabbitConstant.VHOST_2_TEST_EXCHANGE_2_QUEUE_1,
                            admins = RabbitConstant.VHOST_2_RABBIT_ADMIN),
                    exchange = @Exchange(value = RabbitConstant.VHOST_2_TEST_EXCHANGE_2,
                            type = ExchangeTypes.TOPIC,
                            admins = RabbitConstant.VHOST_2_RABBIT_ADMIN),
                    key = RabbitConstant.VHOST_2_TEST_EXCHANGE_2_ROUTING_KEY_1,
                    admins = RabbitConstant.VHOST_2_RABBIT_ADMIN))
    public void vhost2Exchang2Queue1Consumer(String message) {
        log.info("vhost2 exchange2 queue1 consumer meaasge: {}", message);
    }
}

5.生产者ProducerController

package com.example.rabbitmq.producer;

import com.example.rabbitmq.config.RabbitConstant;
import jakarta.annotation.Resource;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/")
public class ProducerController {

    @Resource(name = RabbitConstant.VHOST_1_RABBIT_TEMPLATE)
    RabbitTemplate vhost1RabbitTemplate;

    @Resource(name = RabbitConstant.VHOST_2_RABBIT_TEMPLATE)
    RabbitTemplate vhost2RabbitTemplate;


    @GetMapping("/")
    public String index() {
        return "rabbitmq study index";
    }

    @GetMapping("/produceVhost1Exchange1Message")
    public String produceVhost1Exchange1Message() {
        String content1 = "vhost1 exchange1 message " + System.currentTimeMillis();
        vhost1RabbitTemplate.convertAndSend(
                RabbitConstant.VHOST_1_TEST_EXCHANGE_1,
                RabbitConstant.VHOST_1_TEST_EXCHANGE_1_ROUTING_KEY_1,
                content1);
        return content1 + " send success";
    }

    @GetMapping("/produceVhost1Exchange2Message")
    public String produceVhost1Exchange2Message() {
        String content1 = "vhost1 exchange2 message " + System.currentTimeMillis();
        vhost1RabbitTemplate.convertAndSend(
                RabbitConstant.VHOST_1_TEST_EXCHANGE_2,
                RabbitConstant.VHOST_1_TEST_EXCHANGE_2_ROUTING_KEY_1,
                content1);
        return content1 + " send success";
    }

    @GetMapping("/produceVhost2Exchange1Message")
    public String produceVhost2Exchange1Message() {
        String content1 = "vhost2 exchange1 message " + System.currentTimeMillis();
        vhost2RabbitTemplate.convertAndSend(
                RabbitConstant.VHOST_2_TEST_EXCHANGE_1,
                RabbitConstant.VHOST_2_TEST_EXCHANGE_1_ROUTING_KEY_1,
                content1);
        return content1 + "send success ";
    }

    @GetMapping("/produceVhost2Exchange2Message")
    public String produceVhost2Exchange2Message() {
        String content1 = "vhost2 exchange2 message " + System.currentTimeMillis();
        vhost2RabbitTemplate.convertAndSend(
                RabbitConstant.VHOST_2_TEST_EXCHANGE_2,
                RabbitConstant.VHOST_2_TEST_EXCHANGE_2_ROUTING_KEY_1,
                content1);
        return content1 + "send success ";
    }

}

6.启动测试日志

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v3.2.5)

2024-05-15T23:12:32.832+08:00  INFO 20956 --- [RabbitMQ] [           main] c.example.rabbitmq.RabbitMqApplication   : Starting RabbitMqApplication using Java 17.0.8 with PID 20956 (C:\Users\vin\Desktop\WorkSpace\StudyProject\RabbitMQ\target\classes started by vin in C:\Users\vin\Desktop\WorkSpace\StudyProject)
2024-05-15T23:12:32.834+08:00  INFO 20956 --- [RabbitMQ] [           main] c.example.rabbitmq.RabbitMqApplication   : No active profile set, falling back to 1 default profile: "default"
2024-05-15T23:12:33.401+08:00  INFO 20956 --- [RabbitMQ] [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat initialized with port 8201 (http)
2024-05-15T23:12:33.408+08:00  INFO 20956 --- [RabbitMQ] [           main] o.apache.catalina.core.StandardService   : Starting service [Tomcat]
2024-05-15T23:12:33.408+08:00  INFO 20956 --- [RabbitMQ] [           main] o.apache.catalina.core.StandardEngine    : Starting Servlet engine: [Apache Tomcat/10.1.20]
2024-05-15T23:12:33.438+08:00  INFO 20956 --- [RabbitMQ] [           main] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2024-05-15T23:12:33.439+08:00  INFO 20956 --- [RabbitMQ] [           main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 577 ms
2024-05-15T23:12:33.738+08:00  INFO 20956 --- [RabbitMQ] [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port 8201 (http) with context path ''
2024-05-15T23:12:33.740+08:00  INFO 20956 --- [RabbitMQ] [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: 192.168.124.10:5672
2024-05-15T23:12:33.764+08:00  INFO 20956 --- [RabbitMQ] [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: vhost1ConnectionFactory#6af91cc8:0/SimpleConnection@7bee8621 [delegate=amqp://user1@192.168.124.10:5672//vhost1, localPort=3708]
2024-05-15T23:12:33.789+08:00  INFO 20956 --- [RabbitMQ] [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: 192.168.124.10:5672
2024-05-15T23:12:33.791+08:00  INFO 20956 --- [RabbitMQ] [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: vhost2ConnectionFactory#43acd79e:0/SimpleConnection@385d819 [delegate=amqp://user2@192.168.124.10:5672//vhost2, localPort=3709]
2024-05-15T23:12:33.800+08:00  INFO 20956 --- [RabbitMQ] [           main] c.example.rabbitmq.RabbitMqApplication   : Started RabbitMqApplication in 1.212 seconds (process running for 1.512)
2024-05-15T23:13:00.260+08:00  INFO 20956 --- [RabbitMQ] [nio-8201-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2024-05-15T23:13:00.261+08:00  INFO 20956 --- [RabbitMQ] [nio-8201-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2024-05-15T23:13:00.261+08:00  INFO 20956 --- [RabbitMQ] [nio-8201-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 0 ms
2024-05-15T23:13:00.315+08:00  INFO 20956 --- [RabbitMQ] [ntContainer#0-1] c.e.rabbitmq.consumer.Vhost1Consumer     : vhost1 exchange1 queue1 consumer meaasge: vhost1 exchange1 message 1715785980295
2024-05-15T23:13:04.012+08:00  INFO 20956 --- [RabbitMQ] [ntContainer#1-1] c.e.rabbitmq.consumer.Vhost1Consumer     : vhost1 exchange2 queue1 consumer meaasge: vhost1 exchange2 message 1715785984008
2024-05-15T23:13:09.439+08:00  INFO 20956 --- [RabbitMQ] [ntContainer#2-1] c.e.rabbitmq.consumer.Vhost2Consumer     : vhost2 exchange1 queue1 consumer meaasge: vhost2 exchange1 message 1715785989435
2024-05-15T23:13:15.260+08:00  INFO 20956 --- [RabbitMQ] [ntContainer#3-1] c.e.rabbitmq.consumer.Vhost2Consumer     : vhost2 exchange2 queue1 consumer meaasge: vhost2 exchange2 message 1715785995257

参考资料

  1. SpringBoot RabbitMQ配置多vhost/多RabbitMQ实例方案_rabbitmq springboot消费多个vhost-CSDN博客
  2. SpringBoot连接多RabbitMQ源 - 知乎 (zhihu.com)
  3. @QueueBinding RabbitMQ 多数据源 队列重复-CSDN博客
  4. 【MQ系列】RabbitListener消费基本使用姿势介绍 | 一灰灰Blog (hhui.top)
0

评论 (0)

打卡
取消