面试常问:
生产者出现堆积如何解决
- 增加消费者
- 优化消费者的处理速度
- 使用优先队列,优先处理优先级高的消息
- 消息压缩:如果网络带宽是瓶颈
- 升级设备
- 消息过滤:生产者端发送真正需要处理的消息
- 调整生产者的速度
生产消费过程中如果服务出现异常,如何恢复
- 重试机制:
- 死信队列:将无法处理的消息放在一个特殊的队列中,由人工去处理或者特殊处理
- 备份和恢复:对于重要数据应该定期进行备份,当服务出现异常时,从备份中恢复数据。
- 服务降级
- 容错和冗余设计
AMQP协议
- AMQ Model组件:
- Producer
- Exchange 交换器,从Producer中收集消息并根据路由规则发送到对应的消息队列中
- Queue 消息队列,存储消息,直到消息被安全的投递到了消费者
- Binding 定义了 mq 和 exchange之间的关系,是路由表
- Consumer
RocketMQ
- 生产者组->发送消息的一方
- 消息的类型:
- 普通消息:并发消息,并发消息没有顺序
- 分区有序消息:把一个Topic消息分为多个分区,保护和消费,分区内的消息就是队列,FIFO
- 全局有序:把一个Topic分区数设为1,所有的消息都支持FIFO
- 延迟消息:消息发送后,消费者不能立刻消费,需要等待
- 事务消息:涉及分布式事务,保证多个操作同时成功或者失败,消费者才能使用
- 消息保证机制:
- 客户端保证:重试机制和客户端容错(选择延迟较低的Broker来发送消息)
- Broker保证
- 消息发送流程:
- 业务层调用Client 发送API业务代码
- 消息处理层:Client获取消息对象后进行参数检查,准备和封装
- 通信层:基于Netty封装的RPC通信
- 消息的类型:
- Topic:
- 消费者组:
- 订阅关系:一个消费者组订阅一个Topic中的某一个Tag
- 消费模式:
- 集群消费模式:同一个组中的消费者实例负载均衡的消费Topic中的消息,消费进度保存在Broker端,即使应用崩溃,消费进度也不会出错
- 广播消费:所有消息广播分发,全部的消费者实例可以消费整个Topic中的所有消息,消费进度保存子啊客户端文件中,适用于通知其他服务刷新缓存
- 可靠消费保证:
- 重试-死信机制:正常Topic遭遇消费失败后->消息被保存在重试Topic中->多次间隔时间进行重新消费后仍然失败->进入死信Topic,经由人工处理,不会再被消费者消费
- Rebalance机制:重平衡, 用于在发生Broker掉线、Topic扩容和缩容、消费者扩容和缩容等变化时,自动感知并调整自身消费,以尽量减少甚至避免消息没有被消费。
- 消费方式:
- pull 用户主动pull消息,自主管理位点,由用户代码来进行管理
- push 自动pull消息,用户可直接使用,
- 消费过滤,Broker端可以根据tag进行消费过滤,只返回满足的tag,broker端使用Hash过滤,客户端再进行一次Tag字符串过滤, 因为Hash过来吧可以快速过滤大量数据,但是存在Hash碰撞
- Namesrv集群:一个无状态的元数据管理,Namesrv之于RocketMQ等价于Zookeeper之于Kafka。Topic路由注册和管理、Broker注册和发现的管理者
RabbitMQ
RabbitMQ主要架构
**
publisher
**:生产者,也就是发送消息的一方**
consumer
**:消费者,也就是消费消息的一方**
queue
**:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理**
exchange
**:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。不会持久化数据**
virtual host
**:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue,也就是将exchange和queue进行分组
Spring中使用
使用Spring AMQP来实现
publisher
使用RabbitTemplate来发送消息
consumer
@Component
public class SpringRabbitListener {
// 利用RabbitListener来声明要监听的队列信息
// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
// 可以看到方法体中接收的就是消息体的内容
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}
交换机的类型:
Fanout
广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
特点:
- 可以有多个队列
- 每个队列都要绑定到Exchange
- 生产者发送的消息只能发送到交换机
- 交换机把消息发送给绑定过的所有队列
- 订阅队列的消费者都能拿到消息
Direct
订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
特点:
- 队列需要与交换机绑定,指定一个RountingKey
- 消息发送方发送时也需要指定RountingKey
- Exchange把消息发送给RountingKey对应的队列
Topic
通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符
#
:匹配一个或多个词*
:匹配不多不少恰好1个词
举例:item.#
:能够匹配item.spu.insert
或者item.spu
item.*
:只能匹配item.spu
与DIrect类似,但是可以使用通配符来进行队列绑定Headers:头匹配,基于MQ的消息头匹配,用的较少。