博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
45. 源代码阅读-RocketMQ-tools
阅读量:6263 次
发布时间:2019-06-22

本文共 13858 字,大约阅读时间需要 46 分钟。

一. 简要介绍

RocketMQ-tools分为3部分

  1. admin
  2. command
  3. monitor

下面一一介绍

二. admin

提供了管理操作接口

三. command

command提供了命令行控制MQ的一些方法。

启动方法

进入RocketMQ安装目录,执行sh bin/mqadmin

有几个参数的执行方法

1. sh bin/mqadmin

打印命令提示,如下:

The most commonly used mqadmin commands are:   updateTopic          Update or create topic   deleteTopic          Delete topic from broker and NameServer.   updateSubGroup       Update or create subscription group   deleteSubGroup       Delete subscription group from broker.   updateBrokerConfig   Update broker's config   updateTopicPerm      Update topic perm   topicRoute           Examine topic route info   topicStatus          Examine topic Status info   topicClusterList     get cluster info for topic   brokerStatus         Fetch broker runtime status data   queryMsgById         Query Message by Id   queryMsgByKey        Query Message by Key   queryMsgByUniqueKey  Query Message by Unique key   queryMsgByOffset     Query Message by offset   queryMsgByUniqueKey  Query Message by Unique key   printMsg             Print Message Detail   printMsgByQueue      Print Message Detail   sendMsgStatus        send msg to broker.     ...

2. sh bin/mqadmin help xxx

打印某个命令的提示参数,比如sh bin/mqadmin helop updatetopic(不需要区分大小写)

[root@bogon apache-rocketmq]# sh bin/mqadmin help updatetopicJava HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0usage: mqadmin updateTopic [-b 
] [-c
] [-h] [-n
] [-o
] [-p
] [-r
] [-s
] -t
[-u
] [-w
] -b,--brokerAddr
create topic to which broker -c,--clusterName
create topic to which cluster -h,--help Print help -n,--namesrvAddr
Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876 -o,--order
set topic's order(true|false -p,--perm
set topic's permission(2|4|6), intro[2:W 4:R; 6:RW] -r,--readQueueNums
set read queue nums -s,--hasUnitSub
has unit sub (true|false -t,--topic
topic name -u,--unit
is unit topic (true|false -w,--writeQueueNums
set write queue nums

3. sh bin/mqadmin xx 具体执行某个命令

比如 sh bin/mqadmin updatetopic -x xxx

源代码分析

首先来看bin/mqadmin这个启动脚本

1. bin/mqadmin

if [ -z "$ROCKETMQ_HOME" ] ; then  ## resolve links - $0 may be a link to maven's home  PRG="$0"  # need this for relative symlinks  while [ -h "$PRG" ] ; do    ls=`ls -ld "$PRG"`    link=`expr "$ls" : '.*-> \(.*\)$'`    if expr "$link" : '/.*' > /dev/null; then      PRG="$link"    else      PRG="`dirname "$PRG"`/$link"    fi  done  saveddir=`pwd`  ROCKETMQ_HOME=`dirname "$PRG"`/..  # make it fully qualified  ROCKETMQ_HOME=`cd "$ROCKETMQ_HOME" && pwd`  cd "$saveddir"fiexport ROCKETMQ_HOMEsh ${ROCKETMQ_HOME}/bin/tools.sh org.apache.rocketmq.tools.command.MQAdminStartup $@

前面在设置ROCKETMQ_HOME环境变量,然后启动MQAdminStartup这个类。

2. MQAdminStartup

public class MQAdminStartup {    protected static List
subCommandList = new ArrayList
(); public static void main(String[] args) { main0(args, null); } public static void main0(String[] args, RPCHook rpcHook) { ... initCommand(); try { initLogback(); switch (args.length) { case 0: printHelp(); break; case 2: if (args[0].equals("help")) { SubCommand cmd = findSubCommand(args[1]); if (cmd != null) { Options options = ServerUtil.buildCommandlineOptions(new Options()); options = cmd.buildCommandlineOptions(options); if (options != null) { ServerUtil.printCommandLineHelp("mqadmin " + cmd.commandName(), options); } } else { System.out.printf("The sub command \'" + args[1] + "\' not exist.%n"); } break; } case 1: default: SubCommand cmd = findSubCommand(args[0]); if (cmd != null) { .... cmd.execute(commandLine, options, rpcHook); } else { System.out.printf("The sub command \'" + args[0] + "\' not exist.%n"); } break; } } ... } public static void initCommand() { initCommand(new UpdateTopicSubCommand()); initCommand(new DeleteTopicSubCommand()); initCommand(new UpdateSubGroupSubCommand()); initCommand(new DeleteSubscriptionGroupCommand()); initCommand(new UpdateBrokerConfigSubCommand()); initCommand(new UpdateTopicPermSubCommand()); initCommand(new TopicRouteSubCommand()); initCommand(new TopicStatusSubCommand()); initCommand(new TopicClusterSubCommand()); initCommand(new BrokerStatusSubCommand()); initCommand(new QueryMsgByIdSubCommand()); initCommand(new QueryMsgByKeySubCommand()); initCommand(new QueryMsgByUniqueKeySubCommand()); initCommand(new QueryMsgByOffsetSubCommand()); initCommand(new QueryMsgByUniqueKeySubCommand()); initCommand(new PrintMessageSubCommand()); initCommand(new PrintMessageByQueueCommand()); initCommand(new SendMsgStatusCommand()); initCommand(new BrokerConsumeStatsSubCommad()); initCommand(new ProducerConnectionSubCommand()); initCommand(new ConsumerConnectionSubCommand()); initCommand(new ConsumerProgressSubCommand()); initCommand(new ConsumerStatusSubCommand()); initCommand(new CloneGroupOffsetCommand()); initCommand(new ClusterListSubCommand()); initCommand(new TopicListSubCommand()); initCommand(new UpdateKvConfigCommand()); initCommand(new DeleteKvConfigCommand()); initCommand(new WipeWritePermSubCommand()); initCommand(new ResetOffsetByTimeCommand()); initCommand(new UpdateOrderConfCommand()); initCommand(new CleanExpiredCQSubCommand()); initCommand(new CleanUnusedTopicCommand()); initCommand(new StartMonitoringSubCommand()); initCommand(new StatsAllSubCommand()); initCommand(new AllocateMQSubCommand()); initCommand(new CheckMsgSendRTCommand()); initCommand(new CLusterSendMsgRTCommand()); initCommand(new GetNamesrvConfigCommand()); initCommand(new UpdateNamesrvConfigCommand()); initCommand(new GetBrokerConfigCommand()); initCommand(new QueryConsumeQueueCommand()); } ...

逻辑很简单,

  • 首先初始化可以支持的Command - initCommand
  • initLogback根据logback_tools.xml初始化,主要是日志系统。
  • 然后根据传入过来的参数,选择不同的执行分支,也就是上面举例的参数。

3. 某个指令执行过程

以updatetopic为例

....cmd.execute(commandLine, options, rpcHook);

进入UpdateTopicSubCommand.execute(xx)方法

@Override    public void execute(final CommandLine commandLine, final Options options,        RPCHook rpcHook) throws SubCommandException {        ...        try {            TopicConfig topicConfig = new TopicConfig();            topicConfig.setReadQueueNums(8);            topicConfig.setWriteQueueNums(8);            topicConfig.setTopicName(commandLine.getOptionValue('t').trim());            // readQueueNums            if (commandLine.hasOption('r')) {                topicConfig.setReadQueueNums(Integer.parseInt(commandLine.getOptionValue('r').trim()));            }            // writeQueueNums            if (commandLine.hasOption('w')) {                topicConfig.setWriteQueueNums(Integer.parseInt(commandLine.getOptionValue('w').trim()));            }            ...            if (commandLine.hasOption('b')) {                String addr = commandLine.getOptionValue('b').trim();                defaultMQAdminExt.start();                defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);                ...                return;            } else if (commandLine.hasOption('c')) {                String clusterName = commandLine.getOptionValue('c').trim();                defaultMQAdminExt.start();                Set
masterSet = CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName); for (String addr : masterSet) { defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig); System.out.printf("create topic to %s success.%n", addr); } ... } System.out.printf("%s", topicConfig); return; } ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options); } catch (Exception e) { throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e); } finally { defaultMQAdminExt.shutdown(); } }

execute的逻辑就是获取传入的参数,-t是必须的,然后还必须带上-b或者-c才会执行。

如果没有执行,那么会打印updatetopic的参数列表。
-b代表某个broker, -c代表集群

所以大概格式是: sh ./bin/mqadmin updatetopic -t topictest -b xxx

其他参数是可选的。

4. 调用执行

如果确定要更新topic,那么就会调用RocketMQ-cli里面的接口进行更新。

更新成功会打印更新成功输出,否则会报异常。

附上所有命令的操作参数

四. monitor

监控相关

启动类是参数是startMonitoring

sh bin/mqadmin startMonitoring

代码如下:

public class StartMonitoringSubCommand implements SubCommand {    private final Logger log = ClientLogger.getLog();    @Override    public String commandName() {        return "startMonitoring";    }    @Override    public String commandDesc() {        return "Start Monitoring";    }    @Override    public Options buildCommandlineOptions(Options options) {        return options;    }    @Override    public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {        try {            MonitorService monitorService =                new MonitorService(new MonitorConfig(), new DefaultMonitorListener(), rpcHook);            monitorService.start();        } catch (Exception e) {            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);        }    }}

五. 命令执行和相关代码分析

获取所有的topic -- topiclist

1. 命令如下

sh bin/mqadmin topiclist -n localhost:9876

2. 结果如下:

huangrongweideMacBook-Pro:apache-rocketmq huangrongwei$ sh bin/mqadmin topiclist -n localhost:9876Java HotSpot(TM) 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0Java HotSpot(TM) 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0%RETRY%lalaase_rename_unique_group_name_4%RETRY%please_rename_unique_group_name_4huangrongweideMacBook-Pro.local%RETRY%bybybse_rename_unique_group_name_4BenchmarkTestOFFSET_MOVED_EVENT%RETRY%phihomebse_rename_unique_group_name_4wangyuan.freecomm-networks.comTBW102SELF_TEST_TOPICmmmzzzDefaultCluster

3. 参数

  • -n表示要显示连接到的namesrv
  • -c

4. 实现这个功能的类

rocketmq/tools/src/main/java/org/apache/rocketmq/tools/command/topic/TopicListSubCommand.java

5. 代码分析

a. 入口类是MQAdminStartup.main方法,走的是case 1分支,如下:
public static void main0(String[] args, RPCHook rpcHook) {        ...        initCommand();        try {                                ...                case 1:                default:                    SubCommand cmd = findSubCommand(args[0]);                    if (cmd != null) {                        ...                        if (commandLine.hasOption('n')) {                            String namesrvAddr = commandLine.getOptionValue('n');                            System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);                        }                        cmd.execute(commandLine, options, rpcHook);                    } else {                        System.out.printf("The sub command \'" + args[0] + "\' not exist.%n");                    }                    break;            }        } catch (Exception e) {            e.printStackTrace();        }}

如果commandLine包含n参数,也就是如果输入命令有带-n的参数,比如sh bin/mqadmin topiclist -n localhost:9876。那么把它带的参数值拿出来,然后设置到SystemProperty里面。

System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);

这样做是为了其他地方可以拿这个参数,也就是RocketMQ传递namesrv address的时候,不是用普通的参数传递,而是用系统属性。

这样做有好处也有不好的地方吧,好处就是省心,一个地址设置了,很多地方都有可以得到这个值。不需要每个参数传来传去。不好的地方就是如果设置的地方多了,就容易混淆,互相影响。

然后执行command的execute方法, 也就是TopicListSubCommand.execute方法。

cmd.execute(commandLine, options, rpcHook);

所有的command都实现了SubCommand接口。

public class TopicListSubCommand implements SubCommand {}

SubCommand提供了几个接口方法

public interface SubCommand {    String commandName(); //命令名称,用户用这个名称调用这个命令,不区分大小写,比如topiclist。    String commandDesc(); //命令描述    Options buildCommandlineOptions(final Options options); //参数构成    void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException;//命令执行}
b. TopicListSubCommand.execute
public void execute(final CommandLine commandLine, final Options options,        RPCHook rpcHook) throws SubCommandException {        ...        try {            ...            defaultMQAdminExt.start();            if (commandLine.hasOption('c')) {                ClusterInfo clusterInfo = defaultMQAdminExt.examineBrokerClusterInfo();                ...                TopicList topicList = defaultMQAdminExt.fetchAllTopicList();                ...            } else {                ...                TopicList topicList = defaultMQAdminExt.fetchAllTopicList();                for (String topic : topicList.getTopicList()) {                    System.out.printf("%s%n", topic);                }            }        } catch (Exception e) {            throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);        } finally {            defaultMQAdminExt.shutdown();        }    }

这个类主要就是根据传入的参数是否有-c 或者-n 获取相应的topic,然后打印出来。

以-n为例,它的逻辑就是去namesrv获取所有的topic,然后打印出来。

转载于:https://blog.51cto.com/483181/2043859

你可能感兴趣的文章
简单两步走 中兴V880获取权限方法
查看>>
外部 BLOB 存储体系结构
查看>>
导入文本文件时如何指定字段类型.sql
查看>>
C# 对象二进制序列化
查看>>
收藏的几个好的网站
查看>>
linux中shell变量$#,$@,$*,$?,$0,$1,$2的含义解释
查看>>
前端精选文摘:那些年我们一起清除过的浮动
查看>>
实现一种快速查找Richedit中可见区域内OLE对象的方法
查看>>
Java虚拟机工作原理详解 ( 二 )
查看>>
对象的序列化(Serialization)
查看>>
理解 Glance - 每天5分钟玩转 OpenStack(20)
查看>>
编译pure-ftpd时提示错误Your MySQL client libraries aren't properly installed
查看>>
Impala SQL
查看>>
STL源代码分析--萃取编程(traits)技术的实现
查看>>
Linux ALSA声卡驱动之一:ALSA架构简介【转】
查看>>
为了解决linux配置Nginx 只能关闭防火墙才能访问的问题
查看>>
CentOS7.2 创建本地YUM源和局域网YUM源
查看>>
ubuntu设置root密码及 Xftp连接linux(ubuntu)时提示ssh服务器拒绝了密码,请再试一次...
查看>>
[转]WCF RIA Services
查看>>
R的绘图实例集锦
查看>>