一、功能介绍
要实现一个消息的定时发送功能,也就是让消息可以在某一天某一个时间具体节点进行发送。而我们公司的业务场景是类似短信的业务,而且数量不小,用户会进行号码、消息内容、定时发送时间等信息的提交。等到了设定的定时时间,则进行消息的发送工作。
二、思考实现逻辑
前提准备:
MySQL
RocketMQ,最好broker开启队列自动创建的配置
刚开始我想的是基于MySQL去实现定时发送,后来觉得这种扫描方式单线程的时候并发能力不够,多线程也得需要做并发控制,还得做一些任务的调度,比如线程执行一半卡死或者异常的时候,还得做任务的补偿工作。所以,后面我选择了通过组件的方式实现,这是我想到了消息队列的延迟发送功能,刚好我司系统用到了RocketMQ,我就想通过该组件实现该消息的定时发送功能。RocketMQ的文件默认只保存72小时,这个要注意一下,有需要调整时间的需要调整一下。我也不建议调整的太大,这样容易引起系统的资源浪费。
具体实现逻辑如下:
1)用户创建消息,记录到数据库(号码,内容,定时发送时间)
2)如果是是可以立即发送的消息,发送消息到RocketMQ的立即发送主题(TOPIC_NOW_SEND)中,后续有线程消费该消息,直接进行数据发送。
3)如果是需要定时发送的消息,发送消息到RocketMQ的延迟发送主题(TOPIC_DELAY_SEND)中。
这里有一些细节设置:
tags:可以用于同一主题下区分不同类型的消息。此处可以设定为当前日期(我是用的是YYYYMMDD,例如:),因为设定为需要发送的当天日期后,程序就可以通过该字段分辨是否需要处理。后续消费者就可以通过当天日期筛选需要当天发送的消息,过滤掉不是当天需要发送的消息。
keys:索引,可以用于查询消息。我们设定的格式可以是,例如:FS123456+01
4)启动两个(或者多个,这个和文件保存的时间有关系)消费者进行轮换,这里我称之为masterConsumer(启动时负责消费前一天的数据)和slaveConsumer(启动时负责消费当天的数据)。这里我设计的初衷是每天有自己的消费者,例如消费tags=的数据,消费tags=的数据。这里需要消费前一天的数据原因是,需要防止出现晚上23:59分提交消息的时候,当天没处理完。所以,需要在第二天进行额外的补偿操作。
程序启动第1天(例如当天是):
masterConsumer:MASTER_CONSUMER_TOPIC_DELAY_SEND
消费tags=
slaveConsumer:SLAVE_CONSUMER_TOPIC_DELAY_SEND
消费tags=
程序启动第2天(例如当天是):
slaveConsumer:SLAVE_CONSUMER_TOPIC_DELAY_SEND
消费tags=
masterConsumer:MASTER_CONSUMER_TOPIC_DELAY_SEND
消费tags=
程序启动第3天(例如当天是):
masterConsumer:MASTER_CONSUMER_TOPIC_DELAY_SEND
消费tags=
slaveConsumer:SLAVE_CONSUMER_TOPIC_DELAY_SEND
消费tags=
程序启动第4天(例如当天是):
slaveConsumer:SLAVE_CONSUMER_TOPIC_DELAY_SEND
消费tags=
masterConsumer:MASTER_CONSUMER_TOPIC_DELAY_SEND
消费tags=
......
程序的masterConsumer和slaveConsumer就这样持续轮换消费数据。
5)消费者的消费数据逻辑:
消费者需要设置consumeFromWhere参数=CONSUME_FROM_FIRST_OFFSET(默认是ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET,从最新的位置开始消费),这样数据才可以第一次启动从队列的最前位置开始消费。
消费到数据之后,获取keys的内容,根据之前发送的设定获取消息定时发送的时间戳,然后和当前的时间进行比对。这里要比对的主要原因是当前RocketMQ不支持任意时间的延迟。生产者发送延迟消息前需要设置几个固定的延迟级别,分别对应1s到2h的1到18个延迟级,消息消费失败会进入延迟消息队列,消息发送时间与设置的延迟级别和重试次数有关。
当前支持的消息延迟级别有:
privateStringmessageDelayLevel="1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2h";
如果比对时间=1秒,则直接发送到RocketMQ的立即发送主题(TOPIC_NOW_SEND)中,后续有线程进行消费发送;否则,发送时间根据RocketMQhi吃的延迟级别进行选择,按照最大可支持的时间为准,等待后续的消息重新消费,直到最终消息符合条件,可以投递到RocketMQ的立即发送主题(TOPIC_NOW_SEND)为止。例如:如果比对时间2h,则重新投递到延迟发送主题(TOPIC_DELAY_SEND)中,延迟发送的等级为18(也就是2h)。如果比对时间2m,比对时间3m,则重新投递到延迟发送主题(TOPIC_DELAY_SEND)中,延迟发送的等级为6(也就是2m)。
6)在第4步骤中的masterConsumer和slaveConsumer的轮换逻辑支持,即在过了0点之后,前一天的consumer需要进行重置。
以上就是我的实现逻辑。
三、敲代码
1、定义延迟级别与时间的对应关系
packagecn.lxw.mq.constant;importjava.util.Arrays;publicenumMessageDelayLevel{SECOND_1(1,1*L),SECOND_5(2,5*L),SECOND_10(3,10*L),SECOND_30(4,30*L),MINUTE_1(5,1*60*L),MINUTE_2(6,2*60*L),MINUTE_3(7,3*60*L),MINUTE_4(8,4*60*L),MINUTE_5(9,5*60*L),MINUTE_6(10,6*60*L),MINUTE_7(11,7*60*L),MINUTE_8(12,8*60*L),MINUTE_9(13,9*60*L),MINUTE_10(14,10*60*L),MINUTE_20(15,20*60*L),MINUTE_30(16,30*60*L),HOUR_1(17,1*60*60*L),HOUR_2(18,2*60*60*L),;//1s5s10s30s1m2m3m4m5m6m7m8m9m10m20m30m1h2hprivateIntegerlevel;privateLongmills;MessageDelayLevel(Integerlevel,Longmills){this.level=level;this.mills=mills;}publicIntegergetLevel(){returnlevel;}publicvoidsetLevel(Integerlevel){this.level=level;}publicLonggetMills(){returnmills;}publicvoidsetMills(Longmills){this.mills=mills;}/***功能描述:br*〈计算需等待的时间,最长2小时〉*
Param:[timeMills]*Return:{linkMessageDelayLevel}*Author:luoxw*Date:/7/:33*/publicstaticMessageDelayLevelgetMaxLevel(longtimeMills){longmillsBwt=timeMills-System.currentTimeMillis();if(millsBwtL){millsBwt=L;}finallongparamMills=millsBwt;returnArrays.asList(MessageDelayLevel.values()).stream().filter(p-p.mills.packagecn.lxw.task;importcn.hutool.core.thread.ThreadUtil;import
publicSend_DelayMessageTask(){
try{StringrocketMQAddr=AppEnvContext.getPropValue("rocketmq.address");defaultSendProducer=ProducerUtil.init(rocketMQAddr,"PRODUCER_TOPIC_NOW_SEND");defaultDelayProducer=ProducerUtil.init(rocketMQAddr,"PRODUCER_TOPIC_DELAY_SEND");//主从消费者交替进行数据消费。//初始化的时候,主消费者消费昨天(1月1日)的数据,从消费者消费今天(1月2日)的数据。intconsumeThreadMin=AppEnvContext.getPropValueIntOrDefault("delay.consumer.consumeThreadMin",10);intconsumeThreadMax=AppEnvContext.getPropValueIntOrDefault("delay.consumer.consumeThreadMax",10);intconsumeMessageBatchMaxSize=AppEnvContext.getPropValueIntOrDefault("delay.consumer.consumeMessageBatchMaxSize",10);masterDelayConsumer=Consumer.getInstance("TOPIC_DELAY_SEND","MASTER_CONSUMER_TOPIC_DELAY_SEND",rocketMQAddr,lastDate,consumeThreadMin,consumeThreadMax,consumeMessageBatchMaxSize);slaveDelayConsumer=Consumer.getInstance("TOPIC_DELAY_SEND","SLAVE_CONSUMER_TOPIC_DELAY_SEND",rocketMQAddr,currentDate,consumeThreadMin,consumeThreadMax,consumeMessageBatchMaxSize);}catch(Exceptione){log.error(LogConst.PREFIX+"初始化异常",e);}}//消费者重启privatevoidrestartConsumer(DefaultMQPushConsumerconsumer,StringconsumerName)throwsException{StringrocketMQAddr=AppEnvContext.getPropValue("rocketmq.address");consumer.shutdown();intconsumeThreadMin=AppEnvContext.getPropValueIntOrDefault("delay.consumer.consumeThreadMin",10);intconsumeThreadMax=AppEnvContext.getPropValueIntOrDefault("delay.consumer.consumeThreadMax",10);intconsumeMessageBatchMaxSize=AppEnvContext.getPropValueIntOrDefault("delay.consumer.consumeMessageBatchMaxSize",10);consumer=Consumer.getInstance("TOPIC_DELAY_SEND",consumerName,rocketMQAddr,currentDate,consumeThreadMin,consumeThreadMax,consumeMessageBatchMaxSize);MessageListenerConcurrentlylistener=getDelayListener();consumer.registerMessageListener(listener);consumer.start();closeCnt.incrementAndGet();}//重新初始化消费者privatesynchronizedvoidreInitComsumer(){try{StringthisDate=DateUtil.operateDate(newDate(),0);intlastCloseCnt=closeCnt.get();//初始化的时候不执行and时间没跨度的时候不执行,只有当第二天的时候才执行数据if(lastCloseCnt0!thisDate.equals(currentDate)){//获取今天的时间currentDate=thisDate;//如果时间对2取余的值为1,则更新主消费者,之前的昨天(1月1日)更新为第二天(1月3日)的数据if(closeCnt.get()%2==1){restartConsumer(masterDelayConsumer,"MASTER_CONSUMER_TOPIC_DELAY_SEND");}//如果时间对2取余的值为0,则更新从消费者,之前的今天(1月2日)更新为第三天(1月4日)的数据if(closeCnt.get()%2==0){restartConsumer(slaveDelayConsumer,"SLAVE_CONSUMER_TOPIC_DELAY_SEND");}}}catch(Exceptione){log.error(LogConst.PREFIX+"消费者重新初始化异常",e);}}//消费逻辑privateMessageListenerConcurrentlygetDelayListener(){MessageListenerConcurrentlylistener=(ListMessageExtlist,ConsumeConcurrentlyContextcontext)-{for(MessageExtmsg:list){StringmsgBody=null;try{msgBody=newString(msg.getBody());StringmsgKeys=msg.getKeys();//订单号+发送时间戳String[]split=msgKeys.split("\\+");if(split.length==2){StringdelaySendTimeStr=split[1];longtime=DateUtil.parseStr2TimeMills(delaySendTimeStr);//时间比对,获取最大支持的延迟级别MessageDelayLevelmaxLevel=MessageDelayLevel.getMaxLevel(time);Integerlevel=maxLevel.getLevel();//如果延迟级别=1,也就是小于等于1秒的时候,直接发送到立即发送主题(TOPIC_NOW_SEND)if(MessageDelayLevel.SECOND_1.getLevel().equals(level)){SendResultsendResult=ProducerUtil.syncSend(defaultSendProducer,"TOPIC_NOW_SEND",msg.getTags(),msg.getKeys(),msgBody);log.info(LogConst.PREFIX+"定时消息[{}]发送到[立即发送主题]结果:{}",msgBody,sendResult);}else{SendResultsendResult=ProducerUtil.syncSendDelay(defaultDelayProducer,"TOPIC_DELAY_SEND",msg.getTags(),msg.getKeys(),level,msgBody);log.info(LogConst.PREFIX+"定时消息[{}]发送到[延迟发送主题]结果:{}",msgBody,sendResult);}}}catch(Exceptionex){log.error(LogConst.PREFIX+"定时消息[{}]发送异常",msgBody,ex);}}returnConsumeConcurrentlyStatus.CONSUME_SUCCESS;};returnlistener;}
3、其余的逻辑就不展示了,可以根据自己实际业务情况进行处理。
四、总结
这是我实现的一个方式,其实是不太完美的,因为量大的时候性能也是有问题的,因为根据RocketMQ文件保存的时间(默认72小时)因素决定。如果保存的时间太长,消息太多,而我们的程序第一次启动从队列的最前位置开始消费,此时的消息数据是巨大的。不管是对RocketMQ组件,还是对我们程序都是一个压力,而且目前的消息过滤功能只能在客户端进行处理。所以,这里我建议的是一天一个主题,以定时发送的当天进行发送。例如:需要发送的数据用TOPIC_DELAY_SEND_主题;需要发送的数据用TOPIC_DELAY_SEND_主题。这样的话,数据消费的时候也不需要通过tags进行过滤了,只需要通过keys的时间戳数据进行判断,消费逻辑大致和文中写法一样,只需要调整延迟发送的主题名为,发送和轮换的延迟发送主题也需要根据发送时间进行调整。
以上思路仅供各位参考使用,有更好的实现方式可以在下方进行留言。谢谢大家的观看!衷心感谢!
来源: