0%

MQ

面试常问:

生产者出现堆积如何解决

  1. 增加消费者
  2. 优化消费者的处理速度
  3. 使用优先队列,优先处理优先级高的消息
  4. 消息压缩:如果网络带宽是瓶颈
  5. 升级设备
  6. 消息过滤:生产者端发送真正需要处理的消息
  7. 调整生产者的速度

生产消费过程中如果服务出现异常,如何恢复

  1. 重试机制:
  2. 死信队列:将无法处理的消息放在一个特殊的队列中,由人工去处理或者特殊处理
  3. 备份和恢复:对于重要数据应该定期进行备份,当服务出现异常时,从备份中恢复数据。
  4. 服务降级
  5. 容错和冗余设计

AMQP协议

  • AMQ Model组件:
    1. Producer
    2. Exchange 交换器,从Producer中收集消息并根据路由规则发送到对应的消息队列中
    3. Queue 消息队列,存储消息,直到消息被安全的投递到了消费者
    4. Binding 定义了 mq 和 exchange之间的关系,是路由表
    5. Consumer

RocketMQ

常见问题

  1. 生产者组->发送消息的一方
    • 消息的类型:
      1. 普通消息:并发消息,并发消息没有顺序
      2. 分区有序消息:把一个Topic消息分为多个分区,保护和消费,分区内的消息就是队列,FIFO
      3. 全局有序:把一个Topic分区数设为1,所有的消息都支持FIFO
      4. 延迟消息:消息发送后,消费者不能立刻消费,需要等待
      5. 事务消息:涉及分布式事务,保证多个操作同时成功或者失败,消费者才能使用
    • 消息保证机制:
      1. 客户端保证:重试机制和客户端容错(选择延迟较低的Broker来发送消息)
      2. Broker保证
    • 消息发送流程:
      1. 业务层调用Client 发送API业务代码
      2. 消息处理层:Client获取消息对象后进行参数检查,准备和封装
      3. 通信层:基于Netty封装的RPC通信
  2. Topic:
  3. 消费者组:
    • 订阅关系:一个消费者组订阅一个Topic中的某一个Tag
    • 消费模式:
      • 集群消费模式:同一个组中的消费者实例负载均衡的消费Topic中的消息,消费进度保存在Broker端,即使应用崩溃,消费进度也不会出错
      • 广播消费:所有消息广播分发,全部的消费者实例可以消费整个Topic中的所有消息,消费进度保存子啊客户端文件中,适用于通知其他服务刷新缓存
    • 可靠消费保证:
      1. 重试-死信机制:正常Topic遭遇消费失败后->消息被保存在重试Topic中->多次间隔时间进行重新消费后仍然失败->进入死信Topic,经由人工处理,不会再被消费者消费
      2. Rebalance机制:重平衡, 用于在发生Broker掉线、Topic扩容和缩容、消费者扩容和缩容等变化时,自动感知并调整自身消费,以尽量减少甚至避免消息没有被消费。
    • 消费方式:
      1. pull 用户主动pull消息,自主管理位点,由用户代码来进行管理
      2. push 自动pull消息,用户可直接使用,
    • 消费过滤,Broker端可以根据tag进行消费过滤,只返回满足的tag,broker端使用Hash过滤,客户端再进行一次Tag字符串过滤, 因为Hash过来吧可以快速过滤大量数据,但是存在Hash碰撞
  • Namesrv集群:一个无状态的元数据管理,Namesrv之于RocketMQ等价于Zookeeper之于Kafka。Topic路由注册和管理、Broker注册和发现的管理者

RabbitMQ

常见问题

RabbitMQ主要架构

RabbitMQ架构

  • **publisher**:生产者,也就是发送消息的一方

  • **consumer**:消费者,也就是消费消息的一方

  • **queue**:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理

  • **exchange**:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。不会持久化数据

  • **virtual host**:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue,也就是将exchange和queue进行分组

Spring中使用

使用Spring AMQP来实现

publisher

使用RabbitTemplate来发送消息

consumer

@Component
public class SpringRabbitListener {
        // 利用RabbitListener来声明要监听的队列信息
    // 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
    // 可以看到方法体中接收的就是消息体的内容
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }
}

交换机的类型:

Fanout

广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机
特点:

  1. 可以有多个队列
  2. 每个队列都要绑定到Exchange
  3. 生产者发送的消息只能发送到交换机
  4. 交换机把消息发送给绑定过的所有队列
  5. 订阅队列的消费者都能拿到消息

Direct

订阅,基于RoutingKey(路由key)发送给订阅了消息的队列
特点:

  1. 队列需要与交换机绑定,指定一个RountingKey
  2. 消息发送方发送时也需要指定RountingKey
  3. Exchange把消息发送给RountingKey对应的队列

Topic

通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符

  • #:匹配一个或多个词

  • *:匹配不多不少恰好1个词
    举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu

  • item.*:只能匹配item.spu
    与DIrect类似,但是可以使用通配符来进行队列绑定

  • Headers:头匹配,基于MQ的消息头匹配,用的较少。