1.添加maven依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.配置文件配置RabbitMQ连接信息:
server:
port: 8201
spring:
application:
name: RabbitMQ
rabbitmq:
vhost1:
host: 192.168.124.10
port: 5672
username: user1
password: user1
virtual-host: /vhost1
vhost2:
host: 192.168.124.10
port: 5672
username: user2
password: user2
virtual-host: /vhost2
3.RabbitMQConfig配置
3.1 RabbitConstant
package com.example.rabbitmq.config;
public class RabbitConstant {
// region vhost1 ConnectionFactory.RabbitTemplate.RabbitAdmin配置
public static final String VHOST_1_CONNECTION_FACTORY = "vhost1ConnectionFactory";
public static final String VHOST_1_RABBIT_LISTENER_CONTAINER_FACTORY = "vhost1RabbitListenerContainerFactory";
public static final String VHOST_1_RABBIT_TEMPLATE = "vhost1RabbitTemplate";
public static final String VHOST_1_RABBIT_ADMIN = "vhost1RabbitAdmin";
// endregion
// region vhost2 ConnectionFactory.RabbitTemplate.RabbitAdmin配置
public static final String VHOST_2_CONNECTION_FACTORY = "vhost2ConnectionFactory";
public static final String VHOST_2_RABBIT_LISTENER_CONTAINER_FACTORY = "vhost2RabbitListenerContainerFactory";
public static final String VHOST_2_RABBIT_TEMPLATE = "vhost2RabbitTemplate";
public static final String VHOST_2_RABBIT_ADMIN = "vhost2RabbitAdmin";
// endregion
// region vhost1 测试交换机.队列.路由配置
public static final String VHOST_1_TEST_EXCHANGE_1 = "vhost1TestExchange1";
public static final String VHOST_1_TEST_EXCHANGE_2 = "vhost1TestExchange2";
public static final String VHOST_1_TEST_EXCHANGE_1_QUEUE_1 = "vhost1TestExchange1Queue1";
public static final String VHOST_1_TEST_EXCHANGE_1_ROUTING_KEY_1 = "vhost1TestExchange1RoutingKey1";
public static final String VHOST_1_TEST_EXCHANGE_2_QUEUE_1 = "vhost1TestExchange2Queue1";
public static final String VHOST_1_TEST_EXCHANGE_2_ROUTING_KEY_1 = "vhost1TestExchange2RoutingKey1";
// endregion
// region vhost2 测试交换机.队列.路由配置
public static final String VHOST_2_TEST_EXCHANGE_1 = "vhost2TestExchange1";
public static final String VHOST_2_TEST_EXCHANGE_2 = "vhost2TestExchange2";
public static final String VHOST_2_TEST_EXCHANGE_1_QUEUE_1 = "vhost2TestExchange1Queue1";
public static final String VHOST_2_TEST_EXCHANGE_1_ROUTING_KEY_1 = "vhost2TestExchange1RoutingKey1";
public static final String VHOST_2_TEST_EXCHANGE_2_QUEUE_1 = "vhost2TestExchange2Queue1";
public static final String VHOST_2_TEST_EXCHANGE_2_ROUTING_KEY_1 = "vhost2TestExchange2RoutingKey1";
// endregion
}
3.2 Vhost1RabbitMQConfig
package com.example.rabbitmq.config;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Configuration
public class Vhost1RabbitMQConfig {
@Value("${spring.rabbitmq.vhost1.host}")
private String host;
@Value("${spring.rabbitmq.vhost1.port}")
private int port;
@Value("${spring.rabbitmq.vhost1.username}")
private String username;
@Value("${spring.rabbitmq.vhost1.password}")
private String password;
@Value("${spring.rabbitmq.vhost1.virtual-host}")
private String vhost;
// 为vhost1配置ConnectionFactory
@Bean(name = RabbitConstant.VHOST_1_CONNECTION_FACTORY)
@Primary
public ConnectionFactory vhost1ConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(vhost);
return connectionFactory;
}
// 为vhost1配置SimpleRabbitListenerContainerFactory
@Bean(name = RabbitConstant.VHOST_1_RABBIT_LISTENER_CONTAINER_FACTORY)
@Primary
public SimpleRabbitListenerContainerFactory vhost1RabbitListenerContainerFactory(
@Qualifier(RabbitConstant.VHOST_1_CONNECTION_FACTORY)ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}
// 为vhost1配置RabbitTemplate
@Bean(name = RabbitConstant.VHOST_1_RABBIT_TEMPLATE)
@Primary
public RabbitTemplate vhost1RabbitTemplate(
@Qualifier(RabbitConstant.VHOST_1_CONNECTION_FACTORY) ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 可以设置其他属性,如消息转换器
return rabbitTemplate;
}
// 为vhost1配置RabbitAdmin
@Bean(name = RabbitConstant.VHOST_1_RABBIT_ADMIN)
@Primary
public RabbitAdmin vhost1RabbitAdmin(
@Qualifier(RabbitConstant.VHOST_1_CONNECTION_FACTORY) ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}
3.3 Vhost2RabbitMQConfig
package com.example.rabbitmq.config;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
@Configuration
public class Vhost2RabbitMQConfig {
@Value("${spring.rabbitmq.vhost2.host}")
private String host;
@Value("${spring.rabbitmq.vhost2.port}")
private int port;
@Value("${spring.rabbitmq.vhost2.username}")
private String username;
@Value("${spring.rabbitmq.vhost2.password}")
private String password;
@Value("${spring.rabbitmq.vhost2.virtual-host}")
private String vhost;
// 为vhost2配置ConnectionFactory
@Bean(name = RabbitConstant.VHOST_2_CONNECTION_FACTORY)
public ConnectionFactory vhost2ConnectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(vhost);
return connectionFactory;
}
// 为vhost2配置SimpleRabbitListenerContainerFactory
@Bean(name = RabbitConstant.VHOST_2_RABBIT_LISTENER_CONTAINER_FACTORY)
public SimpleRabbitListenerContainerFactory vhost2RabbitListenerContainerFactory(
@Qualifier(RabbitConstant.VHOST_2_CONNECTION_FACTORY)ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
return factory;
}
// 为vhost2配置RabbitTemplate
@Bean(name = RabbitConstant.VHOST_2_RABBIT_TEMPLATE)
public RabbitTemplate vhost2RabbitTemplate(
@Qualifier(RabbitConstant.VHOST_2_CONNECTION_FACTORY) ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// 可以设置其他属性,如消息转换器
return rabbitTemplate;
}
// 为vhost2配置RabbitAdmin
@Bean(name = RabbitConstant.VHOST_2_RABBIT_ADMIN)
public RabbitAdmin vhost2RabbitAdmin(
@Qualifier(RabbitConstant.VHOST_2_CONNECTION_FACTORY) ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
}
4.消费者
其中 @QueueBinding下的admins=的配置是交换机和队列数据不会在vhost之间扩散的必要条件
4.1 Vhost1Consumer
package com.example.rabbitmq.consumer;
import com.example.rabbitmq.config.RabbitConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class Vhost1Consumer {
@RabbitListener(
containerFactory = RabbitConstant.VHOST_1_RABBIT_LISTENER_CONTAINER_FACTORY,
bindings = @QueueBinding(
value = @Queue(value = RabbitConstant.VHOST_1_TEST_EXCHANGE_1_QUEUE_1,
admins = RabbitConstant.VHOST_1_RABBIT_ADMIN),
exchange = @Exchange(value = RabbitConstant.VHOST_1_TEST_EXCHANGE_1,
type = ExchangeTypes.TOPIC,
admins = RabbitConstant.VHOST_1_RABBIT_ADMIN),
key = RabbitConstant.VHOST_1_TEST_EXCHANGE_1_ROUTING_KEY_1,
admins = RabbitConstant.VHOST_1_RABBIT_ADMIN)
)
public void vhost1Exchang1Queue1Consumer(String message) {
log.info("vhost1 exchange1 queue1 consumer meaasge: {}", message);
}
@RabbitListener(
containerFactory = RabbitConstant.VHOST_1_RABBIT_LISTENER_CONTAINER_FACTORY,
bindings = @QueueBinding(
value = @Queue(value = RabbitConstant.VHOST_1_TEST_EXCHANGE_2_QUEUE_1,
admins = RabbitConstant.VHOST_1_RABBIT_ADMIN),
exchange = @Exchange(value = RabbitConstant.VHOST_1_TEST_EXCHANGE_2,
type = ExchangeTypes.TOPIC,
admins = RabbitConstant.VHOST_1_RABBIT_ADMIN),
key = RabbitConstant.VHOST_1_TEST_EXCHANGE_2_ROUTING_KEY_1,
admins = RabbitConstant.VHOST_1_RABBIT_ADMIN))
public void vhost1Exchang2Queue1Consumer(String message) {
log.info("vhost1 exchange2 queue1 consumer meaasge: {}", message);
}
}
4.2 Vhost2Consumer
package com.example.rabbitmq.consumer;
import com.example.rabbitmq.config.RabbitConstant;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class Vhost2Consumer {
@RabbitListener(
containerFactory = RabbitConstant.VHOST_2_RABBIT_LISTENER_CONTAINER_FACTORY,
bindings = @QueueBinding(
value = @Queue(value = RabbitConstant.VHOST_2_TEST_EXCHANGE_1_QUEUE_1,
admins = RabbitConstant.VHOST_2_RABBIT_ADMIN),
exchange = @Exchange(value = RabbitConstant.VHOST_2_TEST_EXCHANGE_1,
type = ExchangeTypes.TOPIC,
admins = RabbitConstant.VHOST_2_RABBIT_ADMIN),
key = RabbitConstant.VHOST_2_TEST_EXCHANGE_1_ROUTING_KEY_1,
admins = RabbitConstant.VHOST_2_RABBIT_ADMIN)
)
public void vhost2Exchang1Queue1Consumer(String message) {
log.info("vhost2 exchange1 queue1 consumer meaasge: {}", message);
}
@RabbitListener(
containerFactory = RabbitConstant.VHOST_2_RABBIT_LISTENER_CONTAINER_FACTORY,
bindings = @QueueBinding(
value = @Queue(value = RabbitConstant.VHOST_2_TEST_EXCHANGE_2_QUEUE_1,
admins = RabbitConstant.VHOST_2_RABBIT_ADMIN),
exchange = @Exchange(value = RabbitConstant.VHOST_2_TEST_EXCHANGE_2,
type = ExchangeTypes.TOPIC,
admins = RabbitConstant.VHOST_2_RABBIT_ADMIN),
key = RabbitConstant.VHOST_2_TEST_EXCHANGE_2_ROUTING_KEY_1,
admins = RabbitConstant.VHOST_2_RABBIT_ADMIN))
public void vhost2Exchang2Queue1Consumer(String message) {
log.info("vhost2 exchange2 queue1 consumer meaasge: {}", message);
}
}
5.生产者ProducerController
package com.example.rabbitmq.producer;
import com.example.rabbitmq.config.RabbitConstant;
import jakarta.annotation.Resource;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/")
public class ProducerController {
@Resource(name = RabbitConstant.VHOST_1_RABBIT_TEMPLATE)
RabbitTemplate vhost1RabbitTemplate;
@Resource(name = RabbitConstant.VHOST_2_RABBIT_TEMPLATE)
RabbitTemplate vhost2RabbitTemplate;
@GetMapping("/")
public String index() {
return "rabbitmq study index";
}
@GetMapping("/produceVhost1Exchange1Message")
public String produceVhost1Exchange1Message() {
String content1 = "vhost1 exchange1 message " + System.currentTimeMillis();
vhost1RabbitTemplate.convertAndSend(
RabbitConstant.VHOST_1_TEST_EXCHANGE_1,
RabbitConstant.VHOST_1_TEST_EXCHANGE_1_ROUTING_KEY_1,
content1);
return content1 + " send success";
}
@GetMapping("/produceVhost1Exchange2Message")
public String produceVhost1Exchange2Message() {
String content1 = "vhost1 exchange2 message " + System.currentTimeMillis();
vhost1RabbitTemplate.convertAndSend(
RabbitConstant.VHOST_1_TEST_EXCHANGE_2,
RabbitConstant.VHOST_1_TEST_EXCHANGE_2_ROUTING_KEY_1,
content1);
return content1 + " send success";
}
@GetMapping("/produceVhost2Exchange1Message")
public String produceVhost2Exchange1Message() {
String content1 = "vhost2 exchange1 message " + System.currentTimeMillis();
vhost2RabbitTemplate.convertAndSend(
RabbitConstant.VHOST_2_TEST_EXCHANGE_1,
RabbitConstant.VHOST_2_TEST_EXCHANGE_1_ROUTING_KEY_1,
content1);
return content1 + "send success ";
}
@GetMapping("/produceVhost2Exchange2Message")
public String produceVhost2Exchange2Message() {
String content1 = "vhost2 exchange2 message " + System.currentTimeMillis();
vhost2RabbitTemplate.convertAndSend(
RabbitConstant.VHOST_2_TEST_EXCHANGE_2,
RabbitConstant.VHOST_2_TEST_EXCHANGE_2_ROUTING_KEY_1,
content1);
return content1 + "send success ";
}
}
6.启动测试日志
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v3.2.5)
2024-05-15T23:12:32.832+08:00 INFO 20956 --- [RabbitMQ] [ main] c.example.rabbitmq.RabbitMqApplication : Starting RabbitMqApplication using Java 17.0.8 with PID 20956 (C:\Users\vin\Desktop\WorkSpace\StudyProject\RabbitMQ\target\classes started by vin in C:\Users\vin\Desktop\WorkSpace\StudyProject)
2024-05-15T23:12:32.834+08:00 INFO 20956 --- [RabbitMQ] [ main] c.example.rabbitmq.RabbitMqApplication : No active profile set, falling back to 1 default profile: "default"
2024-05-15T23:12:33.401+08:00 INFO 20956 --- [RabbitMQ] [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat initialized with port 8201 (http)
2024-05-15T23:12:33.408+08:00 INFO 20956 --- [RabbitMQ] [ main] o.apache.catalina.core.StandardService : Starting service [Tomcat]
2024-05-15T23:12:33.408+08:00 INFO 20956 --- [RabbitMQ] [ main] o.apache.catalina.core.StandardEngine : Starting Servlet engine: [Apache Tomcat/10.1.20]
2024-05-15T23:12:33.438+08:00 INFO 20956 --- [RabbitMQ] [ main] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring embedded WebApplicationContext
2024-05-15T23:12:33.439+08:00 INFO 20956 --- [RabbitMQ] [ main] w.s.c.ServletWebServerApplicationContext : Root WebApplicationContext: initialization completed in 577 ms
2024-05-15T23:12:33.738+08:00 INFO 20956 --- [RabbitMQ] [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port 8201 (http) with context path ''
2024-05-15T23:12:33.740+08:00 INFO 20956 --- [RabbitMQ] [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: 192.168.124.10:5672
2024-05-15T23:12:33.764+08:00 INFO 20956 --- [RabbitMQ] [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: vhost1ConnectionFactory#6af91cc8:0/SimpleConnection@7bee8621 [delegate=amqp://user1@192.168.124.10:5672//vhost1, localPort=3708]
2024-05-15T23:12:33.789+08:00 INFO 20956 --- [RabbitMQ] [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: 192.168.124.10:5672
2024-05-15T23:12:33.791+08:00 INFO 20956 --- [RabbitMQ] [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: vhost2ConnectionFactory#43acd79e:0/SimpleConnection@385d819 [delegate=amqp://user2@192.168.124.10:5672//vhost2, localPort=3709]
2024-05-15T23:12:33.800+08:00 INFO 20956 --- [RabbitMQ] [ main] c.example.rabbitmq.RabbitMqApplication : Started RabbitMqApplication in 1.212 seconds (process running for 1.512)
2024-05-15T23:13:00.260+08:00 INFO 20956 --- [RabbitMQ] [nio-8201-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2024-05-15T23:13:00.261+08:00 INFO 20956 --- [RabbitMQ] [nio-8201-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2024-05-15T23:13:00.261+08:00 INFO 20956 --- [RabbitMQ] [nio-8201-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 0 ms
2024-05-15T23:13:00.315+08:00 INFO 20956 --- [RabbitMQ] [ntContainer#0-1] c.e.rabbitmq.consumer.Vhost1Consumer : vhost1 exchange1 queue1 consumer meaasge: vhost1 exchange1 message 1715785980295
2024-05-15T23:13:04.012+08:00 INFO 20956 --- [RabbitMQ] [ntContainer#1-1] c.e.rabbitmq.consumer.Vhost1Consumer : vhost1 exchange2 queue1 consumer meaasge: vhost1 exchange2 message 1715785984008
2024-05-15T23:13:09.439+08:00 INFO 20956 --- [RabbitMQ] [ntContainer#2-1] c.e.rabbitmq.consumer.Vhost2Consumer : vhost2 exchange1 queue1 consumer meaasge: vhost2 exchange1 message 1715785989435
2024-05-15T23:13:15.260+08:00 INFO 20956 --- [RabbitMQ] [ntContainer#3-1] c.e.rabbitmq.consumer.Vhost2Consumer : vhost2 exchange2 queue1 consumer meaasge: vhost2 exchange2 message 1715785995257
评论 (0)