ActiveMQ消息中间件学习笔记

jupiter
2023-08-25 / 0 评论 / 45 阅读 / 正在检测是否收录...
温馨提示:
本文最后更新于2023年08月25日,已超过242天没有更新,若内容或图片失效,请留言反馈。

1.消息中间件简介及作用

两个系统或两个客户端之间进行消息传送,利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。

消息中间件,总结起来作用有三个:异步化提升性能、降低耦合度、流量削峰。

2 消息中间件的应用场景

2.1 异步通信

有些业务不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

2.2 缓冲

在任何重要的系统中,都会有需要不同的处理时间的元素。消息队列通过一个缓冲层来帮助任务最高效率的执行,该缓冲有助于控制和优化数据流经过系统的速度。以调节系统响应时间。

2.3 解耦

降低工程间的强依赖程度,针对异构系统进行适配。在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。通过消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口,当应用发生变化时,可以独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。

2.4 冗余

有些情况下,处理数据的过程会失败。除非数据被持久化,否则将造成丢失。消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。

2.5 扩展性

因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。不需要改变代码、不需要调节参数。便于分布式扩容。

2.6 可恢复性

系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

2.7 顺序保证

在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。

2.8 过载保护

在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量无法提取预知;如果以为了能处理这类瞬间峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

2.9 数据流处理

分布式系统产生的海量数据流,如:业务日志、监控数据、用户行为等,针对这些数据流进行实时或批量采集汇总,然后进行大数据分析是当前互联网的必备技术,通过消息队列完成此类数据收集是最好的选择。

3.常用消息队列

特性ActiveMQRabbitMQRocketMQKafka
生产者消费者模式支持支持支持支持
发布订阅模式支持支持支持支持
请求回应模式支持支持不支持不支持
Api完备性
多语言支持支持支持java支持
单机吞吐量万级万级万级十万级
消息延迟微秒级毫秒级毫秒级
可用性高(主从)高(主从)非常高(分布式)非常高(分布式)
消息丢失理论上不会丢失理论上不会丢失
文档的完备性
提供快速入门
社区活跃度
商业支持商业云商业云

4.消息中间件的角色

Queue: 队列存储,常用与点对点消息模型 ,默认只能由唯一的一个消费者处理。一旦处理消息删除。

Topic: 主题存储,用于订阅/发布消息模型,主题中的消息,会发送给所有的消费者同时处理。只有在消息可以重复处 理的业务场景中可使用,Queue/Topic都是 Destination 的子接口

ConnectionFactory: 连接工厂,客户用来创建连接的对象,例如ActiveMQ提供的ActiveMQConnectionFactory

Connection: JMS Connection封装了客户与JMS提供者之间的一个虚拟的连接。

Destination: 消息的目的地,目的地是客户用来指定它生产的消息的目标和它消费的消息的来源的对象。JMS1.0.2规范中定义了两种消息传递域:点对点(PTP)消息传递域和发布/订阅消息传递域。

点对点消息传递域的特点如下:

  • 每个消息只能有一个消费者。
  • 消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,它都可以提取消息。

发布/订阅消息传递域的特点如下:

  • 每个消息可以有多个消费者。
  • 生产者和消费者之间有时间上的相关性。
  • 订阅一个主题的消费者只能消费自它订阅之后发布的消息。JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求 。持久订阅允许消费者消费它在未处于激活状态时发送的消息。
    在点对点消息传递域中,目的地被成为队列(queue);在发布/订阅消息传递域中,目的地被成为主题(topic)。

5.JMS的消息格式

JMS消息由以下三部分组成的:

  • 消息头:

    每个消息头字段都有相应的getter和setter方法。
  • 消息属性:

    如果需要除消息头字段以外的值,那么可以使用消息属性。
  • 消息体:

    JMS定义的消息类型有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage。

消息类型:

属性类型
TextMessage文本消息
MapMessagek/v
BytesMessage字节流
StreamMessagejava原始的数据流
ObjectMessage序列化的java对象

6.消息可靠性机制

只有在被确认之后,才认为已经被成功地消费了,消息的成功消费通常包含三个阶段 :客户接收消息、客户处理消息和消息被确认在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参数有以下三个可选值:

  • Session.AUTO_ACKNOWLEDGE:当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
  • Session.CLIENT_ACKNOWLEDGE:客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有10个消息都被确认。
  • Session.DUPS_ACKNOWLEDGE:该选择只是会话迟钝的确认消息的提交。如果JMS Provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS Provider必须把消息头的JMSRedelivered字段设置为true。

6.1 优先级

可以使用消息优先级来指示JMS Provider首先提交紧急的消息。优先级分10个级别,从0(最低)到9(最高)。如果不指定优先级,默认级别是4。需要注意的是,JMS Provider并不一定保证按照优先级的顺序提交消息。

6.2 消息过期

可以设置消息在一定时间后过期,默认是永不过期。

6.3 临时目的地

可以通过会话上的createTemporaryQueue方法和createTemporaryTopic方法来创建临时目的地。它们的存在时间只限于创建它们的连接所保持的时间。只有创建该临时目的地的连接上的消息消费者才能够从临时目的地中提取消息。

7.ActiveMQ

7.1 简介

ActiveMQ是一种开源的基于JMS(Java Message Servie)规范的一种消息中间件的实现,ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件。

官网地址:http://activemq.apache.org/

ActiveMQ主要特点:

  1. 支持多语言、多协议客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire, Stomp REST, WS Notification, XMPP, AMQP
  2. 对Spring的支持,ActiveMQ可以很容易整合到Spring的系统里面去。
  3. 支持高可用、高性能的集群模式。

7.2 存储方式

1. KahaDB存储: KahaDB是默认的持久化策略,所有消息顺序添加到一个日志文件中,同时另外有一个索引文件记录指向这些日志的存储地址,还有一个事务日志用于消息回复操作。是一个专门针对消息持久化的解决方案,它对典型的消息使用模式进行了优化

特性:
1、日志形式存储消息;
2、消息索引以 B-Tree 结构存储,可以快速更新;
3、 完全支持 JMS 事务;
4、支持多种恢复机制kahadb 可以限制每个数据文件的大小。不代表总计数据容量。

2. AMQ 方式: 只适用于 5.3 版本之前。 AMQ 也是一个文件型数据库,消息信息最终是存储在文件中。内存中也会有缓存数据。

3. JDBC存储 : 使用JDBC持久化方式,数据库默认会创建3个表,每个表的作用如下:

activemq_msgs:queue和topic的消息都存在这个表中
activemq_acks:存储持久订阅的信息和最后一个持久订阅接收的消息ID
activemq_lock:跟kahadb的lock文件类似,确保数据库在某一时刻只有一个broker在访问

4. LevelDB存储 : LevelDB持久化性能高于KahaDB,但是在ActiveMQ官网对LevelDB的表述:LevelDB官方建议使用以及不再支持,推荐使用的是KahaDB

5.Memory 消息存储: 顾名思义,基于内存的消息存储,就是消息存储在内存中。persistent=”false”,表示不设置持 久化存储,直接存储到内存中,在broker标签处设置。

8.ActiveMQ安装

9.使用原生java进行交互

9.1 项目依赖

ActiveMQ 的解压包中,提供了运行 ActiveMQ 需要的 jar 包。ActiveMQ 是实现了 JMS 规范的。在实现消息服务的时候,必须基于 API 接口规范。maven依赖:

<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.18.2</version>
</dependency>

9.2 JMS 常用的 API 说明

下述 API 都是接口类型,定义在 javax.jms 包中,是 JMS 标准接口定义。ActiveMQ 完全实现这一套 api 标准。

接口作用
ConnectionFactory链接工厂, 用于创建链接的工厂类型.
Connection链接. 用于建立访问 ActiveMQ 连接的类型, 由链接工厂创建.
Session会话, 一次持久有效有状态的访问. 由链接创建.
Destination & Queue目的地, 用于描述本次访问 ActiveMQ 的消息访问目的地. 即 ActiveMQ 服务中的具体队 列. 由会话创建. interface Queue extends Destination
MessageProducer消息生成者, 在一次有效会话中, 用于发送消息给 ActiveMQ 服务的工具. 由会话创建
MessageConsumer消息消费者【消息订阅者,消息处理者】, 在一次有效会话中, 用于从 ActiveMQ 服务中 获取消息的工具. 由会话创建
Message消息, 通过消息生成者向 ActiveMQ 服务发送消息时使用的数据载体对象或消息消费者 从 ActiveMQ 服务中获取消息时使用的数据载体对象. 是所有消息【文本消息,对象消息等】 具体类型的顶级接口. 可以通过会话创建或通过会话从 ActiveMQ 服务中获取. . .

9.3 创建消息生成者,发送消息

public class JmsProducer {
    public static void main(String[] args) throws JMSException {
        /*
         * 创建连接工厂,由 ActiveMQ 实现。构造方法参数
         * userName 用户名
         * password 密码
         * brokerURL 访问 ActiveMQ 服务的路径地址,结构为: 协议名://主机地址:端口号
         */
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616");

        //创建连接对象
        Connection connection = connectionFactory.createConnection();

        //启动连接
        connection.start();

        /*
         * 创建会话,参数含义:
         * 1.transacted - 是否使用事务
         * 2.acknowledgeMode - 消息确认机制,可选机制为:
         *  1)Session.AUTO_ACKNOWLEDGE - 自动确认消息
         *  2)Session.CLIENT_ACKNOWLEDGE - 客户端确认消息机制
         *  3)Session.DUPS_OK_ACKNOWLEDGE - 有副本的客户端确认消息机制
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //创建目的地,也就是队列名
        Destination destination = session.createQueue("mq_test");

        //创建消息生成者,该生成者与目的地绑定
        MessageProducer messageProducer = session.createProducer(destination);

        //创建消息
        Message message = session.createTextMessage("Hello, ActiveMQ");
        
        //发送消息
        messageProducer.send(message);
    }
}
  • 运行后查看web控制台管理界面可以看到生成了对应的消息队列和消息。

9.4 创建消息消费者,接收消息

public class JmsConsumer {
    public static void main(String[] args) throws JMSException {
        /*
         * 创建连接工厂,由 ActiveMQ 实现。构造方法参数
         * userName 用户名
         * password 密码
         * brokerURL 访问 ActiveMQ 服务的路径地址,结构为: 协议名://主机地址:端口号
         */
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616");

        //创建连接对象
        Connection connection = connectionFactory.createConnection();

        //启动连接
        connection.start();

        /*
         * 创建会话,参数含义:
         * 1.transacted - 是否使用事务
         * 2.acknowledgeMode - 消息确认机制,可选机制为:
         *  1)Session.AUTO_ACKNOWLEDGE - 自动确认消息
         *  2)Session.CLIENT_ACKNOWLEDGE - 客户端确认消息机制
         *  3)Session.DUPS_OK_ACKNOWLEDGE - 有副本的客户端确认消息机制
         */
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        //创建目的地,也就是队列名
        Destination destination = session.createQueue("mq_test");

        //创建消息生成者,该生成者与目的地绑定
        MessageConsumer messageConsumer = session.createConsumer(destination);

        //创建消息
        Message message = session.createTextMessage("Hello, ActiveMQ");

        //读取消息
        while(true){
            TextMessage textMessage = (TextMessage)messageConsumer.receive(10000);
            if(textMessage != null){
                System.out.println("Accept msg : "+textMessage.getText());
            }else{
                break;
            }
        }
    }
}
Accept msg : Hello, ActiveMQ
  • 运行后查看web控制台管理界面可以看到对应的消息已经被消费了。

10.springboot3整合ActiveMQ

10.1 项目依赖

创建标准的Spring Boot项目,并在项目中引入以下依赖:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
      <version>3.1.3</version>
</dependency>

此时如果不需要web或其他相关处理,只引入该依赖即可。如果使用pool的话, 就需要在pom中加入以下依赖:

<dependency>
     <groupId>org.apache.activemq</groupId>
     <artifactId>activemq-pool</artifactId>
     <version>5.12.1</version>
</dependency>

10.2 配置文件

spring:
  activemq:
    broker-url: tcp://127.0.0.1:61616 #ActiveMQ通讯地址
    user: admin #用户名
    password: admin #密码
    in-memory: false #是否启用内存模式(就是不安装MQ,项目启动时同时启动一个MQ实例)
    packages:
      trust-all: true #信任所有的包
#    pool:
#      enabled: true       #连接池启动
#      max-connections: 10 #最大连接数
    pool:
      enabled: false
  jms:
    pub-sub-domain: false #设置是Queue队列还是Topic,false为Queue,true为Topic,默认false-Queue

10.3 在SpringBoot的启动类,类上添加注解@EnableJms

10.4 创建配置类ActiveMQConfig,读取yml中的内容,并且创建对象

@Configuration
public class ActiveMQConfig {
    @Value("${spring.activemq.broker-url}")
    private String brokerUrl;

    @Value("${spring.activemq.user}")
    private String userName;

    @Value("${spring.activemq.password}")
    private String password;

    @Bean(name = "queue")
    public Queue queue() {
        return new ActiveMQQueue("springboot.queue");
    }

    @Bean(name = "topic")
    public Topic topic(){
        return new ActiveMQTopic("springboot.topic");
    }

    @Bean
    public ConnectionFactory connectionFactory(){
        return new ActiveMQConnectionFactory(userName, password, brokerUrl);
    }

    // 在Queue模式中,对消息的监听需要对containerFactory进行配置
    @Bean("queueListener")
    public JmsListenerContainerFactory<?> queueJmsListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(false);
        return factory;
    }

    // 在Topic模式中,对消息的监听需要对containerFactory进行配置
    @Bean("topicListener")
    public JmsListenerContainerFactory<?> topicJmsListenerContainerFactory(ConnectionFactory connectionFactory){
        SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(true);
        return factory;
    }
}

10.5 创建生产者

@RestController
public class Producer {

    @Autowired
    private JmsMessagingTemplate jmsTemplate;

    @Autowired
    private Queue queue;

    @Autowired
    private Topic topic;

    //发送queue类型消息
    @GetMapping("/queue")
    public void sendQueueMsg(String msg){
        jmsTemplate.convertAndSend(queue, msg);
    }

    //发送topic类型消息
    @GetMapping("/topic")
    public void sendTopicMsg(String msg){
        jmsTemplate.convertAndSend(topic, msg);
    }

}

10.6 创建消费者

@Component
public class Consumer {
    //接收queue类型消息
    //destination对应配置类中ActiveMQQueue("springboot.queue")设置的名字
    @JmsListener(destination="springboot.queue", containerFactory = "queueListener")
    public void ListenQueue(String msg){
        System.out.println("接收到queue消息:" + msg);
    }

    //接收topic类型消息
    //destination对应配置类中ActiveMQTopic("springboot.topic")设置的名字
    //containerFactory对应配置类中注册JmsListenerContainerFactory的bean名称
    @JmsListener(destination="springboot.topic", containerFactory = "topicListener")
    public void ListenTopic(String msg){
        System.out.println("接收到topic消息:" + msg);
    }
}

10.7 运行测试

  • queue测试地址:localhost:8080/queue?msg=hello
  • topic测试地址:localhost:8080/topic?msg=hello

    • 注:测试topic模式的时候需要把配置文件的 jms. pub-sub-domain设置为true

参考资料

  1. ActiveMQ 入门指引 - 知乎 (zhihu.com)
  2. ActiveMQ详细入门教程系列(一) - 牧小农 - 博客园 (cnblogs.com)
  3. 从入门到精通的ActiveMQ(一) - 知乎 (zhihu.com)
  4. ActiveMQ (apache.org)
  5. ActiveMQ——Java连接ActiveMQ(点对点) - 知乎 (zhihu.com)
  6. 消息中间件系列三、JMS和activeMQ的简单使用-阿里云开发者社区 (aliyun.com)
  7. springboot整合ActiveMQ(点对点+发布订阅)-阿里云开发者社区 (aliyun.com)
  8. SpringBoot集成ActiveMQ实例详解 - 知乎 (zhihu.com)
  9. SpringBoot使用activeMq(绝对可用!亲测)_码学弟的博客-CSDN博客(★★★)
  10. Springboot整合ActiveMQ(Queue和Topic两种模式)_码学弟的博客-CSDN博客
0

评论 (0)

打卡
取消