articleList

06-RabbitMQ 交换机和发布订阅模型

2025/03/17 posted in  RabbitMQ
Tags: 

RabbitMQ 的 Exchange 交换机

  • 生产者将消息发送到 Exchange,交换机将消息路由到一个或者多个队列中,交换机有多个类型,队列和交换机是多对多的关系
  • 交换机只负责转发消息,不具备存储消息的能力,如果没有队列和 exchange 绑定,或者没有符合的路由规则,则消息会被丢失
  • RabbitMQ 有四种交换机类型,分别是 Direct exchange、Fanout exchange、Topic exchange、Headers exchange,最后的基本不用

交换机类型

  • Direct Exchange 定向

    • 将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配
    • 例子:如果一个队列绑定到该交换机上要求路由键“aabb”,则只有被标记为“aabb”的消息才被转发,不会转发 aabb.cc,也不会转发 gg.aabb,只会转发 aabb
    • 处理路由键
  • Fanout Exchange 广播

    • 只需要简单的将队列绑定到交换机上,一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息
    • Fanout 交换机转发消息是最快的,用于发布订阅,广播形式,中文是扇形
    • 不处理路由键
  • Topic Exchange 通配符

    • 主题交换机是一种发布/订阅的模式,结合了直连交换机与扇形交换机的特点
    • 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上
    • 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词
    • 例子:因此“abc.#”能够匹配到“abc.def.ghi”,但是“abc.*”只会匹配到“abc.def”
    • 处理路由键
  • Headers Exchanges(少用)

    • 根据发送的消息内容中的 headers 属性进行匹配,在绑定 Queue 与 Exchange 时指定一组键值对
    • 当消息发送到 RabbitMQ 时会取到该消息的 headers 与 Exchange 绑定时指定的键值对进行匹配
    • 如果完全匹配则消息会路由到该队列,否则不会路由到该队列
    • 不处理路由键

RabbitMQ 的发布订阅消息模型介绍

  • 什么是 rabbitmq 的发布订阅模型

    • 发布-订阅模型中,消息生产者不再是直接面对 queue(队列名称),而是直面 exchange,都需要经过 exchange 来进行消息的发送,所有发往同一个 fanout 交换机的消息都会被所有监听这个交换机的消费者接收到
    • 发布订阅-消息模型引入 fanout 交换机
    • 文档: https://www.rabbitmq.com/tutorials/tutorial-three-java.html
  • 发布订阅模型应用场景

    • 微信公众号
    • 新浪微博关注

  • rabbitmq 发布订阅模型

    • 通过把消息发送给交换机,交换机转发给对应绑定的队列
    • 交换机绑定的队列是排它独占队列,自动删除

  • RabbitMQ 的发布订阅消息模型代码实战

    • 发送端

      public class Send {
      
          private final static String EXCHANGE_NAME = "exchange_fanout";
      
          public static void main(String[] argv) throws Exception {
              ConnectionFactory factory = new ConnectionFactory();
              factory.setHost("47.97.190.214");
              factory.setUsername("admin");
              factory.setPassword("password");
              factory.setVirtualHost("/dev");
              factory.setPort(5672);
      
              /**
              * 消息生产者不用过多操作,只需要和交换机绑定即可
              */
              try (//创建连接
                  Connection connection = factory.newConnection();
                  //创建信道
                  Channel channel = connection.createChannel()) {
      
                  //绑定交换机,fanout扇形,即广播类型
                  channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
      
                  String message = "Hello World pub !";
                  channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
                  System.out.println(" [x] Sent '" + message + "'");
              }
          }
      }
      
    • 消费端(两个节点)

      public class Recv1 {
      
          private final static String EXCHANGE_NAME = "exchange_fanout";
      
          public static void main(String[] argv) throws Exception {
              ConnectionFactory factory = new ConnectionFactory();
              factory.setHost("47.97.190.214");
              factory.setUsername("admin");
              factory.setPassword("password");
              factory.setVirtualHost("/dev");
              factory.setPort(5672);
      
              //消费者一般不增加自动关闭
              Connection connection = factory.newConnection();
              Channel channel = connection.createChannel();
      
              //绑定交换机,fanout扇形,即广播类型
              channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
      
              //获取队列(排它队列)
              String queueName = channel.queueDeclare().getQueue();
      
              //绑定队列和交换机,fanout交换机不用指定routingkey
              channel.queueBind(queueName, EXCHANGE_NAME, "");
      
              DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                  String message = new String(delivery.getBody(), "UTF-8");
                  System.out.println(" [x] Received '" + message + "'");
              };
      
              //自动确认消息
              channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
              });
          }
      }
      
    • 验证

      • 启动两个消费者,一个生产者发送消息