articleList

08-SpringBoot2.X+SpringAMQP 整合 RabbitMQ 实战

2025/03/17 posted in  RabbitMQ
Tags: 

什么是 Spring AMQP

  • 官网: https://spring.io/projects/spring-amqp
  • Spring 框架的 AMQP 消息解决方案,提供模板化的发送和接收消息的抽象层,提供基于消息驱动的 POJO 的消息监听等
  • 提供不依赖于任何特定的 AMQP 代理实现或客户端库通用的抽象,最终用户代码将很容易实现更易替换、添加和删除 AMQP,因为它可以只针对抽象层来开发
  • 总之就是提高我们的框架整合消息队列的效率,SpringBoot 为更方便开发 RabbitMQ 推出了 starter
  • 我们使用 spring-boot-starter-amqp 进行开发

SpringBoot2.X 整合 RabbitMQ 实战

  • 添加依赖

    <!--引入AMQP-->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
  • yml 配置文件修改

    #消息队列
    spring:
      rabbitmq:
        host: 47.97.190.214
        port: 5672
        virtual-host: /dev
        password: password
        username: admin
    
  • RabbitMQConfig 文件

    @Configuration
    public class RabbitMQConfig {
    
        public static final String EXCHANGE_NAME = "order_exchange";
        public static final String QUEUE_NAME = "order_queue";
    
        /**
        * 交换机
        *
        * @return
        */
        @Bean
        public Exchange orderExchange() {
            return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
            //return new TopicExchange(EXCHANGE_NAME, true, false);
        }
    
        /**
        * 队列
        *
        * @return
        */
        @Bean
        public Queue orderQueue() {
            return QueueBuilder.durable(QUEUE_NAME).build();
            //return new Queue(QUEUE_NAME, true, false, false, null);
        }
    
        /**
        * 交换机和队列绑定关系
        */
        @Bean
        public Binding orderBinding(Queue queue, Exchange exchange) {
            return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
            //return new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, "order.#", null);
        }
    
    }
    
  • 消息生产者-测试类

    @SpringBootTest
    class DemoApplicationTests {
    
        @Autowired
        private RabbitTemplate template;
    
        @Test
        void send() {
            template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "新订单来啦!");
        }
    
    }
    
  • 消息消费者

    @Component
    @RabbitListener(queues = "order_queue")
    public class OrderMQListener {
    
        /**
        * RabbitHandler 会自动匹配 消息类型(消息自动确认)
        *
        * @param msg
        * @param message
        * @throws IOException
        */
        @RabbitHandler
        public void releaseCouponRecord(String msg, Message message) throws IOException {
    
            long msgTag = message.getMessageProperties().getDeliveryTag();
            System.out.println("msgTag=" + msgTag);
            System.out.println("message=" + message.toString());
            System.out.println("监听到消息:消息内容:" + message.getBody());
    
        }
    
    }