RabbitMQ之顺序消费

什么是顺序消费
例如:业务上产生者发送三条消息, 分别是对同一条数据的增加、修改、删除操作, 如果没有保证顺序消费,执行顺序可能变成删除、修改、增加,这就乱了。
如何保证顺序性
一般我们讨论如何保证消息的顺序性,会从下面三个方面考虑
1:发送消息的顺序
2:队列中消息的顺序
3:消费消息的顺序
发送消息的顺序
消息发送端的顺序,大部分业务不做要求,谁先发消息无所谓,如果遇到业务一定要发送消息也确保顺序,那意味着,只能全局加锁一个个的操作,一个个的发消息,不能并发发送消息。

队列中消息的顺序
RabbitMQ 中,消息最终会保存在队列中,在同一个队列中,消息是顺序的,先进先出原则,这个由 RabbitMQ 保证,通常也不需要开发关心。

不同队列 中的消息顺序,是没有保证的,例如:进地铁站的时候,排了三个队伍,不同队伍之间的,不能确保谁先进站。

消费消息的顺序
我们说如何保证消息顺序性,通常说的就是消费者消费消息的顺序,在多个消费者消费同一个消息队列的场景,通常是无法保证消息顺序的,

虽然消息队列的消息是顺序的,但是多个消费者并发消费消息,获取的消息的速度、执行业务逻辑的速度快慢、执行异常等等原因都会导致消息顺序不一致。
例如:消息A、B、C按顺序进入队列,消费者A1拿到消息A、消费者B1拿到消息B, 结果消费者B执行速度快,就跑完了,又或者消费者A1挂了,都会导致消息顺序不一致。
解决消费顺序的问题, 通常就是一个队列只有一个消费者 , 这样就可以一个个消息按顺序处理, 缺点就是并发能力下降了,无法并发消费消息,这是个取舍问题。

如果业务又要顺序消费,又要增加并发,通常思路就是开启多个队列,业务根据规则将消息分发到不同的队列,通过增加队列的数量来提高并发度,例如:电商订单场景,只需要保证同一个用户的订单消息的顺序性就行,不同用户之间没有关系,所以只要让同一个用户的订单消息进入同一个队列就行,其他用户的订单消息,可以进入不同的队列。

以下为代码设计过程实现
首先我们必须保证只有一个消费者 那么问题就来了,我们的项目一般是多副本的,如何保证只有一个副本在消费呢
这时就会用到消费者 单活模式 x-single-active-consumer
使用下述配置实现


private Queue creatQueue(String name){
        // 创建一个 单活模式 队列
        HashMap<String, Object> args=new HashMap<>();
        args.put("x-single-active-consumer",true);
        return new Queue(name,true,false,false,args);
        }

创建之后,我们可以在控制台看到 消费者的激活状态
在这里插入图片描述

=======================>配置类
@Configuration
@SuppressWarnings("all")
public class DirectExchangeConfiguration {
    @Bean
    public Queue queue15_0() {
        return creatQueue(Message15.QUEUE_0);
    }


    @Bean
    public Queue queue15_1() {
        return creatQueue(Message15.QUEUE_1);
    }

    @Bean
    public Queue queue15_2() {
        return creatQueue(Message15.QUEUE_2);
    }

    @Bean
    public Queue queue15_3() {
        return creatQueue(Message15.QUEUE_3);
    }

    @Bean
    public DirectExchange exchange15() {
        // name: 交换机名字 | durable: 是否持久化 | exclusive: 是否排它
        return new DirectExchange(Message15.EXCHANGE, true, false);
    }


    @Bean
    public Binding binding15_0() {
        return BindingBuilder.bind(queue15_0()).to(exchange15()).with("0");
    }

    @Bean
    public Binding binding15_1() {
        return BindingBuilder.bind(queue15_1()).to(exchange15()).with("1");
    }

    @Bean
    public Binding binding15_2() {
        return BindingBuilder.bind(queue15_2()).to(exchange15()).with("2");
    }

    @Bean
    public Binding binding15_3() {
        return BindingBuilder.bind(queue15_3()).to(exchange15()).with("3");
    }

    /**
     * 创建一个 单活 模式的队列
     * 注意 :
     * <p>
     * 如果一个队列已经创建为非x-single-active-consumer,而你想更改其为x-single-active-consumer,要把之前创建的队列删除
     *
     * @param name
     * @return queue
     */
    private Queue creatQueue(String name) {
        // 创建一个 单活模式 队列
        HashMap<String, Object> args = new HashMap<>();
        args.put("x-single-active-consumer", true);
        return new Queue(name, true, false, false, args);
    }
=================================》生产者
@Component
@Slf4j
public class Producer15 {
    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * 这里的发送是 拟投递到多个队列中
     *
     * @param id  业务id
     * @param msg 业务信息
     */
    public void syncSend(int id, String msg) {
        Message15 message = new Message15(id, msg);
        rabbitTemplate.convertAndSend(Message15.EXCHANGE, this.getRoutingKey(id), message);
    }

    /**
     * 根据 id 取余来决定丢到那个队列中去
     *
     * @param id id
     * @return routingKey
     */
    private String getRoutingKey(int id) {
        return String.valueOf(id % Message15.QUEUE_COUNT);
    }
}
============================》消费者
/**
 * 要想保证消息的顺序,每个队列只能有一个消费者
 *
 * @author 深漂码农@明哥
 * @date 2024-03-18
 */
@Component
@RabbitListener(queues = Message15.QUEUE_0)
@RabbitListener(queues = Message15.QUEUE_1)
@RabbitListener(queues = Message15.QUEUE_2)
@RabbitListener(queues = Message15.QUEUE_3)
@Slf4j
public class Consumer15 {

    @RabbitHandler
    public void onMessage(Message15 message) throws InterruptedException {
        log.info("[{}][Consumer15 onMessage][线程编号:{} 消息内容:{}]", LocalDateTime.now(), Thread.currentThread().getId(), message);
        // 这里随机睡一会,模拟业务处理时候的耗时
        long l = new Random(1000).nextLong();
        TimeUnit.MILLISECONDS.sleep(l);
    }
}
==============================》测试类
@Test
    void mock() throws InterruptedException {
        // 先启动这个测试类,模拟多个副本情况下,看如何消费
        new CountDownLatch(1).await();
    }

    @Test
    void syncSend() throws InterruptedException {
        // 模拟每个队列中扔 10 个数据,看看效果
        for (int i = 0; i < 10; i++) {
            for (int j = 0; j < 4; j++) {
                producer15.syncSend(j, " 编号:" + j + " 第:" + i + " 条消息");
            }
        }

        TimeUnit.SECONDS.sleep(20);
    }
}

ps:测试的时候时候 先启动 mock 方式。 在启动 syncSend 方法,模拟多个副本同时消费,观察是否可以
以上的是RabbitMQ之顺序消费实现的代码 若不了解rabbitmq的基本使用 建议先看看我前面对应的文章 文章链接:点我—>let’s go
若需完整代码 可识别二维码后 给您发代码。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/592568.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Linux】进程的隔离和控制:namespace 隔离、cgroup 控制

文章目录 五、namespace 隔离dd -- 读取、转换并输出数据mkfs -- 格式化文件系统df -- 显示文件系统磁盘使用情况mount -- 加载文件系统到指定的加载点unshare -- 创建子进程&#xff0c;同时与父程序不共享namespace一个 demo 六、cgroup(Control Group) 相关命令pidstat -- 监…

【LeetCode刷题记录】230. 二叉搜索树中第K小的元素

230 二叉搜索树中第K小的元素 给定一个二叉搜索树的根节点 root &#xff0c;和一个整数 k &#xff0c;请你设计一个算法查找其中第 k 个最小元素&#xff08;从 1 开始计数&#xff09;。 示例 1&#xff1a; 输入&#xff1a;root [3,1,4,null,2], k 1 输出&#xff1…

2024年深圳杯东三省联赛数模竞赛A题代码改进-更加合理的结果

4月下旬深圳杯开赛后第二天就推出了完整版的论文&#xff0c;经过长达半个月大家再售后群的讨论分析&#xff0c;我们又重新对之前思路下写的代码进行了改进。本次改进的结果&#xff0c;我们特地参考了网上一些常见的火箭及其相关的级别分离高度&#xff1a;&#xff08;我们的…

c语言刷题——输出图案

1.输出用“*”组成的X形图案 题目&#xff1a;请打印用“*”组成的X形图案 描述&#xff1a; 多组输入&#xff0c;一个整数&#xff08;2~20&#xff09;&#xff0c;表示输出的行数&#xff0c;也表示组成“X”的反斜线和正斜线的长度。 输出描述&#xff1a; 针对每行输…

【C语言】指针篇- 深度解析Sizeof和Strlen:热门面试题探究(5/5)

&#x1f308;个人主页&#xff1a;是店小二呀 &#x1f308;C语言笔记专栏&#xff1a;C语言笔记 &#x1f308;C笔记专栏&#xff1a; C笔记 &#x1f308;喜欢的诗句:无人扶我青云志 我自踏雪至山巅 文章目录 一、简单介绍Sizeof和Strlen1.1 Sizeof1.2 Strlen函数1.3 Sie…

零基础学习数据库SQL语句之查询表中数据的DQL语句

是用来查询数据库表的记录的语句 在SQL语句中占有90%以上 也是最为复杂的操作 最为繁琐的操作 DQL语句很重要很重要 初始化数据库和表 USE dduo;create table tb_emp(id int unsigned primary key auto_increment comment ID,username varchar(20) not null unique comment…

大语言模型中的第一性原理:Scaling laws

大语言模型的尺度定律在大语言模型的训练过程中起到了非常重要的作用。即使读者不参与大语言模型的训练过程&#xff0c;但了解大语言模型的尺度定律仍然是很重要的&#xff0c;因为它能帮助我们更好的理解未来大语言模型的发展路径。 1. 什么是尺度定律 尺度定律&#xff08…

stamps做sbas-insar,时序沉降图怎么画?

&#x1f3c6;本文收录于「Bug调优」专栏&#xff0c;主要记录项目实战过程中的Bug之前因后果及提供真实有效的解决方案&#xff0c;希望能够助你一臂之力&#xff0c;帮你早日登顶实现财富自由&#x1f680;&#xff1b;同时&#xff0c;欢迎大家关注&&收藏&&…

【跟我学RISC-V】(二)RISC-V的基础知识学习与汇编练习

写在前面&#xff1a; 这篇文章是跟我学RISC-V的第二期&#xff0c;是第一期的延续&#xff0c;第一期主要是带大家了解一下什么是RISC-V,是比较大体、宽泛的概念。这一期主要是讲一些基础知识&#xff0c;然后进行RISC-V汇编语言与c语言的编程。在第一期里我们搭建了好几个环…

WPF之绑定验证(错误模板使用)

1&#xff0c;前言&#xff1a; 默认情况下&#xff0c;WPF XAML 中使用的绑定并未开启绑定验证&#xff0c;这样导致用户在UI上对绑定的属性进行赋值时即使因不符合规范内部已抛出异常&#xff08;此情况仅限WPF中的数据绑定操作&#xff09;&#xff0c;也被程序默认忽略&…

4.【Orangepi Zero2】Linux定时器(signal、setitimer),软件PWM驱动舵机(SG90)

Linux定时器&#xff08;signal、setitimer&#xff09;&#xff0c;软件PWM驱动舵机&#xff08;SG90&#xff09; signalsetitimer示例 软件PWM驱动舵机&#xff08;SG90&#xff09; signal 详情请看Linux 3.进程间通信&#xff08;shmget shmat shmdt shmctl 共享内存、si…

【计算机网络】循环冗余校验:Cyclic Redundancy Check

1. 任务目标 利用循环冗余校验&#xff08;CRC&#xff09;检测错误。 循环冗余校验&#xff08;英语&#xff1a;Cyclic redundancy check&#xff0c;通称 CRC&#xff09;是一种根据网上数据包或计算机文件等数据产生简短固定位数校验码的一种散列函数&#xff0c;主要用来…

智慧文旅展现文化新风貌,科技助力旅行品质升级:借助智慧技术,文旅产业焕发新生机,为旅行者带来更高品质的文化体验之旅

一、引言 在数字化、智能化的浪潮下&#xff0c;文旅产业正迎来前所未有的发展机遇。智慧文旅作为文旅产业与信息技术深度融合的产物&#xff0c;不仅为旅行者带来了全新的文化体验&#xff0c;也为文旅产业注入了新的活力。本文旨在探讨智慧文旅如何借助智慧技术展现文化新风…

C++:二叉搜索树的底层模拟实现

概念&#xff1a; 二叉搜索树又称二叉排序树&#xff0c;它或者是一棵空树&#xff0c;或者是具有以下性质的二叉树&#xff1a; 搜索二叉树的操作&#xff1a; int a[] {8, 3, 1, 10, 6, 4, 7, 14, 13};二叉搜索树需要满足左子树比根小&#xff0c;右子树比根大&#xff0c;…

Leetcode—1235. 规划兼职工作【困难】(upper_bound、自定义排序规则)

2024每日刷题&#xff08;125&#xff09; Leetcode—1235. 规划兼职工作 算法思想 实现代码 class Solution { public:int jobScheduling(vector<int>& startTime, vector<int>& endTime, vector<int>& profit) {int n startTime.size();vec…

循环神经网络完整实现(Pytorch 13)

一 循环神经网络的从零开始实现 从头开始基于循环神经网络实现字符级语言模型。 %matplotlib inline import math import torch from torch import nn from torch.nn import functional as F from d2l import torch as d2lbatch_size, num_steps 32, 35 train_iter, vocab …

(六)SQL系列练习题(下)#CDA学习打卡

目录 三. 查询信息 16&#xff09;检索"1"课程分数小于60&#xff0c;按分数降序排列的学生信息​ 17&#xff09;*按平均成绩从高到低显示所有学生的所有课程的成绩以及平均成绩 18&#xff09;*查询各科成绩最高分、最低分和平均分 19&#xff09;*按各科成绩…

PHP 反序列化

一、PHP 序列化 1、对象的序列化 <?php class people{public $nameGaming;private $NationLiyue;protected $Birthday12/22;public function say(){echo "老板你好呀&#xff0c;我是和记厅的镖师&#xff0c;叫我嘉明就行&#xff0c;要运货吗你&#xff1f;"…

手机恢复出厂设置ip地址会变吗

当我们对手机进行恢复出厂设置时&#xff0c;很多人会担心手机的IP地址是否会发生变化。IP地址对于手机的网络连接至关重要&#xff0c;它决定了手机在网络中的身份和位置。那么&#xff0c;手机恢复出厂设置后&#xff0c;IP地址到底会不会发生变化呢&#xff1f;虎观代理小二…

Jenkins docker部署springboot项目

1、创建jenkins容器 1&#xff0c;首先&#xff0c;我们需要创建一个 Jenkins 数据卷&#xff0c;用于存储 Jenkins 的配置信息。可以通过以下命令创建一个数据卷&#xff1a; docker volume create jenkins_data启动 Jenkins 容器并挂载数据卷&#xff1a; docker run -dit…
最新文章