articleList

07-RabbitMQ 路由、主题模式

2025/03/17 posted in  RabbitMQ
Tags: 

RabbitMQ 的路由模式和应用场景

  • 什么是 rabbitmq 的路由模式

    • 文档: https://www.rabbitmq.com/tutorials/tutorial-four-java.html
    • 交换机类型是 Direct
    • 队列和交换机绑定,需要指定一个路由 key(也叫 Bingding Key)
    • 消息生产者发送消息给交换机,需要指定 routingKey
    • 交换机根据消息的路由 key,转发给对应的队列
  • 应用场景

    • 日志采集系统 ELK

      • 一个队列收集 error 信息->告警
      • 一个队列收集全部信息->日常使用

RabbitMQ 路由模式代码实战

  • 消息生产者

    public class Send {
    
        private final static String EXCHANGE_NAME = "exchange_direct";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("47.97.190.214");
            factory.setUsername("admin");
            factory.setPassword("password");
            factory.setVirtualHost("/dev");
            factory.setPort(5672);
    
            /**
            * 消息生产者不用过多操作,只需要和交换机绑定即可
            */
    
            try (//创建连接
                Connection connection = factory.newConnection();
                //创建信道
                Channel channel = connection.createChannel()) {
    
                //绑定交换机,直连交换机
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
                String error = "我是错误日志";
                String info = "我是info日志";
                String debug = "我是debug日志";
                channel.basicPublish(EXCHANGE_NAME, "errorRoutingKey", null, error.getBytes(StandardCharsets.UTF_8));
                channel.basicPublish(EXCHANGE_NAME, "infoRoutingKey", null, info.getBytes(StandardCharsets.UTF_8));
                channel.basicPublish(EXCHANGE_NAME, "debugRoutingKey", null, debug.getBytes(StandardCharsets.UTF_8));
    
                System.out.println("消息发送成功");
            }
        }
    }
    
  • 消息消费者(两个节点)

    public class Recv1 {
    
        private final static String EXCHANGE_NAME = "exchange_direct";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("47.97.190.214");
            factory.setUsername("admin");
            factory.setPassword("password");
            factory.setVirtualHost("/dev");
            factory.setPort(5672);
    
            //消费者一般不增加自动关闭
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            //绑定交换机,direct类型
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
    
            //获取队列
            String queueName = channel.queueDeclare().getQueue();
    
            //绑定队列和交换机,另外一个节点只绑定一个 errorRoutingKey
            channel.queueBind(queueName, EXCHANGE_NAME, "errorRoutingKey");
            channel.queueBind(queueName, EXCHANGE_NAME, "infoRoutingKey");
            channel.queueBind(queueName, EXCHANGE_NAME, "debugRoutingKey");
    
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
    
            //自动确认消息
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
            });
        }
    }
    
  • web 控制台查看交换机绑定的队列及 routingKey

  • web 控制台查看交换机类型

RabbitMQ 的 topic 主题通配符模式和应用场景

  • 背景

    • 如果业务有多路由 key,怎么维护?
    • topic 交换机,支持通配符匹配模式,更加强大
    • 工作基本都是用这个 topic 模式
  • 什么是 rabbitmq 的主题模式

    • 文档: https://www.rabbitmq.com/tutorials/tutorial-five-java.html

    • 交换机是 topic,可以实现发布订阅模式 fanout 和路由模式 Direct 的功能,更加灵活,支持模式匹配,通配符等

    • 交换机通过通配符进行转发到对应的队列,_ 代表一个词,# 代表 1 个或多个词,一般用 # 作为通配符居多,比如 #.order,会匹配 info.order、sys.error.order,而 _.order,只会匹配 info.order,之间是使用 . 点进行分割多个词的;如果是 *.order, 则 info.order、error.order 都会匹配

    • 注意

      • 交换机和队列绑定时用的 binding 使用通配符的路由键
      • 生产者发送消息时需要使用具体的路由键
  • 测试,下面的匹配规则是怎样的

    • quick.orange.rabbit 只会匹配 .orange...rabbit,进到 Q1 和 Q2
    • lazy.orange.elephant 只会匹配 .orange. 和 lazy.#,进到 Q1 和 Q2
    • quick.orange.fox 只会匹配 .orange.,进入 Q1
    • lazy.brown.fox 只会匹配 lazy.#,进入 Q2
    • lazy.pink.rabbit 只会匹配 lazy.# 和 ..rabbit,同个队列进入 Q2(消息只会发一次)
    • quick.brown.fox 没有匹配,默认会被丢弃,可以通过回调监听二次处理
    • lazy.orange.male.rabbit 只会匹配 lazy.#,进入 Q2

  • 应用场景

    • 日志采集系统

      • 一个队列收集订单系统的全部日志信息, order.log.*
      • 一个队列收集全部系统的全部日志信息, .log.

RabbitMQ 的 topic 主题模式代码实战

  • 生产者

    public class Send {
    
        private final static String EXCHANGE_NAME = "exchange_topic";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("47.97.190.214");
            factory.setUsername("admin");
            factory.setPassword("password");
            factory.setVirtualHost("/dev");
            factory.setPort(5672);
    
            /**
            * 消息生产者不用过多操作,只需要和交换机绑定即可
            */
    
            try (//创建连接
                Connection connection = factory.newConnection();
                //创建信道
                Channel channel = connection.createChannel()) {
    
                //绑定交换机,直连交换机
                channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
    
                String error = "我是订单错误日志";
                String info = "我是订单info日志";
                String debug = "我是商品debug日志";
                channel.basicPublish(EXCHANGE_NAME, "order.log.error", null, error.getBytes(StandardCharsets.UTF_8));
                channel.basicPublish(EXCHANGE_NAME, "order.log.info", null, info.getBytes(StandardCharsets.UTF_8));
                channel.basicPublish(EXCHANGE_NAME, "product.log.debug", null, debug.getBytes(StandardCharsets.UTF_8));
    
                System.out.println("消息发送成功");
            }
        }
    }
    
  • 消费者(两个)

    public class Recv1 {
    
        private final static String EXCHANGE_NAME = "exchange_topic";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("47.97.190.214");
            factory.setUsername("admin");
            factory.setPassword("password");
            factory.setVirtualHost("/dev");
            factory.setPort(5672);
    
            //消费者一般不增加自动关闭
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            //绑定交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
    
            //获取队列
            String queueName = channel.queueDeclare().getQueue();
    
            //绑定队列和交换机,第一个节点
            //channel.queueBind(queueName, EXCHANGE_NAME, "order.log.error");
    
            //绑定队列和交换机, 第二个节点
            //channel.queueBind(queueName, EXCHANGE_NAME, "*.log.*");
    
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
    
            //自动确认消息
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
            });
        }
    }
    

RabbitMQ 的多种工作模式总结

  • 对照官网回顾总结

  • 简单模式

    • 一个生产,一个消费,不用指定交换机,使用默认交换机
  • 工作队列模式

    • 一个生产,多个消费,可以有轮询和公平策略,不用指定交换机,使用默认交换机
  • 发布订阅模式

    • fanout 类型交换机,通过交换机和队列绑定,不用指定绑定的路由键,生产者发送消息到交换机,fanout 交换机直接进行转发,消息不用指定 routingkey 路由键
  • 路由模式

    • direct 类型交换机,过交换机和队列绑定,指定绑定的路由键,生产者发送消息到交换机,交换机根据消息的路由 key 进行转发到对应的队列,消息要指定 routingkey 路由键
  • 通配符模式

    • topic 交换机,过交换机和队列绑定,指定绑定的【通配符路由键】,生产者发送消息到交换机,交换机根据消息的路由 key 进行转发到对应的队列,消息要指定 routingkey 路由键