OpenEdv-开源电子网

 找回密码
 立即注册
正点原子全套STM32/Linux/FPGA开发资料,上千讲STM32视频教程免费下载...
查看: 4868|回复: 0

RabbitMQ高级之消息限流与延时队列

[复制链接]

143

主题

145

帖子

0

精华

高级会员

Rank: 4

积分
585
金钱
585
注册时间
2020-5-25
在线时间
42 小时
发表于 2020-9-3 15:49:49 | 显示全部楼层 |阅读模式
1. 🔍消息队列如何限流?
消息队列限流是指在服务器面临巨额流量时,为了进行自保,进行的一种救急措施。
因为巨大的流量代表着非常多的消息,这些消息如果多到服务器处理不过来就会造成服务器瘫痪,影响用户体验,造成不良影响。
所以要进行一次降级操作,把处理不了的流量隔绝在系统之外,避免它们打垮系统。
基本上任何一个消息队列都有限流的功能,今天我们就来看看在RabbitMQ之中进行限流具体应该怎么做?
RabbitMQ提供了一种QOS(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息还未被消费确认,则不进行新消息的消费。

spring:  rabbitmq:    addresses: 127.0.0.1    host: 5672    username: guest    password: guest    virtual-host: /    # 手动确认消息    listener:      simple:          acknowledge-mode: manual          prefetch: 2
我们只需要配置一下rabbitmq.listener.simple下的prefetch属性即可,为了演示方便我这里配置为两条,语义即为:如果队列中有两条以上未签收的消息,则不进行新的消息消费。
我往我的队列中发送三条信息,并不进行签收,来看看效果:
发送完显示我们系统中有三条Ready消息,这代表这三条消息还在队列中没有消费端去消费。
这时我打开消费端进行消费但依旧不进行签收,接着来看效果:
unacked=2,ready=1,这就代表有两条消息在服务端消费了但是没有签收,还有一条消息还在队列中没有往服务端推送,因为我们设置了prefetch=2,所以现在队列的最大同时在消费的消息数量为2,通过此种方式,我们就完成了消费限流。
Tip : 这种方式下消息一定要进行手动签收,因为之前的文章中我们讲过,自动签收是消息一达到消费端就进行签收了,可能我们的业务逻辑还没运行就已经进行签收了,所以自动签收状态下开启限流几乎没有作用。
2. 📑RabbitMQ控制台
上一节我的截图中,大家可以发现居然出现了可视化的界面,以往在我的截图中一般都是DOS命令操作台界面,但其实RabbitMQ是自带了可视化界面的插件的,我们只需要开启即可。
在我们的RabbitMQ中输入如下命令:rabbitmq-plugins.bat enable rabbitmq_management
就可以开启可视化页面了,紧接着访问:http://localhost:15672/
默认用户名和密码都是 guest,直接登录即可。
很方便的控制台,大家可以自己试一下~
3. 📔TTL消息/队列
TTL是Time To Live的缩写,也就是生存时间的意思,RabbitMQ支持消息的过期时间,在消息发送时可以进行指定,也支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除。
设置队列的话就是整个队列的消息到时都会过期,设置消息的话就是单条消息到时自动过期。
    // TTL队列示例    @Bean    public Queue ttlQueue() {        Map<String, Object> arguments = new HashMap<>();        // 设置3s过期        arguments.put("x-message-ttl",3000);        return new Queue("topicQueue1",false,false,false, arguments);    }
上面的代码就是演示如何创建一个TTL队列,需要放入参数才行,队列构造中的其他参数我为了方便直接填了false。
    public void sendTtl() {        String message = "Hello 我是作者和耳朵,欢迎关注我。" + LocalDateTime.now().toString();        System.out.println("Message content : " + message);        // 设置过期3s        MessageProperties props = MessagePropertiesBuilder.newInstance()                .setExpiration("3000").build();        rabbitTemplate.send(Producer.QUEUE_NAME,new Message(message.getBytes(StandardCharsets.UTF_8),props));        System.out.println("消息发送完毕。");    }
设置消息的TTL也是设置参数即可。
以上就是RabbitMQ中关于TTL的知识点。
4. &#128204;DLX死信队列
DLX死信队列虽然叫队列,但其实指的是Exchange,或者说指的Exchange和它所属的Queue,他俩一块构成了死信队列。
当一条消息:
  • 消费被拒绝(basic.reject/basic.nack)并且requeue=false
  • TTL过期
  • 要进入的队列达到最大长度
这三种情况,就可以判定一条消息死了,这种消息如果我们没有做处理,它就会被自动删除。
但其实我们可以在队列上加上一个参数,使当队列中发现了死亡的消息之后会将它自动转发到某个Exchange,由指定的Exchange来处理这些死亡的消息。
这个处理死亡消息的Exchange和之前我们讲述的Exchange没什么区别,依然可以绑定队列然后进行消息消费。
    // DLX队列示例    @Bean    public Queue dlxQueue() {        Map<String, Object> arguments = new HashMap<>();        // 指定消息死亡后发送到ExchangeName="dlx.exchange"的交换机去        arguments.put("x-dead-letter-exchange","dlx.exchange");        return new Queue("topicQueue1", false, false, false, arguments);    }
如上代码,就是设置了一个队列中的消息死亡后的去处,就等于消息死亡后给它不把它删掉而是做一次转发,发到其他Exchange去。
那这样搞有什么用呢?这就取决于业务需求了,不过下一节会用到它,接着往下看~
5. &#128161;延时队列
RabbitMQ的基因中没有延时队列这回事,它不能直接指定一个队列类型为延时队列,然后去延时处理,但是经过上面两节的铺垫,我们可以将TTL+DLX相结合,这就能组成一个延时队列。
设想一个场景,下完订单之后15分钟未付款我们就要将订单关闭,这就是一个很经典的演示消费的场景,如果拿RabbitMQ来做,我们就需要结合TTL+DLX了。
先把订单消息设置好15分钟过期时间,然后过期后队列将消息转发给我们设置好的DLX-Exchange,DLX-Exchange再将分发给它绑定的队列,我们的消费者再消费这个队列中的消息,就做到了延时十五分钟消费。
真是super~~~简单呢

正点原子逻辑分析仪DL16劲爆上市
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则



关闭

原子哥极力推荐上一条 /2 下一条

正点原子公众号

QQ|手机版|OpenEdv-开源电子网 ( 粤ICP备12000418号-1 )

GMT+8, 2025-1-19 14:12

Powered by OpenEdv-开源电子网

© 2001-2030 OpenEdv-开源电子网

快速回复 返回顶部 返回列表