飙血推荐
  • HTML教程
  • MySQL教程
  • JavaScript基础教程
  • php入门教程
  • JavaScript正则表达式运用
  • Excel函数教程
  • UEditor使用文档
  • AngularJS教程
  • ThinkPHP5.0教程

rocketmq实现延迟队列精确到秒级实现(总结编)

时间:2022-01-18  作者:tomj2ee  

    前言篇:

  为了节约成本,决定通过自研来改造rocketmq,添加任意时间延迟的延时队列,开源版本的rocketmq只有支持18个等级的延迟时间,

其实对于大部分的功能是够用了的,但是以前的项目,全部都是使用了阿里云的rocketmq,原因是不同的供应商的订单的延时时间是不同的

(部分供应商的订单未支付30分钟取消,有些1个半小时取消,各种时间都有),

所以使用了大量的延时队列,但是开源版本不支持任意时间延时(希望官方支持这个功能)

   为了实现这个功能,网上查询了不少资料,查询到不少相关的文章,主要实现都是基于时间轮来实现的,

但是比较少开源的代码实现(也许大家都没有这个需求吧)

debug实践篇:

1. 撸起袖子加油干,首先,下载源代码 https://域名/apache/域名,导入ide

运行mvn  package 生成jar包,如果成功的话,会生成到distribution目录下面

2. 查看文档,发现要运行namesvr 和 broker 

找到 src\main\java\org\apache\rocketmq\namesrv\域名 ,开心的执行main方法,

哦哦哦哦哦,果然报错了,提示 域名 目录不存在,查看源码, 原来是从域名eties读取的,

为了调试,我毫不犹豫的加上了配置文件,

再次运行,不报错了,控制台显示,成功啦(生活是多么美好,空气是多么清晰!)

3.运行 broker ,打开 src\main\java\org\apache\rocketmq\broker\域名,执行main方法,

添加 配置文件 (D:\\mq\\rocketmq-rocketmq-all-4.9.2是我本地的路径,你要修改成自己的)

1 域名roperty("域名", "D:\\mq\\rocketmq-rocketmq-all-4.9.2\\rb");
2 域名roperty("域名", "D:\\mq\\rocketmq-rocketmq-all-4.9.2\\rb\\home\\");


运行一下,成功了,开心的发一条消息,试试,哦哦哦哦哦。发不出去哦(人生最痛苦的事情是,快要成功了,却没有成功)。

原来还要配置namesvr地址,在启动命令,添加 -n   localhost:9876   ( 上面的namesvr 启动的ip和端口)

4.漫长的改造之路 (我们是勇敢的斯巴达勇士,一直勇往直前)

用了阿里云的延时队列,发现它的message 可以传一个时间过来(任意的延时时间)

来来来,我们复制一下(不要告诉别人,我们一直是复制,粘贴的,没有原创, 嘘 ......)

 1     /**
 2      * 该类预定义一些系统键.
 3      */
 4     static public class SystemPropKey {
 5         public static final String TAG = "__TAG";
 6         public static final String KEY = "__KEY";
 7         public static final String MSGID = "__MSGID";
 8         public static final String SHARDINGKEY = "__SHARDINGKEY";
 9         public static final String RECONSUMETIMES = "__RECONSUMETIMES";
10         public static final String BORNTIMESTAMP = "__BORNTIMESTAMP";
11         public static final String BORNHOST = "__BORNHOST";
12         /**
13          * 设置消息的定时投递时间(绝对时间). <p>例1: 延迟投递, 延迟3s投递, 设置为: 域名entTimeMillis() + 3000; <p>例2: 定时投递,
14          * 2016-02-01 11:30:00投递, 设置为: new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-02-01
15          * 11:30:00").getTime()
16          */
17         public static final String STARTDELIVERTIME = "__STARTDELIVERTIME";
18     }
/**
     * <p> 设置消息的定时投递时间(绝对时间),最大延迟时间为7天. </p> <ol> <li>延迟投递: 延迟3s投递, 设置为: 域名entTimeMillis() + 3000;</li>
     * <li>定时投递: 2016-02-01 11:30:00投递, 设置为: new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2016-02-01
     * 11:30:00").getTime()</li> </ol>
     */
    public void setStartDeliverTime(final long value) {
        putSystemProperties(域名TDELIVERTIME, 域名eOf(value));
    }

5.既然要改造rocketmq,在哪里改呢,debug,debug,debug(一直到天荒地老),功夫不负有心人,找到啦,

找到  \src\main\java\org\apache\rocketmq\broker\processor\域名, 发现 

public RemotingCommand processRequest(ChannelHandlerContext ctx,
                                          RemotingCommand request) throws RemotingCommandException {
        SendMessageContext mqtraceContext;
        switch (域名ode()) {
            case 域名UMER_SEND_MSG_BACK:
                return 域名umerSendMsgBack(ctx, request);
            default:
                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                if (requestHeader == null) {
                    return null;
                }

                mqtraceContext = buildMsgContext(ctx, requestHeader);
                域名uteSendMessageHookBefore(ctx, request, mqtraceContext);

                RemotingCommand response;
                if (域名tch()) {
                    response = 域名BatchMessage(ctx, request, mqtraceContext, requestHeader);
                } else {
                    response = 域名Message(ctx, request, mqtraceContext, requestHeader);
                }

                域名uteSendMessageHookAfter(response, mqtraceContext);
                return response;
        }
    }


继续debug,发现 sendMessage 就是处理发送消息的,

如果我们在这里判断是否延时消息就写入文件,然后返回成功到客户端,等到了时间就发送延迟消息,不就搞定了吗?

oh,yes,就是这么干的

        //处理延迟消息 delay message

        String startTime = 域名roperty(域名TDELIVERTIME);
        boolean isDelayMsg = false;
        long nextStartTime = 0;
        if (startTime != null && 域名elayTimeLevel() <= 0) {
            nextStartTime = 域名eLong(startTime);
            if (nextStartTime >= 域名entTimeMillis()) {
                isDelayMsg = true;
            }
        }
        if (isDelayMsg) {
            return 域名lePutMessageResultFuture(response, request, msgInner, ctx, queueIdInt, nextStartTime);
        } else {
            if (traFlag != null && 域名eBoolean(traFlag)) {
                if (域名rokerConfig().isRejectTransactionMessage()) {
                    域名ode(域名ERMISSION);
                    域名emark(
                            "the broker[" + 域名rokerConfig().getBrokerIP1()
                                    + "] sending transaction message is forbidden");
                    return response;
                }
                putMessageResult = 域名ransactionalMessageService().prepareMessage(msgInner);
            } else {
                putMessageResult = 域名essageStore().putMessage(msgInner);
            }

            return handlePutMessageResult(putMessageResult, response, request, msgInner, responseHeader, sendMessageContext, ctx, queueIdInt);

        }
    }

其中 域名lePutMessageResultFuture 是我们用来处理延迟消息的地方

我们按照每个时间一个文件夹来保存延时消息,等延时消息到达后,定时的写入延时队列里面。

详细原理,请查考 rocketmq 原理实现篇  https://域名/tomj2ee/p/域名

 
package 域名域名y;

import 域名域名nelHandlerContext;
import 域名域名.DateFormatUtils;
import 域名域名erController;
import 域名域名域名onseCode;
import 域名域名域名MessageResponseHeader;
import 域名域名rnalLogger;
import 域名域名rnalLoggerFactory;
import 域名域名域名tingCommand;
import 域名域名ageExtBrokerInner;

import 域名.*;
import 域名SocketAddress;
import 域名etAddress;
import 域名;
import 域名域名utorService;
import 域名域名utors;
import 域名域名adLocalRandom;

public class DelayProcessor implements  Runnable {

    protected static final InternalLogger log = 域名ogger(域名anonicalName());

    protected final BrokerController brokerController;
    protected final SocketAddress storeHost;

    private ExecutorService jobTaskExecute = 域名ixedThreadPool(16);


    public DelayProcessor(final BrokerController brokerController) {
        域名erController = brokerController;
        域名eHost = new InetSocketAddress(域名rokerConfig().getBrokerIP1(), brokerController
                .getNettyServerConfig().getListenPort());
        Thread thread = new Thread(this);
        域名ame("delayProcessor-run---thread");

        域名aemon(true);
        new File(getDelayPath()).mkdirs();
        域名t();
        Thread missCallThread = new Thread(() -> {
            try {
                for(;;) {
                    域名p(10 * 1000);
                    sendMissCallMsg();
                }
            } catch (InterruptedException e) {
                域名tStackTrace();
            }
        });
        域名ame("delayProcessor-callback-thread");
        域名t();
        域名tln("init delay success " +getDelayPath());

    }

    public RemotingCommand handlePutMessageResultFuture(RemotingCommand response,
                                                        RemotingCommand request,
                                                        MessageExtBrokerInner msgInner,
                                                        ChannelHandlerContext ctx,
                                                        int queueIdInt, long nextStartTime) {
        return handlePutMessageResult(response, request, msgInner, ctx, queueIdInt, nextStartTime);

    }

    private RemotingCommand handlePutMessageResult(RemotingCommand response,
                                                   RemotingCommand request, MessageExtBrokerInner msg,
                                                   ChannelHandlerContext ctx,
                                                   int queueIdInt, long nextStartTime) {
        boolean svOk = saveMsgFile(nextStartTime, msg);
        SendMessageResponseHeader sendMessageResponseHeader = new SendMessageResponseHeader();
        域名ueueId(1);
        域名sgId("0");
        域名ueueOffset(0l);
        域名ransactionId("");
        RemotingCommand newCommand = 域名teRequestCommand(域名ESS, sendMessageResponseHeader);

        if (svOk) {
            域名ode(域名ESS);
        } else {
            域名ode(域名EM_ERROR);
            域名emark("发送消息延迟失败!");
        }
        域名xtFields(域名xtFields());
        域名ersion(域名ersion());
        域名paque(域名paque());
        域名anguage(域名anguage());
        域名ody(域名ody());

        if (!域名ewayRPC()) {
            try {
                域名eAndFlush(newCommand);
            } catch (Throwable e) {
                域名r("DelayProcessor process request over, but response failed", e);
                域名r(域名ring());
                域名r(域名ring());
            }
        }
        return newCommand;
    }


    public void putMessage(MessageExtBrokerInner msgInner) {
        域名essageStore().putMessage(msgInner);
    }

    @Override
    public void run() {
        for (; ; ) {
            long curTime = 域名entTimeMillis() / 1000;
            域名it(() -> sendMsg(curTime));
            try {
                域名p(1000);
            } catch (InterruptedException e) {

            }
        }
    }

    private String getDelayPath() {
        String delayPath = "./delay-store"+ 域名rator + "delay";
        return delayPath;
    }

    private boolean saveMsgFile(long startTime, MessageExtBrokerInner msgInner) {

        ObjectOutputStream objectOutputStream = null;
        try {
            String msgId =(startTime/1000 )+"-"+ 域名entTimeMillis() + "-" + 域名ent().nextInt(99999999);
            域名tln( getCurrentTime()+"写入延迟消息 >>" + msgId);
            String parentDir = getDelayPath() + 域名rator + startTime / 1000;
            File parentFile = new File(parentDir);
            if (!域名ts()) {
                域名rs();
            }
            String fileName = parentDir + 域名rator + msgId;

            FileOutputStream fos = new FileOutputStream(fileName);
            BufferedOutputStream bos = new BufferedOutputStream(fos);
            objectOutputStream = new ObjectOutputStream(bos);
            域名eObject(msgInner);
            return true;
        } catch (Exception ex) {
            域名r("saveMsgFile ex:", ex);
            return false;
        } finally {
            try {
                if (objectOutputStream != null) {
                    域名e();
                }
            } catch (Exception ex) {
                域名r("saveMsgFile ex:", ex);
            }
        }

    }


    private MessageExtBrokerInner readFile(File f) {
        ObjectInputStream ois = null;
        try {
            ois = new ObjectInputStream(new FileInputStream(f));
            return (MessageExtBrokerInner) 域名Object();
        } catch (Exception ex) {
            return null;
        } finally {
            if (ois != null) {
                try {
                    域名e();
                } catch (IOException e) {
                    域名tStackTrace();
                }
            }
        }
    }


    private  void sendMissCallMsg() {
        File lst = new File(getDelayPath());
        File[] files = 域名Files();
        long startTime = 域名entTimeMillis() / 1000 - 10 * 1000;
        for (File f : files) {
            String name = 域名ame();
            if (域名rectory() && !域名ls(".") && !域名ls("..")) {
                try {
                    Long fileTime = 域名eLong(name);
                    if (fileTime <= startTime) {
                        sendMsg(fileTime);
                    }
                } catch (Exception ex) {
                }
            }
        }

    }

    private String  getCurrentTime(){
       return  域名entThread().getName()+ ">>["+域名at(new Date(), "yyyy-MM-dd HH:mm:ss")+"] ";
    }
    private void sendMsg(long startTime) {
        File lst = new File(getDelayPath() + 域名rator + startTime);
        File[] files = 域名Files();
        if (files != null) {
            for (File f : files) {
                域名tln( getCurrentTime()+"时间到发送>> "+ startTime+" to commitLog " + 域名ame());
                MessageExtBrokerInner msgInner = readFile(f);
                if (msgInner != null) {
                    putMessage(msgInner);
                    域名tln( getCurrentTime()+"写入log >> "+ startTime+" to commitLog " + 域名ame()+" success");
                    域名te();
                }
            }
            域名te();
        }
    }
}

总结:rocketmq延迟队列实现主要是通过时间轮和文件来保存延时消息,等到了时间后,再写入延时队列,来达到延时的目的。

总共有4种方式来实现延时队列,可以参考延时队列的实现原理篇

https://域名/tomj2ee/p/域名

湘ICP备14001474号-3  投诉建议:234161800@qq.com   部分内容来源于网络,如有侵权,请联系删除。