articleList

09-RabbitMQ 消息可靠性投递+消费

2025/03/17 posted in  RabbitMQ
Tags: 

RabbitMQ 消息可靠性投递讲解

  • 什么是消息的可靠性投递

    • 保证消息百分百发送到消息队列中去

    • 详细

      • 保证 mq 节点成功接受消息
      • 消息发送端需要接受到 mq 服务端接受到消息的确认应答
      • 完善的消息补偿机制,发送失败的消息可以再感知并二次处理
  • RabbitMQ 消息投递路径

    • 生产者-->交换机->队列->消费者

    • 通过两个点控制消息的可靠性投递

      • 生产者到交换机

        • 通过 confirmCallback
      • 交换机到队列

        • 通过 returnCallback
  • 建议

    • 开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互, rabbitmq 整体效率变低,吞吐量下降严重,不是非常重要的消息真心不建议用消息确认机制

Rabbitmq 的 消息可靠性投递 confirmCallback 实战

  • 生产者到交换机

    • 通过 confirmCallback
    • 生产者投递消息后,如果 Broker 收到消息后,会给生产者一个 ACK。生产者通过 ACK,可以确认这条消息是否正常发送到 Broker,这种方式是消息可靠性投递的核心
  • 开启 confirmCallback

    • #旧版,确认消息发送成功,通过实现 ConfirmCallBack 接口,消息发送到交换器 Exchange 后触发回调

      spring.rabbitmq.publisher-confirms=true
      
    • #新版,NONE 值是禁用发布确认模式,是默认值,CORRELATED 值是发布消息成功到交换器后会触发回调方法

      spring.rabbitmq.publisher-confirm-type: correlated
      
  • 开发实战

    @Autowired
    private RabbitTemplate template;
    
    @Test
    void testConfirmCallback() {
    
        template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
            *
            * @param correlationData 配置
            * @param ack 交换机是否收到消息,true是成功,false是失败
            * @param cause 失败的原因
            */
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("confirm=====>");
                System.out.println("confirm==== ack=" + ack);
                System.out.println("confirm==== cause=" + cause);
    
                //根据ACK状态做对应的消息更新操作 TODO
            }
        });
    
        template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "新订单来啦!");
    
    }
    
  • 模拟异常

    • 修改投递的交换机名称

Rabbitmq 的消息可靠性投递 returnCallback 实战

  • 交换机到队列

    • 通过 returnCallback

    • 消息从交换机发送到对应队列失败时触发

    • 两种模式

      • 交换机到队列不成功,则丢弃消息(默认)
      • 交换机到队列不成功,返回给消息生产者,触发 returnCallback
      //为true,则交换机处理消息到路由失败,则会返回给生产者
      //或者配置文件 spring.rabbitmq.template.mandatory=true
      template.setMandatory(true);
      
  • 第一步 开启 returnCallback 配置

    #新版
    spring.rabbitmq.publisher-returns=true
    
  • 第二步 修改交换机投递到队列失败的策略

    #为true,则交换机处理消息到路由失败,则会返回给生产者
    spring.rabbitmq.template.mandatory=true
    
  • 开发实战

    @Test
    void testReturnCallback() {
    
        //为true,则交换机处理消息到路由失败,则会返回给生产者
        //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息
        template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                int code = returned.getReplyCode();
                System.out.println("code=" + code);
                System.out.println("returned=" + returned.toString());
            }
        });
    
        template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "xxx.order.new", "新订单来啦!");
    
    }
    
  • 模拟异常

    • 修改路由 key,拼接不存在的路由

Rabbitmq 的消息确机制 ACK 讲解

  • 背景

    • 消费者从 broker 中监听消息,需要确保消息被合理处理
  • RabbitMQ 的 ACK 介绍

    • 消费者从 RabbitMQ 收到消息并处理完成后,反馈给 RabbitMQ,RabbitMQ 收到反馈后才将此消息从队列中删除
    • 消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有 ACK 反馈,RabbitMQ 会认为这个消息没有正常消费,会将消息重新放入队列中
    • 只有当消费者正确发送 ACK 反馈,RabbitMQ 确认收到后,消息才会从 RabbitMQ 服务器的数据中删除
    • 消息的 ACK 确认机制默认是打开的,消息如未被进行 ACK,这条消息被锁定 Unacked
  • 确认方式

    • 自动确认(默认)

    • 手动确认 manual

      spring:
        rabbitmq:
          #开启手动确认消息,如果消息重新入队,进行重试
          listener:
            simple:
              acknowledge-mode: manual
      
    • 其他(基本不用,忽略)

RabbitMQ 消息确认机制 ACK 配置实战+DeliveryTag+Reject 介绍

  • 代码实战

    @RabbitHandler
    public void releaseCouponRecord(String body, Message message, Channel channel) throws IOException {
    
        long msgTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("msgTag=" + msgTag);
        System.out.println("message=" + message.toString());
        System.out.println("body=" + body);
    
        //成功确认,使用此回执方法后,消息会被 rabbitmq broker 删除
        //channel.basicAck(msgTag, false);
        //channel.basicNack(msgTag, false, true);
        //channel.basicReject(msgTag, true);
    }
    
  • deliveryTag 介绍

    • 表示消息投递序号,每次消费消息或者消息重新投递后, deliveryTag 都会增加
  • basicNack 和 basicReject 介绍

    • basicReject 一次只能拒绝接收一个消息,可以设置是否 requeue
    • basicNack 方法可以支持一次 0 个或多个消息的拒收,可以设置是否 requeue
  • 人工审核异常消息

    • 设置重试阈值,超过后确认消费成功,记录消息,人工处理