Windows配置
2024年10月24日
17:36
1.下载安装erlang windows.exe,配置环境变量
erl -version
2.下载安装rabbitMQ windows.exe,rabbitmq下的sbin目录,输入命令
rabbitmq-server.bat 前台运行服务
rabbitmq-server.bat –detached 后台运行服务
3.访问图形化界面
localhost:15672
注 : 必须以管理员权限安装
cmd命令
2024年10月25日
9:01
服务
2024年10月25日
9:16
rabbitmq-service命令 :
|
start |
启动rabbit服务 |
|
stop |
停止rabbit服务 |
|
disable |
关闭rabbit服务 |
|
enable |
开启rabbit服务 |
rabbit-server命令 :
|
rabbitmq-server |
前台运行rabbit |
|
rabbitmq-server –detached |
后台运行rabbit |
操作
2024年10月25日
9:17
rabbitmq-plugins命令 :
|
enable rabbitmq_management |
开启图形化界面插件 |
rabbitmqctl命令 :
|
version |
查看rabbitmq版本 |
|
add_user |
添加用户 |
|
set_user_tags administrator |
分配角色 |
|
list_users |
查看用户列表 |
|
change_password |
修改密码 |
|
list_permissions -p / |
查看用户对virtualhost的权限 |
|
set_permissions -p / “.*” “.*” “.*” |
设置指定主机的用户权限 |
|
list_queues -p |
列出指定虚拟主机的所有队列 |
|
list_exchanges -p |
列出指定虚拟主机的所有交换机 |
|
list_bindings -p |
列出指定虚拟主机的所有绑定 |
|
purge_queue |
清空指定队列的所有消息 |
角色 :
adminstrator 超级管理员 :
1.更改策略(policy)
2.更改用户
3.查看所有信息
monitoring 监控者 :
1.可以查看rabbitMQ节点相关的信息
policymaker 策略制定者 :
1.更改策略
management 普通管理者 :
1.仅登录
端口号
2024年10月25日
9:37
5672 :
1.RabbitMQ AMQP端口
2.用于消息与客户端的通信
15672 :
1.RabbitMQ管理插件端口
2.用于访问图形化界面
25672 :
1.RabbitMQ 集群端口
概念
2024年10月25日
9:46
AMQP
2024年10月25日
10:37
定义 : Advanced Message Queuing Protocol 高级消息队列协议
1.应用层协议
2.用于在不同系统之间传递消息
3.RabbitMQ是AMQP的一个实现
Connection
2024年10月25日
9:50
定义 : 连接
1.客户端与RabbitMQ服务器之间的TCP连接
2.每当客户端与RabbitMQ服务器通信的时候,都需要通过连接才能通信
3.创建与管理连接开销过大,建议复用连接
Channel
2024年10月25日
9:52
定义 : 信道
1.客户端通过信道进行消息传输
2.一个Connection可以有多个信道
3.生产与消费都是通过信道进行
4.相比Connection更加轻量,通常是复用Connection并使用多个信道来处理任务
Virtual Host
2024年10月25日
10:33
定义 : 虚拟主机
1.RabbitMQ的一个逻辑分区,用于隔离不同应用
2.可以创建多个,每个都包含自己的Queue,Exchange,Binding
3.RabbitMQ自带一个虚拟主机,/
Client
2024年10月25日
10:05
定义 : 客户端
1.连接到RabbitMQ服务器的引用,包括Consumer和Producer
2.可以生产消息和消费消息
Broker
2024年10月25日
10:20
定义 : 消息代理
1.运行RabbitMQ实例的服务器
2.负责接收,存储,转发消息
3.RabbitMQ本质就是一个消息代理系统
流程 :
1.接收Producer生产的消息
2.存储在Queue中
3.根据由规则转发给Consumer
Queue
2024年10月25日
10:02
定义 : 队列
1.用于存储消息,存储到消息被消费或到达过期时间
2.消费者会从队列中消费消息
3.队列的名称具有唯一性
4.队列可以持久化(durable),RabbitMQ重启,消息依然保存
5.队列默认是内存模式,可以设置为懒加载模式
Binding
2024年10月25日
10:34
定义 : 绑定
1.将Exchange和Queue连接起来
2.指定消息从Exchange如何路由到Queue
Exchange
2024年10月25日
9:57
定义 : 交换机
1.用于接收生产者生产的消息,并路由到合适的队列
2.不会存储消息,而是通过路由规则将消息分发到队列
类型 :
Direct Exchange : 精确匹配路由
Fanout Exchange : 不考虑路由,广播到所有绑定该交换机的队列
Topic Exchange : 通配符匹配路由
Headers Exchange : 根据消息的头部属性匹配路由
Routing Key
2024年10月25日
10:32
定义 : 路由键
1.生产者发送消息时附带的字符串,用来指示Exchange如何路由消息到队列
规范 :
1.通常情况下,routingKey使用.来分隔不同的单词
Producer
2024年10月25日
10:06
定义 : 生产者
1.负责发送消息的应用
2.生产者不与Queue交互,而是通过Exchange进行消息路由
流程 :
1.生产者通过Exchange向RabbitMQ发送消息
2.Exchange根据路由规则将消息转发到合适的队列
Consumer
2024年10月25日
10:10
定义 : 消费者
1.从队列中接收和处理消息的应用
2.接收到消息后执行相应的处理操作
3.消费者可以通过Ack确认消息是否成功处理,手动确认时,消息只有ack之后才会从队列中删除
Ack
2024年10月25日
10:23
定义 : 确认
1.Consumer在成功消费消息之后发送给RabbitMQ的信号,表示消息已被成功处理
2.在Consumer发送确认之前,RabbitMQ不会从队列中删除这个消息
3.如果Consumer没有发送ack并断开连接,RabbitMQ会重新将消息放回队列
Message
2024年10月25日
10:30
定义 : 消息
1.由Producer生产,Consumer消费的数据单元
2.包含了消息体(数据)和一些元数据(路由键,头信息)
流程 :
1.Producer生产消息
2.Exchange路由到Queue
3.等待Consumer消费
TTL
2024年10月25日
10:56
定义 : 生存时间
1.RabbitMQ用于控制消息的生存时间,超过时间后自动删除
级别 :
Message TTL : 消息TTL
1.规定了消息在队列中的最大生存时间
2.如果指定时间内没有被消费,自动删除
3.如果有死信队列,会路由到死信队列进行处理
Queue TTL : 队列TTL
1.指定队列中消息默认的生存时间
2.如果超过这个时间,消息自动删除
3.相当于全局设置,该队列的消息都会应用这个生存时间,除非规定了消息TTL
工作模式
2024年10月25日
9:46
简单模式
2024年10月25日
17:27

特点 :
1.一个生产者
2.一个消费者
3.一个队列
4.路由键名等于队列名
5.配置类不需要绑定交换机(使用默认的交换机)
工作模式
2024年10月26日
8:51

特点 :
1.一个生产者
2.多个消费者
3.多个消费者之间是竞争关系
4.一个队列
5.路由键名等于队列名
6.配置类不需要绑定交换机(使用默认的交换机)
发布订阅模式
2024年10月26日
13:47

特点 : 广播模式
1.一个交换机,交换机类型为fanout,将消息广播到所有绑定队列
2.一个生产者
3.多个消费者
4.多个队列
5.队列传递空值
路由模式
2024年10月26日
16:05

特点 :
1.交换机的类型为direct
2.交换机根据路由键将消息发送到对应的队列
通配符模式
2024年10月26日
16:53

特点 :
1.交换机类型为topic
2.交换机根据路由键和通配符将消息发送到对应的队列
3.通配符类型在交换机绑定队列的时候设置
4.通配符用.号分割
属性 :
|
# |
匹配零个或多个词 |
|
* |
匹配一个词 |
高级特性
2024年10月29日
9:11
确认模式
2024年10月29日
9:15
作用 : Publisher Confirms
1.确认模式用于确保消息已经成功从Producer发送到Broker的Exchange中
2.保证消息的可靠性投递
3.确保消息不在生产阶段丢失
使用 :
1.如果消息成功到达Exchange,Broker会返回ack给生产者
2.如果消息没有到达Exchange,Broker会返回nack,触发ConfirmCallback回调,生产者可以选择重新发送或记录失败
配置 :
1.publisher-confirms = true//开启确认模式,调用方法设置或修改配置文件
|
spring: rabbitmq: # 开启生产者确认模式 |
2.编写确认模式回调方法
2.1可以实现RebbitTemplate.ConfirmCallback接口,重写confirm方法,完成回调方法调用(this)
退回模式
2024年10月29日
9:34
作用 : Returns
1.退回模式用于处理消息从Exchange到Queue的路由问题
2.确保消息在路由过程中能找到对应的Queue
3.如果没有找到,则退回生产者,生产者选择是重新发送消息或处理异常
使用 :
1.消息成功匹配到路由,进行消费
2.消息没有匹配到路由,Broker将消息退回,并触发ReturnCallback回调
配置 :
1.mandatory = true //开启退回模式,调用方法设置或修改配置文件
|
spring: rabbitmq: # 开启退回模式 template: # 设置消息必须投递成功 mandatory: true |
2.编写退回模式回调方法
2.1可以实现RebbitTemplate.ReturnCallback接口,重写confirm方法,完成回调方法调用(this)
消息确认模式
2024年10月29日
10:07
作用 :
1.Consumer接收到消息后的确认方式
2.手动确认模式,自动确认模式,拒绝模式
属性 :
|
AUTO |
自动确认消息 |
|
MANUAL |
手动确认消息 |
配置 :
1.@RabbitListener注解局部配置
2.配置文件全局配置
|
spring: rabbitmq: simple : # 手动确认模式 acknowledge-mode: manual |
使用 :
1.手动确认模式和拒绝模式场景下,方法参数添加Channel对象,进行处理
2.手动确认模式传递的消息信息需要是一个Message
死信队列
2024年10月29日
13:34
定义 : DeadLetter Queue
1.存储无法被消费的消息的队列
2.设置死信队列需要私信交换机(DLX – DeadLetter Exchange)
场景 :
1.队列存储的消息达到限制,根据先进先出原则,最先存储的消息进入死信队列
2.消费者拒绝消费消息,并且不把消息重新放回到原队列,进入死信队列
3.消息超过过期时间仍未被消费,进入死信队列
死信的处理方式 :
1.若该消息不重要,可以直接丢弃
2.死信入库,做后续的业务分析或处理
3.死信队列
使用 :
1.配置死信队列
2.配置死信交换机
3.绑定死信队列与死信交换机
4.配置需要死信机制的队列,绑定死信队列与死信交换机
5.配置死信队列消费者
参数 :
|
x-dead-letter-exchange |
指定死信交换机,值为死信交换机名 |
|
x-dead-letter-routing-key |
指定路由键,值为路由键名 |
延迟队列
2024年10月29日
14:53
作用 :
1.存储延时消息
2.当生产者生产消息后,并不想让消费者立刻拿到消息,而是等待一段时间后在对这个消息进行消费
场景 :
1.用户下单,30分钟后未接单,自动取消订单
2.新用户注册,30分钟后发送欢迎短信
配置 :
1.RabbitMQ中未直接实现延迟队列,可以通过ttl+死信队列的方式实现延迟队列
Rabbit配置文件
2024年10月26日
9:55
基本配置
2024年10月26日
9:56
spring:
rabbitmq:
host: #主机ip
port: #端口号
username: #用户名
password: #密码
virtual-host: #虚拟主机地址
分发策略
2024年10月26日
9:56
轮询分发
2024年10月26日
9:57
定义 :Round Robin
1.RabbitMQ的默认分发策略,将消息在消费者之间轮流分配
配置 : 默认就是轮询分发
公平分发
2024年10月26日
10:00
定义 : Fair Dispatch
1.根据消费者的处理能力来分发消息
2.RabbitMQ会将消息发送给空闲的消费者,以确保负载均衡
3.公平分发需要设置预取计数(prefetch count),确定了消费者每次接收消息的数量
属性 :
|
prefetch |
预取计数 |
配置 :
|
spring: rabbitmq: listener: simple: prefetch: 1 |
最大并发消费者数
2024年10月26日
10:03
定义 : Concurrent Consumers
1.控制并发消费者的并发数量
2.通过属性可以动态的控制消费者的数量
属性 :
|
concurrent |
最小并发数 |
|
max-concurrent |
最大并发数 |
配置 :
|
spring: rabbitmq: listener: simple: concurrent: 2 max-concurrent:5 |
SpringBoot搭建RabbitMQ
2024年10月25日
10:43
1.添加依赖
|
org.springframework.boot spring-boot-starter-amqp |
2.配置yaml文件
|
spring: rabbitmq: host: #主机ip port: #端口号 username: #用户名 password: #密码 virtual-host: #虚拟主机地址 |
2.创建配置类,用于配置交换机,队列和绑定
2.1当配置类只配置了队列,队列名等于路由键,Rabbit默认使用匿名的直接交换机(Default Deriect Exchange)
3.创建监听器,编写一个方法参数是Message对象,使用@RabbitListener注解标识监听的类名,消费消息
3.1如果是手动确认消息,需要接收Channel对象
|
@Configuration public class RabbitListener{ @RabbitListener(queues = “”) public void consume(Object msg){ xxxx } } |
4.测试类通过RabbitTemplate生产消息进行测试
4.1在启动测试类的时候可能会出现测试类生产的消息被测试类的消费者消费了,这时候就需要关闭测试类的消费者
@TestPropertySource(properties = “spring.rabbitmq.listener.simple.enable=false”)
工具类
2024年10月26日
15:13
ExchangeBulider
2024年10月25日
11:50
作用 : 用于构建RabbitMQ的交换机的工具类
声明方法 :
|
topicExchange(exchangeName) |
topic类型交换机 |
|
directExchange(exchangeName) |
direct类型交换机 |
|
fanoutExchange(exchangeName) |
fanout类型交换机 |
|
headerExchange(exchangeName) |
header类型交换机 |
属性方法 :
|
durable(boolean) |
是否持久化 |
|
autoDelete() |
没有绑定队列,自动删除 |
|
internal() |
内部交换机,只允许服务器内部使用,客户端不能发送消息 |
|
alternate(exchangeName) |
备用交换机,消息路由找不到目标队列时,消息会发送给备用交换机 |
|
delayed() |
延迟交换机,创建一个支持消息延迟的交换机,需要插件 |
|
withArgument(key,value) |
添加自定义参数,允许指定交换机的其他参数,如延迟参数,可以传递map |
|
ignoreDeclarationExceptions() |
忽略声明异常 |
构建方法 :
|
Exchange bulid() |
构建交换机对象 |
交换机参数
2024年10月29日
13:52
|
alternate-exchange |
指定一个备用交换机,用于接收无法路由的消息 |
QueueBulider
2024年10月25日
11:55
作用 : 用于构建RabbitMQ的队列的工具类
声明方法 :
|
durable(queueName) |
持久化队列 |
|
noDurable(queueName) |
非持久化队列 |
|
autoDelete() |
自动删除队列 |
属性方法 :
|
exclusive |
排他队列,仅限当前连接使用,断开连接后删除 |
|
ttl(long) |
设置过期时间,单位毫秒 |
|
expires(long) |
设置生存时间,指定时间没有使用,自动删除 |
|
maxLength(int) |
最大消息数,超过此数,删除旧消息 |
|
maxLengthBytes(long) |
最大字节数 |
|
deadLetterExchange(exchangeName) |
指定队列的死信交换机,未被消费的消息转发到该交换机 |
|
deadLetterRoutingKey(routingKey) |
为死信交换机设置路由键 |
|
maxPriority(int) |
设置队列优先级 |
|
lazy() |
设置队列为懒惰模式.消息会存储到磁盘 |
|
withArgument(key,value) |
自定义参数,可以传递map |
构建方法 :
|
Exchange bulid() |
构建队列对象 |
argument :
|
x-max-priority |
消息优先级 |
队列参数
2024年10月29日
13:52
|
x-dead-letter-exchange |
指定一个队列的死信交换机 |
|
x-dead-letter-routing-key |
为死信消息设置特定的路由键 |
|
x-message-ttl |
消息存活时间 |
|
x-expires |
队列存活时间 |
|
x-max-length |
队列最大消息数 |
|
x-max-length-bytes |
队列最大字节数 |
|
x-max-priority |
队列最大优先级 |
|
x-queue-mode |
设置队列模式 |
BindingBuilder
2024年10月25日
11:56
作用 : 用于将Queue绑定到Exchange的工具类
方法 :
|
bind(queue) |
要绑定的队列 |
|
to(exchange) |
队列绑定的交换机 |
构建方法 :
|
with(routingKey) |
指定路由键,适用于Direct和Topic交换机 |
|
withQueueName() |
队列名称作为路由键 |
对象
2024年10月26日
15:15
Exchange
2024年10月26日
15:16
作用 : 交换机对象
子类 :
FanoutExchange
DirectExchange
TopicExchange
HeadersExchange
构造方法 :
|
(“exchangeName”) |
Queue
2024年10月26日
15:16
作用 : 队列对象
构造方法 :
|
(“queueName”) |
Binding
2024年10月26日
15:16
作用 : 绑定对象
Channel
2024年10月29日
10:10
作用 : 信道对象
1.作为消费者方法的参数,用于手动确认消息是否消费成功
方法 :
|
basicAck(deliveryTag,multiple) |
消息确认 |
|
basicNack(deliveryTag,multiple,requeue) |
消息拒绝 |
|
basicReject(deliveryTag,requeue) |
拒绝单条消息 |
参数 :
|
deliveryTag |
消息的标签id |
|
multiple |
是否批量确认 |
|
requeue |
是否重新入队 |
RabbitTemplate
2024年10月25日
11:01
作用 :
1.Spring AMQP提供的用于与RabbitMQ交互的API
2.封装了消息发送,接收和转换等操作
方法 :
|
convertAndSend(exchange,routingKey,message) |
用于发送消息,自动将对象转换为AMQP消息 |
|
receiveAndConvert(queueName) |
从指定的队列中接收消息并转换为Java对象 |
|
send(exchange,routingKey,message) |
发送AMQP消息 |
|
receive(queueName) |
从队列接收原始AMQP消息 |
|
setConfirmCallback(ConfirmCallback) |
设置确认模式回调 |
|
setReturnCallback(ReturnCallback) |
设置退回模式回调 |
|
setMandatory(boolean) |
设置是否必须路由成功 |
Message
2024年10月25日
11:00
作用 :
1.表示RabbitMQ中的消息
2.封装了消息内容(Body)和消息的元数据(MessageProperties)
参数 :
|
body |
消息的主要数据,通常是一个字节数组 |
|
MessageProperties |
包含消息的头信息,如路由键,内容类型,优先级,TTL等 |
方法 :
|
Object getBody() |
获取消息的主要数据 |
|
MessageProperties getMessageProperties() |
获取消息的原数据 |
MessageProperties
2024年10月25日
11:04
作用 :
1.消息属性,包含消息的头信息,如路由键,内容类型,优先级,TTL等
方法 :
|
setContentType() |
定义消息的MIME类型,如application/json |
|
getContentType() |
获取内容类型 |
|
setContentEncoding(String contentEncoding) |
设置消息的内容编码,如UTF-8 |
|
getContentEncoding() |
获取消息内容编码 |
|
setHeader(key,value) |
设置自定义头信息 |
|
Map getHeaders() |
获取所有头信息 |
|
Object getHeader(key) |
获取指定头信息 |
|
setPriority(integer) |
设置消息优先级,值越高优先级越高 |
|
getPriority() |
获取消息优先级 |
|
setDdeliveryMode(MessageDeliveryMode deliveryMode) |
指定消息持久性,2表示持久,1表示非持久 |
|
getDeliveryMode() |
获取消息的持久性 |
|
setExpiration(str) |
设置消息的TTL,单位为毫秒 |
|
getExpiration() |
获取消息TTL |
|
getDeliveryTag() |
获取消息的标签Id |
注解
2024年10月26日
15:15
@RabbitListener
2024年10月25日
11:01
作用 :
1.简化RabbitMQ消费者逻辑的注解
2.能够自动侦听队列中的消息,并将消息交给方法处理
3.可以将AMQP消息自动转换为Java对象
属性 :
|
queues = “str” |
队列名,可以设置多个队列 |
|
concurrency = “5-10” |
指定最大小最小并发数 |
|
ackMode = “AUTO/MANUAL” |
消息确认模式,自动确认/手动确认,默认是自动确认 |
|
lazy = boolean |
容器是否在收起接收消息时才创建 |
🐮🍺