本篇文章包含以下内容:
- 文件备份功能实现思路分析
- Linux环境下使用docker部署RabbitMq
- 若依项目引入RabbitMq并使用
- 如何在微服务中规范使用RabbitMq
- 基于RabbitMq实现延时队列
- 基于延时队列和定时任务实现错峰削峰备份,存量、增量备份
1、文件备份功能实现思路分析
备份的本质是为了确保数据在原始位置发生故障或损坏时能够得以恢复。
方案
- MinIO 同步工具:使用 MinIO 自带的
mc
命令行工具进行同步操作。 - 自定义备份服务:在应用中实现备份逻辑,将所有文件下载到本地或另一个 MinIO 存储桶
增量备份
增量备份只备份自上次备份以来发生变化的数据。
实现方案:
- 使用版本控制:MinIO 支持版本控制,可以备份新版本或新创建的文件。
- 跟踪文件修改时间:记录每个文件的最后修改时间,只备份修改时间晚于上次备份的文件。
出于不想引入和维护过多不常用的第三方工具,并且想顺便学习一下mq使用的目的,我选择使用自定义编码来实现文件备份。
基于MinIO实现降峰错峰备份Spring项目中上传的文件
具体思路
- 集成MinIO客户端到SpringBoot项目
- 设计文件上传策略,实现分片上传
- 集成消息队列以及定时任务调度,实现降峰错峰备份
- 异常处理与日志记录
- 测试与优化
文件错峰备份实现分析
- 分析现有系统
- 了解现有数据库结构:查看现有数据库表结构,确定是否需要添加新字段或新建表。
- 评估现有上传逻辑:理解现有上传功能的实现方式,以便在此基础上添加备份逻辑。
- 数据库设计
- 添加字段或新建表:
- 如果备份信息不多,可以在现有文件信息表中添加字段,如
backup_status
(备份状态)、last_backup_time
(最后备份时间)等。 - 如果备份信息较为复杂,如需要记录备份历史、备份日志等,建议新建一个备份表,例如
file_backup
。
- 如果备份信息不多,可以在现有文件信息表中添加字段,如
- 备份任务表:新建一个表来跟踪备份任务的状态和详情。
- 示例字段:
id
(主键)fileId
(关联文件表的外键ID)file_path
(文件路径)status
(备份状态,如待备份0、备份中1、备份成功2、备份失败3)created_at
(创建时间)updated_at
(更新时间)backup_at
(备份时间)error_message
(错误信息)
- 示例字段:
CREATE TABLE `netdisk_file_backup` ( `id` VARCHAR(20) NOT NULL COMMENT '主键ID', `fileId` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT '关联文件表的外键ID', `file_path` VARCHAR(255) NOT NULL COMMENT '文件路径', `status` TINYINT NOT NULL COMMENT '备份状态:0-待备份,1-备份中,2-备份成功,3-备份失败', `created_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `updated_at` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', `backup_at` TIMESTAMP NULL DEFAULT NULL COMMENT '备份时间', `error_message` TEXT COMMENT '错误信息', PRIMARY KEY (`id`), FOREIGN KEY (`fileId`) REFERENCES `netdisk_file`(`fileId`) ON DELETE CASCADE ON UPDATE CASCADE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
- 技术栈选择
- 定时任务调度:可以使用Spring Boot自带的
@Scheduled
注解,或者集成Quartz、XXL-JOB等定时任务框架。 - 消息队列:选择一个适合的消息队列服务,如RabbitMQ、Kafka或Redis等。创建一个延迟队列用于存放待备份的文件任务。
- 实现步骤
- 修改数据库
- 部署rabbitMQ
- 集成消息队列
- 修改上传逻辑
- 当文件创建或上传成功后,添加一条备份任务信息到备份任务表,并将fileId发送到备份队列。
- 实现备份服务(延迟队列实现)
- 文件上传时发送消息到队列并设置任务的过期时间,或使用插件来完成延迟队列。
- 等待任务过期后,死信队列处理队列中的任务信息,将其中的文件备份,上传到另一个存储桶。
- 在备份操作完成后,更新备份任务表中的状态信息。
- 如果备份失败,将任务重新放入队列或标记为待重试状态。
- 备份操作完成后,更新数据库中的备份状态。
- 确保队列中的消息是持久化的,以防止消息丢失。消费者在处理完消息后,需要发送确认信号,从队列中移除该消息。
- 集成定时任务
- 实现定时任务调用备份服务
- 创建定时任务,用于扫描数据库中状态为“待备份”的文件,并将这些文件的fileId信息发送到备份队列。
- 配置定时任务:设置定时任务执行的时间策略,在系统负载较低的时段执行(如凌晨三点)。扫描数据库表中未备份字段为1的数据,并将该文件备份,然后修改备份状态。(优化方案:使用redis存储当天需要更新的文件,从redis中读取需要备份的文件id进行备份,然后批量对数据库进行更新操作。)
- 更新数据库中备份状态和备份时间。
- 异常处理和重试机制
- 监控和日志
- 考虑实现删除文件时同时删除备份文件
简单初版逻辑:上传文件时添加数据库信息->每天定时扫描表中未备份的数据重新备份。
在Docker上安装部署RabbitMQ方便快捷,不需要额外安装Erlang环境。
- 备份存储位置:
- 可以选择将备份文件存储在另一个MinIO实例上,或者使用其他云存储服务,如AWS S3、阿里云OSS等。
- 确保备份存储与原始存储隔离,以防止数据丢失时影响到备份数据。
2、Linux环境下使用docker部署RabbitMq
拉取或导入镜像
使用 docker load -i <input_file>
导入镜像。

创建目录
一个用来存放配置,一个用来存储上传文件的目录
启动前需要先创建Minio外部挂载的配置文件( /root/docker/minio/config),
和存储上传文件的目录( /root/docker/minio/data)
mkdir -p /root/docker/rabbitmq/config mkdir -p /root/docker/rabbitmq/data
创建rabbitmq容器并运行
多行模式
docker run -d \ --name rabbitmq \ -p 5672:5672 -p 15672:15672 \ -v /root/docker/rabbitmq/data:/var/lib/rabbitmq \ --hostname my-rabbitmq-host \ -e RABBITMQ_DEFAULT_VHOST=my_vhost \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=admin \ --restart=always \ rabbitmq:3.7.25-management
参数说明:
-d:后台运行容器
-name:指定容器名
-p:指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号)
-v:映射目录或文件,启动了一个数据卷容器,数据卷路径为:/var/lib/rabbitmq,再将此数据卷映射到住宿主机的/root/docker/rabbitmq/data目录
–hostname:主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名)
-e:指定环境变量;(
RABBITMQ_DEFAULT_VHOST:默认虚拟机名;
RABBITMQ_DEFAULT_USER:默认的用户名;
RABBITMQ_DEFAULT_PASS:默认用户名的密码)
–restart=always:当Docker重启时,容器能自动启动
rabbitmq:3.7.25-management:镜像名
RABBITMQ_DEFAULT_VHOST=my_vhost,my_vhost名字请记好,在之后的编程中要用到,如果启动时没指定,默认值为/
单行模式 默认guest用户,密码也是guest
docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq -v /root/docker/rabbitmq/data:/var/lib/rabbitmq rabbitmq:3.7.25-management 学院外网自用 docker run -d -p 5673:5672 -p 15673:15672 --name rabbitmq -v /root/docker/rabbitmq/data:/var/lib/rabbitmq rabbitmq:3.13.6-management-alpine
查看日志
docker logs -f rabbitmq
此处的rabbitmq为创建容器时的name (或者使用容器id查看)
RabbitMQ的密码修改
进入docker 容器
docker exec -it rabbitmq bash
其中 rabbitmq 是自己的容器中 RabbitMQ 的名称,可通过 docker ps -a 进行查看,然后替换自己容器中的 名称 即可
查看当前用户列表
rabbitmqctl list_users
root@myRabbit:/# rabbitmqctl list_users Listing users ... user tags admin [administrator]
修改密码
命令:rabbitmqctl change_password [username] ‘[NewPassword]’
username:对应不同用户映射的密码,比如需要修改管理员 admin 的密码那么就填 admin
NewPassword:直接写你的新密码即可
root@myRabbit:/# rabbitmqctl change_password admin '17231547026' Changing password for user "admin" ...
创建用户并设置角色
创建管理员用户 创建管理员用户,负责整个MQ的运维,例如: $sudo rabbitmqctl add_user user_admin passwd_admin 赋予其administrator角色: $sudo rabbitmqctl set_user_tags user_admin administrator 创建RabbitMQ监控用户 创建RabbitMQ监控用户,负责整个MQ的监控,例如: $sudo rabbitmqctl add_user user_monitoring passwd_monitor 赋予其monitoring角色: $sudo rabbitmqctl set_user_tags user_monitoring monitoring 创建某个项目的专用用户 创建某个项目的专用用户,只能访问项目自己的virtual hosts $sudo rabbitmqctl add_user user_proj passwd_proj 赋予其monitoring角色: $sudo rabbitmqctl set_user_tags user_proj management 创建和赋角色完成后查看并确认: $sudo rabbitmqctl list_users 查看权限: $sudo rabbitmqctl list_user_permissions user_admin
RabbitMQ 管理页面的概览
RabbitMQ的管理页面是一个内置的、基于web的用户界面,允许用户查看和管理RabbitMQ服务器的状态和行为。
登录RabbitMQ管理页面后,会看到以下几个主要部分:
- Overview: 这个页面显示了RabbitMQ服务器的一般信息,例如节点的名字、状态、运行时间等。
- Connections: 在这里,可以查看、管理和关闭当前所有的TCP连接。
- Channels: 这个页面展示了所有当前打开的通道以及它们的详细信息。
- Exchanges: 可以在这里查看、创建和删除交换机。
- Queues: 这个页面展示了所有当前的队列以及它们的详细信息。
- Admin: 在这里,可以查看系统中所有的操作用户。
3、若依项目引入RabbitMq并使用
若依微服务创建新模块:https://blog.csdn.net/zhaolulu916/article/details/119086193
- 添加依赖
<!--rabbitmq依赖--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency>
- 添加配置
放在spring配置下
# spring配置 spring: rabbitmq: host: 172.18.22.129 port: 5672 username: root password: root virtual-host: / publisher-confirms: false # 消息发送到交换机确认机制,是否确认回调 publisher-returns: true devtools: restart: enabled: false
- 阶段性简单测试
@RestController @RequestMapping("/rabbit") public class TestController extends BaseController { @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/test") public String test() { return "whaletest模块"; } @GetMapping("/writeMQ") public String writeMQ() { rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "boot.as", "Hello,MQ"); return "添加成功"; } }
问题记录一:org.springframework.amqp.AmqpConnectException: java.net.ConnectException: Connection refused: connec
由于rabbitmq默认用户guest是只能允许本地主机localhost连接的,而我的rabbitmq是在服务器上以docker创建的,属于远程访问,因此连接报错。
解决方法如下:
(1).首先确保5672/15672这两个端口已经开放,
(2).进入rabbitmq客户端,在admin添加新用户root,tag选择administrator
(3).点进root,设置root用户可以管理虚拟主机/,这里点击默认的set permission即可
(4).修改配置

问题记录二:Rabbitmq报unauthorized异常
自定义一下一个消息转换的bean,并在其中设置信任的包,即实体类的路径
生产者:
public class RabbitMQConfig { @Bean public MessageConverter jsonToMapMessageConverter() { DefaultClassMapper defaultClassMapper = new DefaultClassMapper(); defaultClassMapper.setTrustedPackages("cn.xxx.common.mq"); // trusted packages Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); jackson2JsonMessageConverter.setClassMapper(defaultClassMapper); return jackson2JsonMessageConverter; } }
消费者:
@Configuration public class RabbitMQConfig { @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
- 添加监听消费者
package com.filesystem.rabbit.Listener; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class RabbitMQListener { @RabbitListener(queues = "boot_queue") public void ListenQueue(Message message){ System.out.println("队列boot_queue监听到一条消息"); // 取出即消费System.out.println(new String(message.getBody())); } }
- 深入使用
https://blog.csdn.net/qq_35387940/article/details/100514134
https://blog.csdn.net/weixin_42039228/article/details/123493937
RabbitMQConfig
简单示例,调用在TestController。配置了服务端序列化配置与消费端反序列化配置,配置了回调函数,未配置手动接收
package com.filesystem.rabbit.config; import com.filesystem.rabbit.domain.MessageEntity; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory; import org.springframework.amqp.support.converter.DefaultClassMapper; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author: muluo * @CreateTime: 2024/9/25 * @Description: **/ @Slf4j @Configuration public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { public static final String EXCHANGE_NAME="boot_topic_exchange"; public static final String QUEUE_NAME="boot_queue"; //交换机@Bean("bootExchange") public Exchange bootExchange(){ return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); } //队列@Bean("bootQueue") public Queue bootQueue(){ return QueueBuilder.durable(QUEUE_NAME).build(); } //将交换机和队列进行绑定@Bean public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs(); } // 消息确认回调函数//推送消息存在四种情况://①消息推送到server,但是在server里找不到交换机 触发 ConfirmCallback 回调函数//②消息推送到server,找到交换机了,但是没找到队列 触发 ConfirmCallback和RetrunCallback两个回调函数。//③消息推送到sever,交换机和队列啥都没找到 触发 ConfirmCallback 回调函数//④消息推送成功 触发 ConfirmCallback 回调函数@Bean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory, MessageConverter converter){ RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); // 将发送的信息转换为JSON格式,添加JSON格式转换器rabbitTemplate.setMessageConverter(converter); rabbitTemplate.setConfirmCallback(this); //开启了回退模式并设置了Mandatory,当消息从exchange发送到queue失败了,则会在消息回退到producer,并执行回调函数System.out.println("=======触发回调======="); rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(this); return rabbitTemplate; } /** * 交换机收到生产者发送消息后的回调函数 * @paramcorrelationData 给消息设置相关消息和全局消息id * @paramack * @paramreason 交换机未收到消息的原因 */@Override public void confirm(CorrelationData correlationData, boolean ack, String reason) { String id = correlationData != null ? correlationData.getId() : ""; if (ack) { log.info("交换机收到消息,消息的id为:"+ id); }else { log.info("交换机未收到消息,消息的id为:"+ id + "原因是:"+ reason); } } /** * 队列未收到交换机路由的消息时触发该函数 * @parammessage 消息的内容 * @paramreplyCode * @paramreplyContext 消息被退回的原因 * @paramexchange 交换机 * @paramrouteKey */@Override public void returnedMessage(Message message, int replyCode, String replyContext, String exchange, String routeKey) { log.error("交换机{}退回了消息{},原因是{},路由是{}",new String(message.getBody()),exchange,replyContext,routeKey); // spring_returned_message_correlation:所退回消息的唯一标识,可用于更新自定义的消息记录表log.info("消息的唯一标识:"+message.getMessageProperties().getHeader("spring_returned_message_correlation").toString()); } }
RabbitMQListenerConfig
package com.filesystem.rabbit.config; import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.messaging.converter.MappingJackson2MessageConverter; import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory; import org.springframework.messaging.handler.annotation.support.MessageHandlerMethodFactory; @Configuration public class RabbitMQListenerConfig implements RabbitListenerConfigurer { // 可以将json串反序列化为对象@Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) { rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory()); } @Bean MessageHandlerMethodFactory messageHandlerMethodFactory(){ DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory(); messageHandlerMethodFactory.setMessageConverter(mappingJackson2MessageConverter()); return messageHandlerMethodFactory; } @Bean public MappingJackson2MessageConverter mappingJackson2MessageConverter(){ return new MappingJackson2MessageConverter(); } // 将发送的信息转换为JSON格式,添加JSON格式转换器@Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } // 提供自定义RabbitTemplate,将对象序列化为json串@Bean public RabbitTemplate jacksonRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); return rabbitTemplate; } }
DirectRabbitConfig
package com.filesystem.rabbit.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author: muluo * @CreateTime: 2024/9/25 * @Description: **///使用direct exchange(直连型交换机) @Configuration public class DirectRabbitConfig { //队列 起名:TestDirectQueue@Bean public Queue TestDirectQueue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效// exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable// autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。// return new Queue("TestDirectQueue",true,true,false);//一般设置一下队列的持久化就好,其余两个就是默认falsereturn new Queue("TestDirectQueue",true); } //Direct交换机 起名:TestDirectExchange@Bean DirectExchange TestDirectExchange() { // return new DirectExchange("TestDirectExchange",true,true);return new DirectExchange("TestDirectExchange",true,false); } //绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting@Bean Binding bindingDirect() { return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting"); } // 未绑定队列,回调函数测试使用@Bean DirectExchange lonelyDirectExchange() { return new DirectExchange("lonelyDirectExchange"); } }
TopicRabbitConfig
package com.filesystem.rabbit.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @Author: muluo * @CreateTime: 2024/9/25 * @Description: **/// 主题交换机 @Configuration public class TopicRabbitConfig { //绑定键public final static String man= "topic.man"; public final static String woman= "topic.woman"; @Bean public Queue firstQueue() { return new Queue(TopicRabbitConfig.man); } @Bean public Queue secondQueue() { return new Queue(TopicRabbitConfig.woman); } @Bean TopicExchange exchange() { return new TopicExchange("topicExchange"); } //将firstQueue和topicExchange绑定,而且绑定的键值为topic.man//这样只要是消息携带的路由键是topic.man,才会分发到该队列@Bean Binding bindingExchangeMessage() { return BindingBuilder.bind(firstQueue()).to(exchange()).with(man); } //将secondQueue和topicExchange绑定,而且绑定的键值为用上通配路由键规则topic.#// 这样只要是消息携带的路由键是以topic.开头,都会分发到该队列@Bean Binding bindingExchangeMessage2() { return BindingBuilder.bind(secondQueue()).to(exchange()).with("topic.#"); } }
FanoutRabbitConfig
package com.filesystem.rabbit.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; // 扇形交换机/** * @Author: muluo * @CreateTime: 2024/9/25 * @Description: **/ @Configuration public class FanoutRabbitConfig { /** * 创建三个队列 :fanout.A fanout.B fanout.C * 将三个队列都绑定在交换机 fanoutExchange 上 * 因为是扇型交换机, 路由键无需配置,配置也不起作用 */@Bean public Queue queueA() { return new Queue("fanout.A"); } @Bean public Queue queueB() { return new Queue("fanout.B"); } @Bean public Queue queueC() { return new Queue("fanout.C"); } @Bean FanoutExchange fanoutExchange() { return new FanoutExchange("fanoutExchange"); } @Bean Binding bindingExchangeA() { return BindingBuilder.bind(queueA()).to(fanoutExchange()); } @Bean Binding bindingExchangeB() { return BindingBuilder.bind(queueB()).to(fanoutExchange()); } @Bean Binding bindingExchangeC() { return BindingBuilder.bind(queueC()).to(fanoutExchange()); } }
MessageEntity
import java.io.Serializable; @Data public class MessageEntity implements Serializable { private static final long serialVersionUID= 1L; private String messageId; private String messageData; private String createTime; // 构造器, 必须有个无参构造器 public MessageEntity() { } }
RabbitMQListener
package com.filesystem.rabbit.Listener; import com.filesystem.rabbit.domain.MessageEntity; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @Author: muluo * @CreateTime: 2024/9/25 * @Description: **/ @Component public class RabbitMQListener { @RabbitListener(queues = "boot_queue") public void ListenQueue(Message message){ System.out.println("队列boot_queue监听到一条消息"); // 取出即消费System.out.println(new String(message.getBody())); } @RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueuepublic void processDirectQueue(MessageEntity testMessage) { try { System.out.println(testMessage); System.out.println("========================================="); System.out.println("DirectReceiver消费者收到消息 : " + testMessage.toString()); // 在这里处理消息} catch (Exception e) { System.out.println("Error processing message from DirectQueue"); // 根据您的错误处理策略,可以选择重新抛出异常或者进行其他处理} } @RabbitListener(queues = "topic.man") public void processTopic1(MessageEntity testMessage) { System.out.println("TopicManReceiver消费者收到消息 : " + testMessage.toString()); } @RabbitListener(queues = "topic.woman") public void processTopic2(MessageEntity testMessage) { System.out.println("TopicTotalReceiver消费者收到消息 : " + testMessage.toString()); } @RabbitListener(queues = "fanout.A") public void processFanoutA(MessageEntity testMessage) { System.out.println("FanoutReceiverA消费者收到消息 : " +testMessage.toString()); } @RabbitListener(queues = "fanout.B") public void processFanoutB(MessageEntity testMessage) { System.out.println("FanoutReceiverB消费者收到消息 : " +testMessage.toString()); } @RabbitListener(queues = "fanout.C") public void processFanoutC(MessageEntity testMessage) { System.out.println("FanoutReceiverC消费者收到消息 : " +testMessage.toString()); } }
TestController
package com.filesystem.rabbit.controller; import com.filesystem.common.core.web.controller.BaseController; import com.filesystem.rabbit.config.RabbitMQConfig; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("/rabbit") public class TestController extends BaseController { @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/test") public String test() { return "whaletest模块"; } @GetMapping("/writeMQ") public String writeMQ() { rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "boot.as", "Hello,MQ"); return "添加成功"; } }
SendMessageController
package com.filesystem.rabbit.controller; import com.filesystem.rabbit.config.RabbitMQConfig; import com.filesystem.rabbit.domain.MessageEntity; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.HashMap; import java.util.Map; import java.util.UUID; /** * @Author: muluo * @CreateTime: 2024/9/25 * @Description: **/ @RestController public class SendMessageController { @Autowired RabbitTemplate rabbitTemplate; //使用RabbitTemplate,这提供了接收/发送等等方法/** * 定义一个处理HTTP GET请求的方法,用于发送直连消息。 * 当HTTP请求到达指定路径/sendDirectMessage时,此方法会被调用。 */@GetMapping("/sendDirectMessage") public String sendDirectMessage() { // 生成一个唯一的消息ID,使用UUID.randomUUID()生成String messageId = String.valueOf(UUID.randomUUID()); // 定义要发送的消息内容String messageData = "test message, hello!"; // 获取当前时间,并格式化为"yyyy-MM-dd HH:mm:ss"格式String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); // 创建一个MessageEntity来存储消息的详细信息MessageEntity messageEntity = new MessageEntity(); // 将消息ID、消息内容和创建时间存入Map中messageEntity.setMessageId(messageId); messageEntity.setMessageData(messageData); messageEntity.setCreateTime(createTime); // 使用RabbitTemplate的convertAndSend方法发送消息// 第一个参数是交换机名称,第二个参数是路由键,第三个参数是要发送的消息体// 这里的消息体是一个Map,它将被序列化后发送rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", messageEntity); // 方法返回"ok",表示消息发送成功return "ok"; } // 主题交换机测试@GetMapping("/sendTopicMessage1") public String sendTopicMessage1() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: M A N "; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); // 创建一个MessageEntity来存储消息的详细信息MessageEntity messageEntity = new MessageEntity(); // 将消息ID、消息内容和创建时间存入Map中messageEntity.setMessageId(messageId); messageEntity.setMessageData(messageData); messageEntity.setCreateTime(createTime); rabbitTemplate.convertAndSend("topicExchange", "topic.man", messageEntity); return "ok"; } // 主题交换机测试@GetMapping("/sendTopicMessage2") public String sendTopicMessage2() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: woman is all "; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); // 创建一个MessageEntity来存储消息的详细信息MessageEntity messageEntity = new MessageEntity(); // 将消息ID、消息内容和创建时间存入Map中messageEntity.setMessageId(messageId); messageEntity.setMessageData(messageData); messageEntity.setCreateTime(createTime); rabbitTemplate.convertAndSend("topicExchange", "topic.woman", messageEntity); return "ok"; } // 扇形队列测试@GetMapping("/sendFanoutMessage") public String sendFanoutMessage() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: testFanoutMessage "; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); // 创建一个MessageEntity来存储消息的详细信息MessageEntity messageEntity = new MessageEntity(); // 将消息ID、消息内容和创建时间存入Map中messageEntity.setMessageId(messageId); messageEntity.setMessageData(messageData); messageEntity.setCreateTime(createTime); rabbitTemplate.convertAndSend("fanoutExchange", null, messageEntity); return "ok"; } // 触发 ConfirmCallback 回调@GetMapping("/TestMessageAck") public String TestMessageAck() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: non-existent-exchange test message "; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); // 创建一个MessageEntity来存储消息的详细信息MessageEntity messageEntity = new MessageEntity(); // 将消息ID、消息内容和创建时间存入Map中messageEntity.setMessageId(messageId); messageEntity.setMessageData(messageData); messageEntity.setCreateTime(createTime); rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", messageEntity); return "ok"; } // 把消息推送到名为‘lonelyDirectExchange’的交换机上(这个交换机没有任何队列配置---找到交换机了,但是没找到队列) 因此触发所有回调@GetMapping("/TestMessageAck2") public String TestMessageAck2() { String messageId = String.valueOf(UUID.randomUUID()); String messageData = "message: lonelyDirectExchange test message "; String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); // 创建一个MessageEntity来存储消息的详细信息MessageEntity messageEntity = new MessageEntity(); // 将消息ID、消息内容和创建时间存入Map中messageEntity.setMessageId(messageId); messageEntity.setMessageData(messageData); messageEntity.setCreateTime(createTime); rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestDirectRouting", messageEntity); return "ok"; } }
问题记录三:必踩天坑----序列化与反序列化问题
解决方案:
1、序列化在任意配置类下提供RabbitTemplate
的bean,覆盖Springboot的自动化配置。将消息转译器设置为springboot提供的Jackson转译器。
// 提供自定义RabbitTemplate,将对象序列化为json串 @Bean public RabbitTemplate jacksonRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); return rabbitTemplate; }
2、反序列化实现RabbitListenerConfigurer
接口,配置反序列化转译器(可以将序列化转译器的bean也放在这里面)
@Configuration public class RabbitMQConfig implements RabbitListenerConfigurer { // 可以将json串反序列化为对象 @Override public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) { rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(messageHandlerMethodFactory()); } @Bean MessageHandlerMethodFactory messageHandlerMethodFactory(){ DefaultMessageHandlerMethodFactory messageHandlerMethodFactory = new DefaultMessageHandlerMethodFactory(); messageHandlerMethodFactory.setMessageConverter(mappingJackson2MessageConverter()); return messageHandlerMethodFactory; } @Bean public MappingJackson2MessageConverter mappingJackson2MessageConverter(){ return new MappingJackson2MessageConverter(); } // 提供自定义RabbitTemplate,将对象序列化为json串 @Bean public RabbitTemplate jacksonRabbitTemplate(ConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); return rabbitTemplate; } }
三、测试
即可定义消费者直接以对象的形式接收消息
@RabbitListener(queues = "muluo") public void huaweiHandler(MessageEntity msg) { System.out.println("muluoHandler>>>>>" + msg); }
生产者直接发送对象
MessageEntity ms = new MessageEntity("yl","123","2024"); rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", messageEntity);
4、在微服务中规范使用RabbitMq
消息服务中间件的概述
1,大多应用中,可通过消息服务中间件来提升系统异步通信、扩展解耦能力。
2,消息服务中有两个概念:消息代理和目的地
当消息发送者发送消息以后,将由消息代理接管,消息代理保证消息传递到指定目的地。
3,消息队列主要由两种形式的目的地。
(1)队列:点对点消息通信(一对一)
(2)主题:发布/订阅消息通信(一对多)
1,消息队列的应用场景:
一个网站,用户需要注册,注册后还需要发送注册邮件,还要发送注册短信,假如每一步都需要花费5秒的话,那么注册使用同步的方式,需要花费15秒,如果使用异步的方式注册,让注册邮件和注册短信同时进行,那么花费10秒。
那么问题来了,哪里用到了消息服务中间件呢?
用户注册信息之后,花费0.5秒的时间把信息写入消息中间件,然后提示用户注册成功,用户注册就只花费了5.5秒的时间。
写入消息中间件之后就不用管了,剩下的就交给消息队列自己执行了。
2,消息服务中间件的作用详述
应用解耦:
订单系统---------库存系统
订单系统每次减少一个库存,都要访问库存系统,告诉他已经卖了一单了,库存要减少一个。
订单系统----------消息队列-----------库存系统
订单系统减少一个库存时,写入消息队列,然后库存系统订阅该消息队列,得到订单系统写入的信息,然后库存系统自己减少一个库存。
流量削峰:
双十一那天会同时有超级多的用户同时抢购某一个东西,比如淘宝,那么如果特别多的用户同时访问服务器,服务器肯定是受不了的,解决办法就是把接到的请求订单信息写入消息队列,然后让服务器订阅该消息队列,自己取信息,如果请求数量超过了消息队列的上限,那么消息队列会拒收,所以减小了服务器的压力。
3,消息队列的模式
点对点式:
消息发送消息,消息代理将其放入一个队列中,消息接收者从队列中获取消息内容,消息读取后被移出队列。
消息只有唯一的发送者和接受者。
发布订阅式:
发送者(发布者)发送消息到主题,多个接收者(订阅者)监听(订阅)这个主题,那么就会在消息到达时同时收到消息。
JMS,JAVA消息服务,基于JVM消息代理的规范,ActiveMQ,HornetMQ是JMS实现。 AMQP,高级消息队列协议,也是一个消息代理的规范,兼容JMS,RabbitMQ是AMQP的实现。
简单的消息推送到接收的流程
客户端 -> 服务器交换机 -> 服务器队列 -> 客户端
常用的交换机
常用的交换机有以下三种,因为消费者是从队列获取信息的,队列是绑定交换机的(一般),所以对应的消息推送/接收模式也会有以下几种:
Direct Exchange
直连型交换机,根据消息携带的路由键将消息投递给对应队列。
大致流程,有一个队列绑定到一个直连交换机上,同时赋予一个路由键 routing key 。
然后当一个消息携带着路由值为X,这个消息通过生产者发送给交换机时,交换机就会根据这个路由值X去寻找绑定值也是X的队列。
Fanout Exchange
扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列。
Topic Exchange
主题交换机,这个交换机其实跟直连交换机流程差不多,但是它的特点就是在它的路由键和绑定键之间是有规则的。
其中
* (星号) 用来表示一个单词 (必须出现的)
# (井号) 用来表示任意数量(零个或多个)单词
通配的绑定键是跟队列进行绑定的,举个小例子
队列Q1 绑定键为 .TT. 队列Q2绑定键为 TT.#
如果一条消息携带的路由键为 A.TT.B,那么队列Q1将会收到;
如果一条消息携带的路由键为TT.AA.BB,那么队列Q2将会收到;
主题交换机是非常强大的,为啥这么膨胀?
当一个队列的绑定键为 "#"(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。
当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。
所以主题交换机也就实现了扇形交换机的功能,和直连交换机的功能。
另外还有 Header Exchange 头交换机 ,Default Exchange 默认交换机,Dead Letter Exchange 死信交换机,
规范使用
- 定义明确的Queue和Exchange:在设计系统时,应该明确定义每个微服务的Queue和Exchange,Queue用于存储消息,而Exchange则用于将消息路由到相应的Queue,通过明确定义Queue和Exchange,可以提高系统的可读性和可维护性。
- 使用适当的Exchange Type:RabbitMQ提供了多种Exchange Type,如Direct、Fanout、Topic和Headers,根据实际需求选择合适的Exchange Type可以优化消息的路由效率,如果需要将消息发送给多个消费者,可以使用Fanout Exchange;如果需要根据消息的某个属性进行路由,可以使用Topic Exchange。
- 使用Routing Key:在发送消息时,应该为每条消息指定一个Routing Key,Routing Key用于将消息路由到相应的Exchange和Queue,通过合理设置Routing Key,可以实现精确的消息路由和负载均衡。
- 处理消息确认机制:在微服务架构中,消息的可靠性非常重要,为了确保消息被正确处理,应该使用消息确认机制,RabbitMQ提供了两种方式来处理消息确认:自动确认和手动确认,自动确认是默认的方式,它会在消费者收到消息后自动发送确认信号,手动确认则需要在消费者处理完消息后手动发送确认信号,根据实际需求选择合适的确认机制可以提高系统的可靠性和稳定性。
- 处理消息重试机制:在微服务架构中,由于网络延迟或其他原因,可能会出现消息消费失败的情况,为了确保消息被正确处理,应该实现消息重试机制,RabbitMQ提供了内置的消息重试机制,可以通过设置最大重试次数和重试间隔来实现,根据实际需求合理设置重试机制可以提高系统的可靠性和稳定性。
- 监控和管理RabbitMQ集群:在生产环境中,应该对RabbitMQ集群进行监控和管理,可以使用RabbitMQ的管理插件来监控队列的状态、消费者的消费情况等,应该定期备份和恢复RabbitMQ的数据,以防止数据丢失或损坏。
- 限制队列的长度:为了避免队列过长导致性能问题,应该限制队列的长度,可以通过设置队列的最大长度来实现,当队列达到最大长度时,生产者可以选择丢弃消息或者等待队列释放空间。
- 使用多个Consumer:为了提高系统的并发能力和吞吐量,可以使用多个Consumer来消费同一个Queue中的消息,通过合理设置Consumer的数量和负载均衡策略,可以提高系统的并发处理能力。
- 使用TTL(Time to Live)特性:在某些情况下,可能需要设置消息的过期时间,RabbitMQ提供了TTL特性,可以在发送消息时指定消息的过期时间,当消息到达过期时间后,它将自动从Queue中删除。
- 避免死锁和资源竞争:在微服务架构中,多个服务可能会同时操作同一个Queue或Exchange,为了避免死锁和资源竞争,应该合理设计和实现服务之间的同步和互斥机制。
相关问题与解答:
- 问题:如何避免RabbitMQ中的死锁?
为了避免死锁,应该合理设计和实现服务之间的同步和互斥机制,可以使用RabbitMQ的事务机制来确保操作的原子性和一致性,应该避免长时间持有锁或者频繁申请锁,以减少死锁的风险。
- 问题:如何处理RabbitMQ中的大量消息堆积?
当RabbitMQ中的Queue中的消息堆积过多时,可以考虑以下解决方案:增加Consumer的数量来提高并发处理能力;调整消息的过期时间或者重试次数;优化业务逻辑,减少消息的产生量;使用分布式消息队列来分摊压力。
- 问题:如何监控和管理RabbitMQ集群?
可以使用RabbitMQ的管理插件来监控和管理RabbitMQ集群,管理插件提供了丰富的监控和管理功能,包括查看队列的状态、消费者的消费情况、统计信息等,应该定期备份和恢复RabbitMQ的数据,以防止数据丢失或损坏。
- 问题:如何选择合适的Exchange Type?
选择合适的Exchange Type取决于实际需求和系统的设计,可以根据消息的路由方式、消费者的数量、系统的并发能力等因素来选择合适的Exchange Type,常见的选择包括Direct Exchange、Fanout Exchange、Topic Exchange等。
5、基于RabbitMq实现延时队列
- 延迟队列定义
延迟队列指的是存储对应的延迟消息,消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费。
RabbitMQ 本身是没有延迟队列的,要实现延迟消息,一般有两种方式:
通过 RabbitMQ 本身队列的特性来实现,需要使用 RabbitMQ 的死信交换机(Exchange)和消息的存活时间 TTL(Time To Live)。或是通过插件实现。
也就是说,AMQP 协议以及 RabbitMQ 本身没有直接支持延迟队列的功能,但是可以通过 TTL 和 DLX 模拟出延迟队列的功能。
- 基于DLX(死信交换机)实现延迟队列
实现思路
假如一条消息需要延迟 30 分钟执行,我们就设置这条消息的有效期为 30 分钟,同时为这条消息配置死信交换机和死信 routing_key,并且不为这个消息队列设置消费者,那么 30 分钟后,这条消息由于没有被消费者消费而进入死信队列,此时我们有一个消费者就在“蹲点”这个死信队列,消息一进入死信队列,就立马被消费了。
- 主要流程
创建两个交换机(Exchange)和两个队列(Queue):
原始消息队列:将需要延迟处理的消息发送到原始消息队列。
死信队列:设置一个 TTL(Time-To-Live)过期时间,未在规定时间内处理的消息会成为死信消息,被发送到死信队列。
使用 DLX(死信交换机)关联原始消息队列和死信队列:
在原始消息队列声明时,设置该队列的 x-dead-letter-exchange 参数为死信交换机的名称。
在原始消息队列声明时,设置该队列的 x-dead-letter-routing-key 参数为死信队列的路由键。
消费者监听死信队列进行延迟消息处理:
消费者监听死信队列,一旦有死信消息到达,即可进行相应的延迟消息处理逻辑。
生产者发送延迟消息到原始消息队列:
生产者发送消息到原始消息队列,并设置消息的 TTL(即过期时间)。
如果消息在规定时间内未被消费者消费,则成为死信消息。
- 实战
DLXRabbitConfig
创建两个消息队列:原始消息队列、死信队列 并且 为原始消息队列关联死信交换机
package com.filesystem.rabbit.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; // 基于死信交换机实现延迟队列 @Configuration public class DLXRabbitConfig { // 原始消息队列public static final String MULUO_QUEUE_NAME= "muluo_queue_name"; public static final String MULUO_EXCHANGE_NAME= "muluo_exchange_name"; public static final String MULUO_ROUTING_KEY= "muluo_routing_key"; // 死信消息队列public static final String DLX_QUEUE_NAME= "dlx_queue_name"; public static final String DLX_EXCHANGE_NAME= "dlx_exchange_name"; public static final String DLX_ROUTING_KEY= "dlx_routing_key"; /** * 死信队列 * @return*/@Bean Queue dlxQueue() { return new Queue(DLX_QUEUE_NAME, true, false, false); } /** * 死信交换机 * @return*/@Bean DirectExchange dlxExchange() { return new DirectExchange(DLX_EXCHANGE_NAME, true, false); } /** * 绑定死信队列和死信交换机 * @return*/@Bean Binding dlxBinding() { return BindingBuilder.bind(dlxQueue()).to(dlxExchange()) .with(DLX_ROUTING_KEY); } /** * 声明普通消息队列,并设置对应的死信交换机 * @return*/@Bean Queue MULUOQueue() { Map<String, Object> args = new HashMap<>(); //设置消息过期时间args.put("x-message-ttl", 1000*10); //设置死信交换机args.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME); //设置死信 routing_keyargs.put("x-dead-letter-routing-key", DLX_ROUTING_KEY); return new Queue(MULUO_QUEUE_NAME, true, false, false, args); } /** * 普通交换机 * @return*/@Bean DirectExchange MULUOExchange() { return new DirectExchange(MULUO_EXCHANGE_NAME, true, false); } /** * 绑定普通队列和与之对应的交换机 * @return*/@Bean Binding MULUOBinding() { return BindingBuilder.bind(MULUOQueue()) .to(MULUOExchange()) .with(MULUO_ROUTING_KEY); } }
创建两个消息队列:普通消息队列 / 死信队列(配置队列中的消息过期时间时,默认的时间单位时毫秒。)
每个队列均分别绑定一个交换器(普通交换机通过 routing key绑定该普通消息队列;死信交换机通过 routing key绑定该私死信交换机)
在原始消息队列声明时,设置该队列的 x-dead-letter-exchange 参数为死信交换机的名称。
在原始消息队列声明时,设置该队列的 x-dead-letter-routing-key 参数为死信队列的路由键。
DlxConsumer
为死信队列配置消费者
package com.filesystem.rabbit.Listener; import com.filesystem.rabbit.config.DLXRabbitConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class DlxConsumer { private static final Logger logger= LoggerFactory.getLogger(DlxConsumer.class); @RabbitListener(queues = DLXRabbitConfig.DLX_QUEUE_NAME) public void handle(String msg) { logger.info(msg); } }
测试
// 延迟队列使用示例 @GetMapping("/TestDLXMessage") public String TestDLXMessage() { System.out.println(new Date()); rabbitTemplate.convertAndSend(DLXRabbitConfig.MULUO_EXCHANGE_NAME, DLXRabbitConfig.MULUO_ROUTING_KEY, "hello muluo!"); return "ok"; }
6、基于延时队列和定时任务实现错峰削峰备份,存量增量备份
实现思路
- 上传服务器中未存在的文件时,将fileId推送至消息队列,并计算上传时间与次日凌晨三点的间隔,设置为过期时间,间隔时间可以设置随机数上下浮动几分钟。 同时,向数据库插入一条备份记录数据,并将备份状态设置为0(未备份);
- 编写接收处理,当消息从原始消息队列过期后,会进入死信队列被消费。在备份业务中,获取到fileId,然后将文件复制备份,备份成功,将修改备份表中的备份状态为2(备份成功),备份失败,将备份表中的备份状态修改为1(备份异常);
- 凌晨五点时,扫描备份表中近两天备份的文件备份状态为2的条目,进行重新备份,成功则修改备份状态为2,失败则发起一条异常消息通知管理员。
- 可以为备份表中的创建时间字段设置索引,从而实现备份固定时间段内的文件,增量备份,存量备份的功能。
注:当文件还未完成备份且此文件从数据库中删除,会导致此条消息无法被成功消费,从而一直报错,因此,在执行备份前需要确定文件是否还存在。
部分代码如下:
插入备份表记录
@Slf4j @Service public class FileBackupService extends ServiceImpl<FileBackupMapper, FileBackup> implements IFileBackupService { @Autowired private FileBackupMapper fileBackupMapper; @Override public boolean saveBackupFile(String fileId) { // 创建 QueryWrapperQueryWrapper<FileBackup> queryWrapper = new QueryWrapper<>(); queryWrapper.eq("fileId", fileId); // 查询记录FileBackup backupFile = fileBackupMapper.selectOne(queryWrapper); if (backupFile != null) { // 存在记录记录System.out.println("存在文件备份记录"); log.info("存在文件备份记录: " + backupFile); return false; }else{ FileBackup newBackupFile = new FileBackup(); newBackupFile.setFileId(fileId); newBackupFile.setCreatedAt(new Date()); newBackupFile.setUpdatedAt(new Date()); newBackupFile.setStatus(0); // 插入数据fileBackupMapper.insert(newBackupFile); } return true; } }
文件备份消息队列配置代码
//文件备份相关消息队列 @Configuration public class FileBackupRabbitConfig { // 原始消息队列public static final String FILE_BACKUP_QUEUE_NAME= "file_backup_queue_name"; public static final String FILE_BACKUP_EXCHANGE_NAME= "file_backup_exchange_name"; public static final String FILE_BACKUP_ROUTING_KEY= "file_backup_routing_key"; // 死信消息队列public static final String DLX_FILE_BACKUP_QUEUE_NAME= "dlx_file_backup_queue_name"; public static final String DLX_FILE_BACKUP_EXCHANGE_NAME= "dlx_file_backup_exchange_name"; public static final String DLX_FILE_BACKUP_ROUTING_KEY= "dlx_file_backup_routing_key"; /** * 死信队列 * @return*/@Bean Queue dlxFileBackupQueue() { return new Queue(DLX_FILE_BACKUP_QUEUE_NAME, true, false, false); } /** * 死信交换机 * @return*/@Bean DirectExchange dlxFileBackupExchange() { return new DirectExchange(DLX_FILE_BACKUP_EXCHANGE_NAME, true, false); } /** * 绑定死信队列和死信交换机 * @return*/@Bean Binding dlxFileBackupBinding() { return BindingBuilder.bind(dlxFileBackupQueue()).to(dlxFileBackupExchange()) .with(DLX_FILE_BACKUP_ROUTING_KEY); } /** * 声明普通消息队列,并设置对应的死信交换机 * @return*/@Bean Queue fileBackupQueue() { Map<String, Object> args = new HashMap<>(); //设置死信交换机args.put("x-dead-letter-exchange", DLX_FILE_BACKUP_EXCHANGE_NAME); //设置死信 routing_keyargs.put("x-dead-letter-routing-key", DLX_FILE_BACKUP_ROUTING_KEY); return new Queue(FILE_BACKUP_QUEUE_NAME, true, false, false, args); } /** * 普通交换机 * @return*/@Bean DirectExchange fileBackupExchange() { return new DirectExchange(FILE_BACKUP_EXCHANGE_NAME, true, false); } /** * 绑定普通队列和与之对应的交换机 * @return*/@Bean Binding fileBackupBinding() { return BindingBuilder.bind(fileBackupQueue()) .to(fileBackupExchange()) .with(FILE_BACKUP_ROUTING_KEY); } }
死信队列监听消费类
@Component public class DlxConsumer { @Autowired private FileMapper fileMapper; @Autowired private FileBackupMapper fileBackupMapper; @Autowired MinioBackupService minioBackupService; private static final Logger logger= LoggerFactory.getLogger(DlxConsumer.class); @RabbitListener(queues = DLXRabbitConfig.DLX_QUEUE_NAME) public void handle(String msg) { logger.info(msg); } @RabbitListener(queues = FileBackupRabbitConfig.DLX_FILE_BACKUP_QUEUE_NAME) public void handleFileBackup(String fileId) { // 处理监听文件备份死信队列中的消息,通过fileId查询文件路径并备份。// 使用fileId查询文件相关信息。System.out.println("死信队列监听到消息: " + fileId); // 使用fileId查询文件相关信息。QueryWrapper<FileBean> queryWrapperFB = new QueryWrapper<>(); queryWrapperFB.eq("fileId", fileId); queryWrapperFB.select("fileId", "fileUrl"); // 指定数据库中的列名FileBean fileBean = fileMapper.selectOne(queryWrapperFB); System.out.println("死信队列查询的fileBean: " + fileBean); // 检查fileBean对象是否为nullif (fileBean == null) { // 处理异常return; } // 备份文件到不同桶中的一样的路径minioBackupService.backupFile(fileBean.getFileUrl(),fileBean.getFileUrl()); // 将数据表中的备份状态更新为2已备份,并更新文件路径// 创建 QueryWrapperSystem.out.println("开始更新文件状态"); QueryWrapper<FileBackup> queryWrapper = new QueryWrapper<>(); queryWrapper.eq("fileId", fileId); // 查询记录FileBackup backupFile = fileBackupMapper.selectOne(queryWrapper); if (backupFile == null) { // 没有找到记录return; } // 更新状态backupFile.setStatus(2); backupFile.setUpdatedAt(new Date()); backupFile.setBackupAt(new Date()); backupFile.setFilePath(fileBean.getFileUrl()); // 更新记录fileBackupMapper.update(backupFile, queryWrapper); System.out.println("更新记录结束。"); logger.info(fileId); } }
定时任务代码
@Component public class BackupTask { @Autowired private FileBackupMapper fileBackupMapper; @Autowired RemoteRabbitService remoteRabbitService; // 每天凌晨三点执行任务@Scheduled(cron = "0 0 3 * * ?") // @Scheduled(fixedRate = 60000, initialDelay = 0) // 测试使用,启动项目立即执行,每分钟执行一次public void performBackupTask() { System.out.println("-------开始执行定时任务--------"); // 查询status为0的记录List<FileBackup> backups = fileBackupMapper.selectByStatus(0); // for (FileBackup backup : backups) {// // 在这里使用backup对象// System.out.println("Backup ID: " + backup.getFileId());// }// 提取ID列表List<String> ids = backups.stream() .map(FileBackup::getFileId) .collect(Collectors.toList()); // 将List转换为逗号分隔的字符串String idsParam = ids.stream() .map(String::valueOf) .collect(Collectors.joining(",")); // 打印提取的ID列表// System.out.println(idsParam);// 远程调用filesystem-rabbit服务remoteRabbitService.backupFiles(idsParam); } }
备份执行逻辑
此处简单的将文件从一个桶备份到了另一个桶,实际情况可以根据不同的需求采用不同的备份策略,比如备份到其他服务器的minio,直接备份到本地或其他服务器的磁盘等。
@Service public class MinioBackupServiceImpl implements MinioBackupService { @Resource private MinioClient minioClient; @Resource private MinIOBackupConfig minIOBackupConfig; @Override public void backupFile(String sourceObjectPath, String targetObjectPath) { // 检查目标桶是否存在,如果不存在则创建// boolean found = minioClient.bucketExists(BucketExistsArgs.builder().bucket(targetBucket).build());// if (!found) {// minioClient.makeBucket(MakeBucketArgs.builder().bucket(targetBucket).build());// }// 检查源桶中的文件是否存在boolean objectExists = false; try { minioClient.statObject( StatObjectArgs.builder() .bucket(minIOBackupConfig.getBucketName()) .object(sourceObjectPath) .build() ); objectExists = true; // 如果没有抛出异常,则对象存在} catch (ErrorResponseException e) { if ("NoSuchKey".equals(e.errorResponse().code())) { System.err.println("源文件不存在: " + minIOBackupConfig.getBucketName() + "/" + sourceObjectPath); return; // 结束方法执行,因为源文件不存在} else { throw new RuntimeException(e); } } catch (Exception e) { throw new RuntimeException(e); } // 如果文件存在,执行备份操作if (objectExists) { try { minioClient.copyObject( CopyObjectArgs.builder() .bucket(minIOBackupConfig.getBucketBackupName()) .object(targetObjectPath) .source( CopySource.builder() .bucket(minIOBackupConfig.getBucketName()) .object(sourceObjectPath) .build() ) .build() ); System.out.println("文件已从 " + minIOBackupConfig.getBucketName() + "/" + sourceObjectPath + " 备份到 " + minIOBackupConfig.getBucketBackupName() + "/" + targetObjectPath); } catch (InsufficientDataException | InternalException | InvalidKeyException | InvalidResponseException | IOException | NoSuchAlgorithmException | ServerException | XmlParserException | ErrorResponseException e) { throw new RuntimeException(e); } } } }