articleList

05-RabbitMQ 工作队列模型

2025/03/17 posted in  RabbitMQ
Tags: 

Java 项目创建并整合 RabbitMQ

  • 创建 Maven 项目

  • 添加依赖

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>
    ​
    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.10.0</version>
        </dependency>
    </dependencies>
    
  • 涉及重复代码,关闭 idea 重复代码检测

RabbitMQ 简单队列实战

  • 简单队列测试

  • 消息生产者

    public class Send {
    
        private final static String QUEUE_NAME = "hello";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("47.97.190.214");
            factory.setUsername("admin");
            factory.setPassword("password");
            factory.setVirtualHost("/dev");
            factory.setPort(5672);
            try (   //JDK7语法 或自动关闭 connnection和channel
                    //创建连接
                    Connection connection = factory.newConnection();
                    //创建信道
                    Channel channel = connection.createChannel()) {
                /**
                * 队列名称
                * 持久化配置:mq重启后还在
                * 是否独占:只能有一个消费者监听队列;当connection关闭是否删除队列,一般是false,发布订阅是独占
                * 自动删除: 当没有消费者的时候,自动删除掉,一般是false
                * 其他参数
                *
                * 队列不存在则会自动创建,如果存在则不会覆盖,所以此时的时候需要注意属性
                */
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                String message = "Hello World!";
                /**
                * 参数说明:
                * 交换机名称:不写则是默认的交换机,那路由键需要和队列名称一样才可以被路由,
                * 路由键名称
                * 配置信息
                * 发送的消息数据:字节数组
                */
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    }
    
  • 消息消费者(会一直监听队列)

    public class Recv {
    
        private final static String QUEUE_NAME = "hello";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("47.97.190.214");
            factory.setUsername("admin");
            factory.setPassword("password");
            factory.setVirtualHost("/dev");
            factory.setPort(5672);
    
            //消费者一般不增加自动关闭
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            //回调方法,下面两种都行
            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties
                        properties, byte[] body) throws IOException {
                    // consumerTag 是固定的 可以做此会话的名字, deliveryTag 每次接收消息+1
                    System.out.println("consumerTag消息标识="+consumerTag);
                    //可以获取交换机,路由键等
                    System.out.println("envelope元数据="+envelope);
    
                    System.out.println("properties配置信息="+properties);
    
                    System.out.println("body="+new String(body,"utf-8"));
                }
            };
            channel.basicConsume(QUEUE_NAME, true, consumer);
    
            //        DeliverCallback deliverCallback = (consumerTag, envelop, delivery,properties, msg) -> {
            //            String message = new String(msg, "UTF-8");
            //            System.out.println(" [x] Received '" + message + "'");
            //        };
    
            //自动确认消息
            //channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
        }
    }
    

RabbitMQ 工作队列轮询策略实战

  • 工作队列

    • 消息生产能力大于消费能力,增加多几个消费节点
    • 和简单队列类似,增加多几个消费节点,处于竞争关系
    • 默认策略:round robin 轮询
    • 每个消费者获得相同数量的消息

  • 消息生产者

    public class Send {
    
        private final static String QUEUE_NAME = "work_mq_rr";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("47.97.190.214");
            factory.setUsername("admin");
            factory.setPassword("password");
            factory.setVirtualHost("/dev");
            factory.setPort(5672);
    
            try (
                    //创建连接
                    Connection connection = factory.newConnection();
                    //创建信道
                    Channel channel = connection.createChannel()) {
    
                /**
                * 队列名称
                * 持久化配置
                * 排他配置
                * 自动删除
                * 其他参数
                */
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                //轮询发送 10 个
                for (int i = 0; i < 10; i++) {
                    String message = "Hello World!" + i;
                    channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
                    System.out.println(" [x] Sent '" + message + "'");
                }
            }
        }
    }
    
  • 消费者代码 1

    public class Recv1 {
    
        private final static String QUEUE_NAME = "work_mq_rr";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("47.97.190.214");
            factory.setUsername("admin");
            factory.setPassword("password");
            factory.setVirtualHost("/dev");
            factory.setPort(5672);
    
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*]Waiting for messages. To exit press CTRL+C");
    
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                //模拟消费缓慢
                try {
                    TimeUnit.SECONDS.sleep(6);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println("[x] Received '" + message + "'");
    
                //手工确认消息消费,不是多条确认
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            };
    
            //关闭自动确认消息
            channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
            });
        }
    }
    
  • 消费者代码 2

    public class Recv2 {
    
        private final static String QUEUE_NAME = "work_mq_rr";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("47.97.190.214");
            factory.setUsername("admin");
            factory.setPassword("password");
            factory.setVirtualHost("/dev");
            factory.setPort(5672);
    
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            System.out.println(" [*]  Waiting for messages. To exit press CTRL+C");
    
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                //模拟消费缓慢
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
    
                //手工确认消息消费,不是多条确认
                channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
            };
    
            //关闭自动确认消息
            channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {
            });
        }
    }
    
  • 轮询策略验证

    • 先启动两个消费者,再启动生产者
    • 缺点:存在部分节点消费过快,部分节点消费慢,导致不能合理处理消息

RabbitMQ 工作队列公平策略实战

  • 公平策略验证

    • 修改消费者策略
    • 解决消费者能力消费不足的问题,降低消费时间问题
  • 修改策略

    • 两个消费者都修改策略:限制消费者每次只能消费一条消息,处理完成才可以处理下一条