领导:谁再用定时任务实现关闭订单,立马滚蛋
lipiwang 2024-12-01 00:44 7 浏览 0 评论
领导:谁再用定时任务实现关闭订单,立马滚蛋!
在电商、支付等领域,往往会有这样的场景,用户下单后放弃支付了,那这笔订单会在指定的时间段后进行关闭操作,细心的你一定发现了像某宝、某东都有这样的逻辑,而且时间很准确,误差在1s内;那他们是怎么实现的呢?
文章转载:乐字节
一般的做法有如下几种
- 定时任务关闭订单
- rocketmq延迟队列
- rabbitmq死信队列
- 时间轮算法
- redis过期监听
一、定时任务关闭订单(最low)
一般情况下,最不推荐的方式就是关单方式就是定时任务方式,原因我们可以看下面的图来说明
我们假设,关单时间为下单后10分钟,定时任务间隔也是10分钟;通过上图我们看出,如果在第1分钟下单,在第20分钟的时候才能被扫描到执行关单操作,这样误差达到10分钟,这在很多场景下是不可接受的,另外需要频繁扫描主订单号造成网络IO和磁盘IO的消耗,对实时交易造成一定的冲击,所以PASS
二、rocketmq延迟队列方式
延迟消息 生产者把消息发送到消息服务器后,并不希望被立即消费,而是等待指定时间后才可以被消费者消费,这类消息通常被称为延迟消息。 在RocketMQ开源版本中,支持延迟消息,但是不支持任意时间精度的延迟消息,只支持特定级别的延迟消息。 消息延迟级别分别为1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,共18个级别。
发送延迟消息(生产者)
/**
* 推送延迟消息
* @param topic
* @param body
* @param producerGroup
* @return boolean
*/
public boolean sendMessage(String topic, String body, String producerGroup)
{
try
{
Message recordMsg = new Message(topic, body.getBytes());
producer.setProducerGroup(producerGroup);
//设置消息延迟级别,我这里设置14,对应就是延时10分钟
// "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
recordMsg.setDelayTimeLevel(14);
// 发送消息到一个Broker
SendResult sendResult = producer.send(recordMsg);
// 通过sendResult返回消息是否成功送达
log.info("发送延迟消息结果:======sendResult:{}", sendResult);
DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
log.info("发送时间:{}", format.format(new Date()));
return true;
}
catch (Exception e)
{
e.printStackTrace();
log.error("延迟消息队列推送消息异常:{},推送内容:{}", e.getMessage(), body);
}
return false;
}
消费延迟消息(消费者)
/**
* 接收延迟消息
*
* @param topic
* @param consumerGroup
* @param messageHandler
*/
public void messageListener(String topic, String consumerGroup, MessageListenerConcurrently messageHandler)
{
ThreadPoolUtil.execute(() ->
{
try
{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
consumer.setConsumerGroup(consumerGroup);
consumer.setVipChannelEnabled(false);
consumer.setNamesrvAddr(address);
//设置消费者拉取消息的策略,*表示消费该topic下的所有消息,也可以指定tag进行消息过滤
consumer.subscribe(topic, "*");
//消费者端启动消息监听,一旦生产者发送消息被监听到,就打印消息,和rabbitmq中的handlerDelivery类似
consumer.registerMessageListener(messageHandler);
consumer.start();
log.info("启动延迟消息队列监听成功:" + topic);
}
catch (MQClientException e)
{
log.error("启动延迟消息队列监听失败:{}", e.getErrorMessage());
System.exit(1);
}
});
}
实现监听类,处理具体逻辑
/**
* 延迟消息监听
*
*/
@Component
public class CourseOrderTimeoutListener implements ApplicationListener<ApplicationReadyEvent>
{
@Resource
private MQUtil mqUtil;
@Resource
private CourseOrderTimeoutHandler courseOrderTimeoutHandler;
@Override
public void onApplicationEvent(ApplicationReadyEvent applicationReadyEvent)
{
// 订单超时监听
mqUtil.messageListener(EnumTopic.ORDER_TIMEOUT, EnumGroup.ORDER_TIMEOUT_GROUP, courseOrderTimeoutHandler);
}
}
/**
* 实现监听
*/
@Slf4j
@Component
public class CourseOrderTimeoutHandler implements MessageListenerConcurrently
{
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg : list)
{
// 得到消息体
String body = new String(msg.getBody());
JSONObject userJson = JSONObject.parseObject(body);
TCourseBuy courseBuyDetails = JSON.toJavaObject(userJson, TCourseBuy.class);
// 处理具体的业务逻辑,,,,,
DateFormat format =new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
log.info("消费时间:{}", format.format(new Date()));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
这种方式相比定时任务好了很多,但是有一个致命的缺点,就是延迟等级只有18种(商业版本支持自定义时间),如果我们想把关闭订单时间设置在15分钟该如何处理呢?显然不够灵活。
三、rabbitmq死信队列的方式
Rabbitmq本身是没有延迟队列的,只能通过Rabbitmq本身队列的特性来实现,想要Rabbitmq实现延迟队列,需要使用Rabbitmq的死信交换机(Exchange)和消息的存活时间TTL(Time To Live)
死信交换机 一个消息在满足如下条件下,会进死信交换机,记住这里是交换机而不是队列,一个交换机可以对应很多队列。
一个消息被Consumer拒收了,并且reject方法的参数里requeue是false。也就是说不会被再次放在队列里,被其他消费者使用。 上面的消息的TTL到了,消息过期了。
队列的长度限制满了。排在前面的消息会被丢弃或者扔到死信路由上。 死信交换机就是普通的交换机,只是因为我们把过期的消息扔进去,所以叫死信交换机,并不是说死信交换机是某种特定的交换机
消息TTL(消息存活时间) 消息的TTL就是消息的存活时间。RabbitMQ可以对队列和消息分别设置TTL。对队列设置就是队列没有消费者连着的保留时间,也可以对每一个单独的消息做单独的设置。超过了这个时间,我们认为这个消息就死了,称之为死信。如果队列设置了,消息也设置了,那么会取值较小的。所以一个消息如果被路由到不同的队列中,这个消息死亡的时间有可能不一样(不同的队列设置)。这里单讲单个消息的TTL,因为它才是实现延迟任务的关键。
byte[] messageBodyBytes = "Hello, world!".getBytes();
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("my-exchange", "queue-key", properties, messageBodyBytes);
可以通过设置消息的expiration字段或者x-message-ttl属性来设置时间,两者是一样的效果。只是expiration字段是字符串参数,所以要写个int类型的字符串:当上面的消息扔到队列中后,过了60秒,如果没有被消费,它就死了。不会被消费者消费到。这个消息后面的,没有“死掉”的消息对顶上来,被消费者消费。死信在队列中并不会被删除和释放,它会被统计到队列的消息数中去
处理流程图
创建交换机(Exchanges)和队列(Queues)
创建死信交换机
如图所示,就是创建一个普通的交换机,这里为了方便区分,把交换机的名字取为:delay
创建自动过期消息队列 这个队列的主要作用是让消息定时过期的,比如我们需要2小时候关闭订单,我们就需要把消息放进这个队列里面,把消息过期时间设置为2小时
创建一个一个名为delay_queue1的自动过期的队列,当然图片上面的参数并不会让消息自动过期,因为我们并没有设置x-message-ttl参数,如果整个队列的消息有消息都是相同的,可以设置,这里为了灵活,所以并没有设置,另外两个参数x-dead-letter-exchange代表消息过期后,消息要进入的交换机,这里配置的是delay,也就是死信交换机,x-dead-letter-routing-key是配置消息过期后,进入死信交换机的routing-key,跟发送消息的routing-key一个道理,根据这个key将消息放入不同的队列
创建消息处理队列 这个队列才是真正处理消息的队列,所有进入这个队列的消息都会被处理
消息队列的名字为delay_queue2 消息队列绑定到交换机 进入交换机详情页面,将创建的2个队列(delayqueue1和delayqueue2)绑定到交换机上面
自动过期消息队列的routing key 设置为delay 绑定delayqueue2
delayqueue2 的key要设置为创建自动过期的队列的x-dead-letter-routing-key参数,这样当消息过期的时候就可以自动把消息放入delay_queue2这个队列中了 绑定后的管理页面如下图:
当然这个绑定也可以使用代码来实现,只是为了直观表现,所以本文使用的管理平台来操作 发送消息
String msg = "hello word";
MessageProperties messageProperties = newMessageProperties();
messageProperties.setExpiration("6000");
messageProperties.setCorrelationId(UUID.randomUUID().toString().getBytes());
Message message = newMessage(msg.getBytes(), messageProperties);
rabbitTemplate.convertAndSend("delay", "delay",message);
设置了让消息6秒后过期 注意:因为要让消息自动过期,所以一定不能设置delay_queue1的监听,不能让这个队列里面的消息被接受到,否则消息一旦被消费,就不存在过期了
接收消息 接收消息配置好delay_queue2的监听就好了
package wang.raye.rabbitmq.demo1;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
publicclassDelayQueue{
/** 消息交换机的名字*/
publicstaticfinalString EXCHANGE = "delay";
/** 队列key1*/
publicstaticfinalString ROUTINGKEY1 = "delay";
/** 队列key2*/
publicstaticfinalString ROUTINGKEY2 = "delay_key";
/**
* 配置链接信息
* @return
*/
@Bean
publicConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = newCachingConnectionFactory("120.76.237.8",5672);
connectionFactory.setUsername("kberp");
connectionFactory.setPassword("kberp");
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true); // 必须要设置
return connectionFactory;
}
/**
* 配置消息交换机
* 针对消费者配置
FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
HeadersExchange :通过添加属性key-value匹配
DirectExchange:按照routingkey分发到指定队列
TopicExchange:多关键字匹配
*/
@Bean
publicDirectExchange defaultExchange() {
returnnewDirectExchange(EXCHANGE, true, false);
}
/**
* 配置消息队列2
* 针对消费者配置
* @return
*/
@Bean
publicQueue queue() {
returnnewQueue("delay_queue2", true); //队列持久
}
/**
* 将消息队列2与交换机绑定
* 针对消费者配置
* @return
*/
@Bean
@Autowired
publicBinding binding() {
returnBindingBuilder.bind(queue()).to(defaultExchange()).with(DelayQueue.ROUTINGKEY2);
}
/**
* 接受消息的监听,这个监听会接受消息队列1的消息
* 针对消费者配置
* @return
*/
@Bean
@Autowired
publicSimpleMessageListenerContainer messageContainer2(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = newSimpleMessageListenerContainer(connectionFactory());
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //设置确认模式手工确认
container.setMessageListener(newChannelAwareMessageListener() {
publicvoid onMessage(Message message, com.rabbitmq.client.Channel channel) throwsException{
byte[] body = message.getBody();
System.out.println("delay_queue2 收到消息 : "+ newString(body));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); //确认消息成功消费
}
});
return container;
}
}
这种方式可以自定义进入死信队列的时间;是不是很完美,但是有的小伙伴的情况是消息中间件就是rocketmq,公司也不可能会用商业版,怎么办?那就进入下一节
四、时间轮算法
(1)创建环形队列,例如可以创建一个包含3600个slot的环形队列(本质是个数组)
(2)任务集合,环上每一个slot是一个Set 同时,启动一个timer,这个timer每隔1s,在上述环形队列中移动一格,有一个Current Index指针来标识正在检测的slot。
Task结构中有两个很重要的属性: (1)Cycle-Num:当Current Index第几圈扫描到这个Slot时,执行任务 (2)订单号,要关闭的订单号(也可以是其他信息,比如:是一个基于某个订单号的任务)
假设当前Current Index指向第0格,例如在3610秒之后,有一个订单需要关闭,只需: (1)计算这个订单应该放在哪一个slot,当我们计算的时候现在指向1,3610秒之后,应该是第10格,所以这个Task应该放在第10个slot的Set中 (2)计算这个Task的Cycle-Num,由于环形队列是3600格(每秒移动一格,正好1小时),这个任务是3610秒后执行,所以应该绕3610/3600=1圈之后再执行,于是Cycle-Num=1
Current Index不停的移动,每秒移动到一个新slot,这个slot中对应的Set,每个Task看Cycle-Num是不是0: (1)如果不是0,说明还需要多移动几圈,将Cycle-Num减1 (2)如果是0,说明马上要执行这个关单Task了,取出订单号执行关单(可以用单独的线程来执行Task),并把这个订单信息从Set中删除即可。 (1)无需再轮询全部订单,效率高 (2)一个订单,任务只执行一次 (3)时效性好,精确到秒(控制timer移动频率可以控制精度)
五、redis过期监听
1.修改redis.windows.conf配置文件中notify-keyspace-events的值 默认配置notify-keyspace-events的值为 "" 修改为 notify-keyspace-events Ex 这样便开启了过期事件
2. 创建配置类RedisListenerConfig(配置RedisMessageListenerContainer这个Bean)
package com.zjt.shop.config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisListenerConfig {
@Autowired
private RedisTemplate redisTemplate;
/**
* @return
*/
@Bean
public RedisTemplate redisTemplateInit() {
// key序列化
redisTemplate.setKeySerializer(new StringRedisSerializer());
//val实例化
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
return redisTemplate;
}
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
}
3.继承KeyExpirationEventMessageListener创建redis过期事件的监听类
package com.zjt.shop.common.util;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.UpdateWrapper;
import com.zjt.shop.modules.order.service.OrderInfoService;
import com.zjt.shop.modules.product.entity.OrderInfoEntity;
import com.zjt.shop.modules.product.mapper.OrderInfoMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Autowired
private OrderInfoMapper orderInfoMapper;
/**
* 针对redis数据失效事件,进行数据处理
* @param message
* @param pattern
*/
@Override
public void onMessage(Message message, byte[] pattern) {
try {
String key = message.toString();
//从失效key中筛选代表订单失效的key
if (key != null && key.startsWith("order_")) {
//截取订单号,查询订单,如果是未支付状态则为-取消订单
String orderNo = key.substring(6);
QueryWrapper<OrderInfoEntity> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("order_no",orderNo);
OrderInfoEntity orderInfo = orderInfoMapper.selectOne(queryWrapper);
if (orderInfo != null) {
if (orderInfo.getOrderState() == 0) { //待支付
orderInfo.setOrderState(4); //已取消
orderInfoMapper.updateById(orderInfo);
log.info("订单号为【" + orderNo + "】超时未支付-自动修改为已取消状态");
}
}
}
} catch (Exception e) {
e.printStackTrace();
log.error("【修改支付订单过期状态异常】:" + e.getMessage());
}
}
}
4:测试 通过redis客户端存一个有效时间为3s的订单:
结果:
总结: 以上方法只是个人对于关单的一些想法,可能有些地方有疏漏,请在公众号直接留言进行指出,当然如果你有更好的关单方式也可以随时沟通交流
相关推荐
- 那些加上姓就很美的名字,想要起名的可以参照一些啊
-
刷到是缘分,感谢大家的阅读,希望您能动动小手帮忙点赞,关注,评论。你的支持和鼓励是我前进的动力。在此祝福大家天天快乐,日日开心!0102我们公司新来的实习生叫林晚,第一次听到这名字就觉得特别有韵味。上...
- 手把手教你写出不被研发怼的需求文档
-
产品经理这岗位都多少年了,我还以为早就体系成熟、流程闭环了,所以这个系列我也安心停更了。结果最近被研发同事轮番吐槽:需求文档东漏一句西漏一段,“这也叫专业?”虽然不是在点名骂我,但谁让是我带的队呢,脸...
- 学会这五个练字规律,解决所有笔画长短问题
-
大家好,用5个规律讲清楚什么时候笔画长,什么时候笔画短。·第一、写的时候是短的笔画多,长的笔画少,记住这一个。·第二、同方向上面只能有一个长,不同方向上面可以同时加长,比如横和竖或者撇捺。这里就有一个...
- 不要再用“质疑”的眼光看草书,5个字说明草书促进汉字简化发展
-
【问题思考】我们知道简体字改革之前,我们用的是繁体字,比如“会”字一直写作“會”,那么是什么依据如是作出简化呢?而在简化字改革前的唐代,我们会发现,孙过庭的《书谱》就是写作“会”的样子的,他是穿越了?...
- IDEA如何将工程转为maven工程
-
有时候在使用IDEA编辑器时,从Git上Pull一个maven工程下来是没有自动将工程转换成maven工程,相应的依赖jar包不会自动下载。此时要将它转换成maven工程并自动下载jar包。方法/步骤...
- Maven多模块项目构建实战:打造高效开发的模块化体系
-
Maven多模块项目构建实战:打造高效开发的模块化体系Maven作为Java世界中最为流行的构建工具之一,以其强大的模块化支持能力著称。当你需要管理一个复杂的大规模项目时,合理使用Maven的多模块功...
- Maven常用命令有哪些?
-
Maven是一个强大的项目管理工具,广泛用于构建、管理和部署Java项目。以下是Maven的一些常用命令,这些命令可以帮助开发者完成项目的清理、编译、测试、打包和部署等任务。所有命令都以m...
- SpringBoot项目jar、war包启动解析
-
一、jar包和war包的区别1.1war包war包是JavaWeb应用程序的一种打包方式符合Servlet标准,它是WebArchive的缩写,主要用于存储Web应用程序相关的文件,包括Java...
- Maven多模块项目构建:打造高效协作的软件工程基石
-
Maven多模块项目构建:打造高效协作的软件工程基石在软件开发的世界里,随着项目的复杂度不断攀升,单一模块的架构逐渐显得力不从心。这时,Maven多模块项目应运而生,它像一座桥梁,将不同的功能模块有机...
- 如何将 Spring Boot 工程打包成独立的可执行 JAR 包
-
导语:通过将SpringBoot项目打包成独立的可执行JAR包,可以方便地在任何支持Java环境的机器上运行项目。本文将详细介绍如何通过Maven构建插件将SpringBoot...
- java maven 工具初步使用
-
安装与配置下载Maven访问官网https://maven.apache.org下载最新版,解压到本地目录(如C:\maven)。配置环境变量添加以下环境变量:M2_HOME:Maven安...
- Maven工程如何使用非Maven仓库jar包
-
使用Maven之前,一直都是自己手工在网上搜索需要的jar包,然后添加到工程中。以这样的方式开发,工作了好多年,曾经以为以后也会一直这样下去。直到碰上Maven,用了第一次,就抛弃老方法了。Maven...
- 从原理和源码梳理Springboot FatJar 的机制
-
一、概述SpringBootFatJar的设计,打破了标准jar的结构,在jar包内携带了其所依赖的jar包,通过jar中的main方法创建自己的类加载器,来识别加载运行其不规...
- Maven初步——Maven的下载、配环境、换源、编译及运行
-
一.maven的基本概念相关定义:Maven是一个项目管理和构建工具,用于构建、发布和管理Java项目,用它的好处很多比如自动帮我们管理包依赖等。1.Maven的四大特性1.Maven引入了一个...
- 「曹工杂谈」Maven和Tomcat能有啥联系呢,都穿打补丁的衣服吗
-
前奏我们上篇文章,跟大家说了下,怎么调试maven插件的代码,注意,是插件的代码。插件,是要让主框架来执行的,主框架是谁呢,就是mavencore,可以称之为maven核心吧。maven核心,类似于...
你 发表评论:
欢迎- 一周热门
- 最近发表
- 标签列表
-
- maven镜像 (69)
- undefined reference to (60)
- zip格式 (63)
- oracle over (62)
- date_format函数用法 (67)
- 在线代理服务器 (60)
- shell 字符串比较 (74)
- x509证书 (61)
- localhost (65)
- java.awt.headless (66)
- syn_sent (64)
- settings.xml (59)
- 弹出窗口 (56)
- applicationcontextaware (72)
- my.cnf (73)
- httpsession (62)
- pkcs7 (62)
- session cookie (63)
- java 生成uuid (58)
- could not initialize class (58)
- beanpropertyrowmapper (58)
- word空格下划线不显示 (73)
- jar文件 (60)
- jsp内置对象 (58)
- makefile编写规则 (58)