RabbitMQ的元数据重建

RabbitMQ的元数据重建

1.概述

对于RabbitMQ运维层面来说,扩容和迁移是必不可少。扩容比较简单,一般往集群中加入新的机器节点即可,不过新的机器节点中是没有消息的,如果想要新加入的节点能快速的存储消息还是需要做点小手术的。不过这是后话,本文的主要内容是迁移,而迁移的首要工作就是为新的集群重建原集群的元数据。

重建RabbitMQ元数据,说白了就是在新的集群上重新创建exchange、queue以及彼此的binding关系。当然最好连policy,vhost,users等都能重建。

本文介绍三种重建元数据的方法:

  1. 程序化重建,即编写程序制成可执行jar包。
  2. 使用WEB UI进行重建
  3. 使用http API重建

2.使用程序化重建

程序化重建之前首先要准备原集群的元数据,包括exchange、queue、bindingkey、exchange类型。

示例元数据如下(保存成文本文件metadata.txt):

exchange.migrate.demo1 queue.migrate.demo1 demo1 direct
exchange.migrate.demo2 queue.migrate.demo2 demo2 direct

注:彼此之间用空格隔开,最后一个exchange类型可以缺省,缺省值为direct。

我们的程序首先会读取这个元数据文本,然后保存在内存之中,方便之后创建。这里与这个元数据对应的类为BindingObject。详细代码如下:

public class BindingObject {
private String channel;
private String queue;
private String routingKey;
private String exchangeType;

public BindingObject(String channel, String queue, String routingKey) {
super();
this.channel = channel;
this.queue = queue;
this.routingKey = routingKey;
this.exchangeType = "direct";
}

public BindingObject(String channel, String queue, String routingKey,
String exchangeType) {
super();
this.channel = channel;
this.queue = queue;
this.routingKey = routingKey;
this.exchangeType = exchangeType;
}

//此处省略各个成员变量的Getter和Setter方法

@Override
public String toString(){
return "[channel="+channel+", queue="+queue+", routingKey="+routingKey+",exchangeType="+exchangeType+"]";
}
}

之后建立主程序——RmqMetadataRebuild.java。最后打包成jar包,我们取名为rebuild.jar

在真正创建的时候调用:

java -jar rebuild.jar connection=192.168.0.2:5672,192.168.0.3:5672 filename=/root/util/metadata.txt username=root password=root vhost=/

其中有两个参数是必须的,即connection和filename。username、password、vhost可以根据实际情况修改代码来实现其缺省值。

RmqMetadataRebuild.java完整代码如下:

package com.vms.rabbitmq.rebuild;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RmqMetadataRebuild {
private static final Logger logger = LoggerFactory.getLogger(RmqMetadataRebuild.class);

private static List<IpPortKV> addressList = new ArrayList<IpPortKV>();
private static String username = "root";
private static String password = "root";
private static String vhost = "/";
private static String filename = null;

private static Connection connection = null;
private static Channel channel = null;

//xxx.jar [connection=] [filename=] [username=] [password=] [vhost=] [filename=]
//connection=192.168.0.2:5672,192.168.0.3:5672 filename=/root/util/metadata.txt username=root password=root vhost=/
public static void main(String[] args) {
logger.debug("begin rebuild rabbitmq metadata....");
for(int i=0;i<args.length;i++){
logger.debug("{}",args[i]);
}

if(!args[0].startsWith("connection=")){
logger.error("no connection parameters!");
printTipInfo();
System.exit(1);
}

try {
parseConnection(args[0]);
} catch (Exception e) {
logger.error("{}",e);
System.exit(1);
}

if(addressList.size()<1){
logger.error("no connection parameters!");
printTipInfo();
System.exit(1);
}

if(!args[1].startsWith("filename=")){
logger.error("no rebuild metadata file!");
printTipInfo();
System.exit(1);
}
filename = args[1].substring("filename=".length());

if(args.length>2){
for(int i=2;i<args.length;i++){
if(args[i].startsWith("username=")){
username = args[i].substring("username=".length());
}else if(args[i].startsWith("password=")){
password = args[i].substring("password=".length());
}else if(args[i].startsWith("vhost=")){
vhost =args[i].substring("vhost=".length());
}
}
}

logger.debug("addressList={}",addressList);
logger.debug("filename={}",filename);
logger.debug("username={}",username);
logger.debug("password={}",password);
logger.debug("vhost={}",vhost);

RmqMetadataRebuild rmr = new RmqMetadataRebuild();

try {
rmr.start();
} catch (IOException e) {
logger.error("{}",e);
rmr.shutdown();
} catch (TimeoutException e) {
logger.error("{}",e);
rmr.shutdown();
}

try {
List<BindingObject> list = rmr.getBindingList(filename);
for(BindingObject bindingObject: list){
String exchange = bindingObject.getChannel();
String queue = bindingObject.getQueue();
String rk = bindingObject.getRoutingKey();
String exchangeType = bindingObject.getExchangeType();
channel.exchangeDeclare(exchange, exchangeType,true,false,null);
channel.queueDeclare(queue, true, false, false, null);
channel.queueBind(queue, exchange, rk);
}
logger.info("rebuild rabbitmq metadata successfully!");

} catch (IOException e) {
logger.error("{}",e);
} finally{
rmr.shutdown();
}
}

private void start() throws IOException, TimeoutException{
int addressNum = addressList.size();
Address[] addresses = new Address[addressNum];
for(int i=0;i<addressNum;i++){
IpPortKV ipPortKV = addressList.get(i);
Address address = new Address(ipPortKV.getIp(),ipPortKV.getPort());
addresses[i] = address;
}

ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(username);
factory.setPassword(password);
factory.setVirtualHost(vhost);
connection = factory.newConnection(addresses);
channel = connection.createChannel();
logger.info("connection and channel create successfully....");
}

private void shutdown(){
try {
channel.close();
connection.close();
} catch (IOException e) {
logger.error("{}",e);
} catch (TimeoutException e) {
logger.error("{}",e);
}
}

private static void parseConnection(String connection){
String addresses = connection.substring("connection=".length());
String addressArray[] = addresses.split(",");
for(String address:addressArray){
String ipPortArray[] = address.split(":");
IpPortKV ipPortKV = new IpPortKV(ipPortArray[0],Integer.parseInt(ipPortArray[1]));
addressList.add(ipPortKV);
}
}

private List<BindingObject> getBindingList(String fileName) throws IOException{
List<BindingObject> list = new ArrayList<BindingObject>();

FileInputStream fis = new FileInputStream(fileName);
InputStreamReader isr = new InputStreamReader(fis);
BufferedReader br = new BufferedReader(isr);
String str = null;
while((str = br.readLine())!=null){
String[] tempBindArray = str.split(" ");
if(tempBindArray.length>=3){
BindingObject bindingObject = new BindingObject(tempBindArray[0],tempBindArray[1],tempBindArray[2]);
if(tempBindArray.length==4){
bindingObject.setExchangeType(tempBindArray[3]);
}
list.add(bindingObject);
}
}
fis.close();
isr.close();
br.close();

return list;
}

private static void printTipInfo(){
System.out.println("use like this: xxx.jar [connection=] [filename=] [username=] [password=] [vhost=] [filename=]");
System.out.println("connection and filename is necessary.");
System.out.println("use demo: connection=192.168.0.2:5672,192.168.0.3:5672 filename=/root/util/metadata.txt username=root password=root vhost=/ ");
System.out.println("please try it again....");
}
}

其中的IpPortKV是用来解析connection时做一下缓存过渡的。参考代码如下:

public class IpPortKV {
private String ip;
private int port;

//此处省略各个成员变量的Getter和Setter方法

@Override
public String toString(){
return "[ip="+ip+", port="+port+"]";
}
}

最后将项目打成可执行jar包即可。注意这里还用到了slf4j-log4j,可以删除相关的代码,也可以导入相关的jar包即可运行。

上面的代码中并没有重建users、policy、vhost等元数据,如果需要重建这些信息需要丰富一样整个代码。或者直接选用下面的方式。


3. 使用WEB UI重建

这个相对于上面的重建方式而言显得非常的简单方便。前提是开启了rabbitmq_management插件(rabbtimq-plugins enable rabbitmq_management),并且有可以WEB UI的管理员用户,具备可配置、可读、可写的权限。

在WEB UI的Overview页面下方可以找到:

只需要在原集群的WEB UI中下载(左边“Download broker definitions”)元数据配置文件,然后再导入到新集群的WEB UI中即可(上图右边“Upload broker defintions”)。

元数据配置文件是一个json文件,可以参考下面的内容:

{
"rabbit_version": "3.5.7",
"users": [
{
"name": "guest",
"password_hash": "8oKfdYGw1Ivr91EvK53S9cR9s0=",
"tags": "administrator"
},
{
"name": "root",
"password_hash": "XQrOsQGncx5aX/QVLSe5CmM7FE=",
"tags": "administrator"
}
],
"vhosts": [
{
"name": "/"
},
{
"name": "default"
}
],
"permissions": [
{
"user": "root",
"vhost": "default",
"configure": ".*",
"write": ".*",
"read": ".*"
},
{
"user": "root",
"vhost": "/",
"configure": ".*",
"write": ".*",
"read": ".*"
},
{
"user": "guest",
"vhost": "/",
"configure": ".*",
"write": ".*",
"read": ".*"
}
],
"parameters": [],
"policies": [
{
"vhost": "default",
"name": "policy.migrate",
"pattern": "^queue",
"apply-to": "queues",
"definition": {
"ha-mode": "exactly",
"ha-params": 2,
"ha-sync-mode": "automatic"
},
"priority": 0
}
],
"queues": [
{
"name": "queue.migrate.demo",
"vhost": "default",
"durable": true,
"auto_delete": false,
"arguments": {}
}
],
"exchanges": [
{
"name": "exchange.migrate.demo",
"vhost": "default",
"type": "direct",
"durable": true,
"auto_delete": false,
"internal": false,
"arguments": {}
}
],
"bindings": [
{
"source": "exchange.migrate.demo",
"vhost": "default",
"destination": "queue.migrate.demo",
"destination_type": "queue",
"routing_key": "demo",
"arguments": {}
}
]
}

由上可知,配置文件中包含rabbit_version,users, vhosts, permissions, parameters, policies, queues, exchanges,bindings等内容,概括了RabbitMQ所涉及的所有元数据配置。

如果备份集群中已有元数据与导入的元数据冲突,则导入的元数据会覆盖;如果没有冲突,则会保留。

这种重建元数据的方法简单、方便、高效,但是有个问题值得注意,那就是不同的RabbitMQ版本之间的元数据配置可能会不兼容,如果无法解决,那就只能采用第一种程序化的重建方式。如果原集群由于某种原因无法启动,那么此种方法也无法奏效,不过可以定时备份这些元数据(或者在元数据有变更时备份)来得到解决。


4. Http API的方式重建

Http API的重建方式和上面的WEB UI方式相同,都是基于RabbitMQ元数据配置文件的,只不过图形化界面操作封装了一下Http API(https://rawcdn.githack.com/rabbitmq/rabbitmq-management/rabbitmq_v3_6_10/priv/www/api/index.html)。

获取元数据配置json的命令:

wget --user root --password root http://192.168.0.2:15672/api/definitions -O /root/util/rabbit_source.json

然后通过Http Post的方式将rabbit_source.json文件上传到新的备份集群中:

curl -T /root/util/rabbit_source.json -X POST -u root:root -H "Content-Type: application/json" http://192.168.0.3:15672/api/definitions

对于Http API的重建方式,当然也可以使用HttpClient进行操作,而非上面的命令行的形式。


欢迎支持笔者的作品《深入理解Kafka: 核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客(ID: hiddenkafka)。
本文作者: 朱小厮

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×