rocketmq实现延迟队列精确到秒级实现(总结编)
前言篇:
为了节约成本,决定通过自研来改造rocketmq,添加任意时间延迟的延时队列,开源版本的rocketmq只有支持18个等级的延迟时间,
其实对于大部分的功能是够用了的,但是以前的项目,全部都是使用了阿里云的rocketmq,原因是不同的供应商的订单的延时时间是不同的
(部分供应商的订单未支付30分钟取消,有些1个半小时取消,各种时间都有),
所以使用了大量的延时队列,但是开源版本不支持任意时间延时(希望官方支持这个功能)
为了实现这个功能,网上查询了不少资料,查询到不少相关的文章,主要实现都是基于时间轮来实现的,
但是比较少开源的代码实现(也许大家都没有这个需求吧)
debug实践篇:
1. 撸起袖子加油干,首先,下载源代码 https://域名/apache/域名,导入ide
运行mvn package 生成jar包,如果成功的话,会生成到distribution目录下面
2. 查看文档,发现要运行namesvr 和 broker
找到 src\main\java\org\apache\rocketmq\namesrv\域名 ,开心的执行main方法,
哦哦哦哦哦,果然报错了,提示 域名 目录不存在,查看源码, 原来是从域名eties读取的,
为了调试,我毫不犹豫的加上了配置文件,
再次运行,不报错了,控制台显示,成功啦(生活是多么美好,空气是多么清晰!)
3.运行 broker ,打开 src\main\java\org\apache\rocketmq\broker\域名,执行main方法,
添加 配置文件 (D:\\mq\\rocketmq-rocketmq-all-4.9.2是我本地的路径,你要修改成自己的)
1 域名roperty("域名", "D:\\mq\\rocketmq-rocketmq-all-4.9.2\\rb"); 2 域名roperty("域名", "D:\\mq\\rocketmq-rocketmq-all-4.9.2\\rb\\home\\");
运行一下,成功了,开心的发一条消息,试试,哦哦哦哦哦。发不出去哦(人生最痛苦的事情是,快要成功了,却没有成功)。
原来还要配置namesvr地址,在启动命令,添加 -n localhost:9876 ( 上面的namesvr 启动的ip和端口)
4.漫长的改造之路 (我们是勇敢的斯巴达勇士,一直勇往直前)
用了阿里云的延时队列,发现它的message 可以传一个时间过来(任意的延时时间)
来来来,我们复制一下(不要告诉别人,我们一直是复制,粘贴的,没有原创, 嘘 ......)
1 /** 2 * 该类预定义一些系统键. 3 */ 4 static public class SystemPropKey { 5 public static final String TAG = "__TAG"; 6 public static final String KEY = "__KEY"; 7 public static final String MSGID = "__MSGID"; 8 public static final String SHARDINGKEY = "__SHARDINGKEY"; 9 public static final String RECONSUMETIMES = "__RECONSUMETIMES"; 10 public static final String BORNTIMESTAMP = "__BORNTIMESTAMP"; 11 public static final String BORNHOST = "__BORNHOST"; 12 /** 13 * 设置消息的定时投递时间(绝对时间). <p>例1: 延迟投递, 延迟3s投递, 设置为: 域名entTimeMillis() + 3000; <p>例2: 定时投递, 14 * 2016-02-01 11:30:00投递, 设置为: new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-02-01 15 * 11:30:00").getTime() 16 */ 17 public static final String STARTDELIVERTIME = "__STARTDELIVERTIME"; 18 }
/** * <p> 设置消息的定时投递时间(绝对时间),最大延迟时间为7天. </p> <ol> <li>延迟投递: 延迟3s投递, 设置为: 域名entTimeMillis() + 3000;</li> * <li>定时投递: 2016-02-01 11:30:00投递, 设置为: new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-02-01 * 11:30:00").getTime()</li> </ol> */ public void setStartDeliverTime(final long value) { putSystemProperties(域名TDELIVERTIME, 域名eOf(value)); }
5.既然要改造rocketmq,在哪里改呢,debug,debug,debug(一直到天荒地老),功夫不负有心人,找到啦,
找到 \src\main\java\org\apache\rocketmq\broker\processor\域名, 发现
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { SendMessageContext mqtraceContext; switch (域名ode()) { case 域名UMER_SEND_MSG_BACK: return 域名umerSendMsgBack(ctx, request); default: SendMessageRequestHeader requestHeader = parseRequestHeader(request); if (requestHeader == null) { return null; } mqtraceContext = buildMsgContext(ctx, requestHeader); 域名uteSendMessageHookBefore(ctx, request, mqtraceContext); RemotingCommand response; if (域名tch()) { response = 域名BatchMessage(ctx, request, mqtraceContext, requestHeader); } else { response = 域名Message(ctx, request, mqtraceContext, requestHeader); } 域名uteSendMessageHookAfter(response, mqtraceContext); return response; } }
继续debug,发现 sendMessage 就是处理发送消息的,
如果我们在这里判断是否延时消息就写入文件,然后返回成功到客户端,等到了时间就发送延迟消息,不就搞定了吗?
oh,yes,就是这么干的
//处理延迟消息 delay message String startTime = 域名roperty(域名TDELIVERTIME); boolean isDelayMsg = false; long nextStartTime = 0; if (startTime != null && 域名elayTimeLevel() <= 0) { nextStartTime = 域名eLong(startTime); if (nextStartTime >= 域名entTimeMillis()) { isDelayMsg = true; } } if (isDelayMsg) { return 域名lePutMessageResultFuture(response, request, msgInner, ctx, queueIdInt, nextStartTime); } else { if (traFlag != null && 域名eBoolean(traFlag)) { if (域名rokerConfig().isRejectTransactionMessage()) { 域名ode(域名ERMISSION); 域名emark( "the broker[" + 域名rokerConfig().getBrokerIP1() + "] sending transaction message is forbidden"); return response; } putMessageResult = 域名ransactionalMessageService().prepareMessage(msgInner); } else { putMessageResult = 域名essageStore().putMessage(msgInner); } return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt); } }
其中 域名lePutMessageResultFuture 是我们用来处理延迟消息的地方
我们按照每个时间一个文件夹来保存延时消息,等延时消息到达后,定时的写入延时队列里面。
详细原理,请查考 rocketmq 原理实现篇 https://域名/tomj2ee/p/域名
package 域名域名y; import 域名域名nelHandlerContext; import 域名域名.DateFormatUtils; import 域名域名erController; import 域名域名域名onseCode; import 域名域名域名MessageResponseHeader; import 域名域名rnalLogger; import 域名域名rnalLoggerFactory; import 域名域名域名tingCommand; import 域名域名ageExtBrokerInner; import 域名.*; import 域名SocketAddress; import 域名etAddress; import 域名; import 域名域名utorService; import 域名域名utors; import 域名域名adLocalRandom; public class DelayProcessor implements Runnable { protected static final InternalLogger log = 域名ogger(域名anonicalName()); protected final BrokerController brokerController; protected final SocketAddress storeHost; private ExecutorService jobTaskExecute = 域名ixedThreadPool(16); public DelayProcessor(final BrokerController brokerController) { 域名erController = brokerController; 域名eHost = new InetSocketAddress(域名rokerConfig().getBrokerIP1(), brokerController .getNettyServerConfig().getListenPort()); Thread thread = new Thread(this); 域名ame("delayProcessor-run---thread"); 域名aemon(true); new File(getDelayPath()).mkdirs(); 域名t(); Thread missCallThread = new Thread(() -> { try { for(;;) { 域名p(10 * 1000); sendMissCallMsg(); } } catch (InterruptedException e) { 域名tStackTrace(); } }); 域名ame("delayProcessor-callback-thread"); 域名t(); 域名tln("init delay success " +getDelayPath()); } public RemotingCommand handlePutMessageResultFuture(RemotingCommand response, RemotingCommand request, MessageExtBrokerInner msgInner, ChannelHandlerContext ctx, int queueIdInt, long nextStartTime) { return handlePutMessageResult(response, request, msgInner, ctx, queueIdInt, nextStartTime); } private RemotingCommand handlePutMessageResult(RemotingCommand response, RemotingCommand request, MessageExtBrokerInner msg, ChannelHandlerContext ctx, int queueIdInt, long nextStartTime) { boolean svOk = saveMsgFile(nextStartTime, msg); SendMessageResponseHeader sendMessageResponseHeader = new SendMessageResponseHeader(); 域名ueueId(1); 域名sgId("0"); 域名ueueOffset(0l); 域名ransactionId(""); RemotingCommand newCommand = 域名teRequestCommand(域名ESS, sendMessageResponseHeader); if (svOk) { 域名ode(域名ESS); } else { 域名ode(域名EM_ERROR); 域名emark("发送消息延迟失败!"); } 域名xtFields(域名xtFields()); 域名ersion(域名ersion()); 域名paque(域名paque()); 域名anguage(域名anguage()); 域名ody(域名ody()); if (!域名ewayRPC()) { try { 域名eAndFlush(newCommand); } catch (Throwable e) { 域名r("DelayProcessor process request over, but response failed", e); 域名r(域名ring()); 域名r(域名ring()); } } return newCommand; } public void putMessage(MessageExtBrokerInner msgInner) { 域名essageStore().putMessage(msgInner); } @Override public void run() { for (; ; ) { long curTime = 域名entTimeMillis() / 1000; 域名it(() -> sendMsg(curTime)); try { 域名p(1000); } catch (InterruptedException e) { } } } private String getDelayPath() { String delayPath = "./delay-store"+ 域名rator + "delay"; return delayPath; } private boolean saveMsgFile(long startTime, MessageExtBrokerInner msgInner) { ObjectOutputStream objectOutputStream = null; try { String msgId =(startTime/1000 )+"-"+ 域名entTimeMillis() + "-" + 域名ent().nextInt(99999999); 域名tln( getCurrentTime()+"写入延迟消息 >>" + msgId); String parentDir = getDelayPath() + 域名rator + startTime / 1000; File parentFile = new File(parentDir); if (!域名ts()) { 域名rs(); } String fileName = parentDir + 域名rator + msgId; FileOutputStream fos = new FileOutputStream(fileName); BufferedOutputStream bos = new BufferedOutputStream(fos); objectOutputStream = new ObjectOutputStream(bos); 域名eObject(msgInner); return true; } catch (Exception ex) { 域名r("saveMsgFile ex:", ex); return false; } finally { try { if (objectOutputStream != null) { 域名e(); } } catch (Exception ex) { 域名r("saveMsgFile ex:", ex); } } } private MessageExtBrokerInner readFile(File f) { ObjectInputStream ois = null; try { ois = new ObjectInputStream(new FileInputStream(f)); return (MessageExtBrokerInner) 域名Object(); } catch (Exception ex) { return null; } finally { if (ois != null) { try { 域名e(); } catch (IOException e) { 域名tStackTrace(); } } } } private void sendMissCallMsg() { File lst = new File(getDelayPath()); File[] files = 域名Files(); long startTime = 域名entTimeMillis() / 1000 - 10 * 1000; for (File f : files) { String name = 域名ame(); if (域名rectory() && !域名ls(".") && !域名ls("..")) { try { Long fileTime = 域名eLong(name); if (fileTime <= startTime) { sendMsg(fileTime); } } catch (Exception ex) { } } } } private String getCurrentTime(){ return 域名entThread().getName()+ ">>["+域名at(new Date(), "yyyy-MM-dd HH:mm:ss")+"] "; } private void sendMsg(long startTime) { File lst = new File(getDelayPath() + 域名rator + startTime); File[] files = 域名Files(); if (files != null) { for (File f : files) { 域名tln( getCurrentTime()+"时间到发送>> "+ startTime+" to commitLog " + 域名ame()); MessageExtBrokerInner msgInner = readFile(f); if (msgInner != null) { putMessage(msgInner); 域名tln( getCurrentTime()+"写入log >> "+ startTime+" to commitLog " + 域名ame()+" success"); 域名te(); } } 域名te(); } } }
总结:rocketmq延迟队列实现主要是通过时间轮和文件来保存延时消息,等到了时间后,再写入延时队列,来达到延时的目的。
总共有4种方式来实现延时队列,可以参考延时队列的实现原理篇
https://域名/tomj2ee/p/域名