采用OpenReplicator解析MySQL binlog

采用OpenReplicator解析MySQL binlog

Open Replicator是一个用Java编写的MySQL binlog分析程序。Open Replicator 首先连接到MySQL(就像一个普通的MySQL Slave一样),然后接收和分析binlog,最终将分析得出的binlog events以回调的方式通知应用。Open Replicator可以被应用到MySQL数据变化的实时推送,多Master到单Slave的数据同步等多种应用场景。Open Replicator目前只支持MySQL5.0及以上版本。

Open Replicator项目地址:https://github.com/whitesock/open-replicator

binlog事件分析结构图

在阅读下面的内容时,首先需要对binlog有一定的了解,可以参考《MySQL Binlog解析》。

这里通过open-replicator解析binlog日志事件(binlog-format = row)。binlog日志事件里存在两种操作:DDL和DML,当DDL时输出一条sql,当DML时输出相关行信息。可以参考下面:

DDL(CREATE, ALTER, DROP, TRUNCATE,主要用在定义或改变表的结构):

{
"eventId": 1,
"databaseName": "canal_test",
"tableName": "`company`",
"eventType": 2,
"timestamp": 1477033198000,
"timestampReceipt": 1477033248780,
"binlogName": "mysql-bin.000006",
"position": 353,
"nextPostion": 468,
"serverId": 2,
"before": null,
"after": null,
"isDdl": true,
"sql": "DROP TABLE `company` /* generated by server */"
}

DML(SELECT, UPDATE, INSERT, DELETE,对数据库里的数据进行操作):

{
"eventId": 0,
"databaseName": "canal_test",
"tableName": "person",
"eventType": 24,
"timestamp": 1477030734000,
"timestampReceipt": 1477032161988,
"binlogName": "mysql-bin.000006",
"position": 242,
"nextPostion": 326,
"serverId": 2,
"before": {
"id": "3",
"sex": "f",
"address": "shanghai",
"age": "23",
"name": "zzh3"
},
"after": {
"id": "3",
"sex": "m",
"address": "shanghai",
"age": "23",
"name": "zzh3"
},
"isDdl": false,
"sql": null
}

相关的类文件如下:
CDCEvent.java

package or;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import com.google.code.or.binlog.BinlogEventV4;
import com.google.code.or.binlog.BinlogEventV4Header;
import com.google.code.or.binlog.impl.event.AbstractBinlogEventV4;

/**
 * One change-data-capture record derived from a single binlog event.
 *
 * <p>Header-level data (ids, timestamps, binlog coordinates) is copied from the binlog
 * event header; the row images ({@code before}/{@code after}) and the DDL text
 * ({@code sql}) are filled in by the caller through setters.
 *
 * <p>NOTE(review): instance fields are kept in their original declaration order because
 * that order presumably determines the key order of the Gson output shown in the
 * article - confirm before reordering.
 */
public class CDCEvent {
    /** Source of event ids; shared by every instance created in this JVM. */
    private static AtomicLong uuid = new AtomicLong(0);

    private long eventId;             // unique identifier of the event
    private String databaseName;
    private String tableName;
    private int eventType;            // binlog event type code
    private long timestamp;           // when the event happened [MySQL server time]
    private long timestampReceipt;    // when open-replicator received it [CDC processing time]
    private String binlogName;        // binlog file name
    private long position;
    private long nextPostion;
    private long serverId;
    private Map<String, String> before;
    private Map<String, String> after;
    private Boolean isDdl;
    private String sql;

    /** Creates an empty event; no id is drawn from the counter. */
    public CDCEvent() {
    }

    /**
     * Creates an event from a parsed binlog event and the table it refers to.
     *
     * @param are          binlog event whose header supplies ids, timestamps and positions
     * @param databaseName database the event applies to
     * @param tableName    table the event applies to
     */
    public CDCEvent(final AbstractBinlogEventV4 are, String databaseName, String tableName) {
        init(are);
        this.tableName = tableName;
        this.databaseName = databaseName;
    }

    /** Assigns the next event id, then copies every header field of {@code be}. */
    private void init(final BinlogEventV4 be) {
        this.eventId = uuid.getAndIncrement();

        final BinlogEventV4Header h = be.getHeader();
        this.binlogName = h.getBinlogFileName();
        this.position = h.getPosition();
        this.nextPostion = h.getNextPosition();
        this.serverId = h.getServerId();
        this.eventType = h.getEventType();
        this.timestamp = h.getTimestamp();
        this.timestampReceipt = h.getTimestampOfReceipt();
    }

    /** Renders all fields as a compact, brace-delimited {@code key:value} list. */
    @Override
    public String toString() {
        return "{ eventId:" + eventId
                + ",databaseName:" + databaseName
                + ",tableName:" + tableName
                + ",eventType:" + eventType
                + ",timestamp:" + timestamp
                + ",timestampReceipt:" + timestampReceipt
                + ",binlogName:" + binlogName
                + ",position:" + position
                + ",nextPostion:" + nextPostion
                + ",serverId:" + serverId
                + ",isDdl:" + isDdl
                + ",sql:" + sql
                + ",before:" + before
                + ",after:" + after
                + "}";
    }
    // Getters and setters omitted
}

open-replicator的解析主要是通过注册Listener的形式实现的,整个过程最重要的步骤在下面:

InstanceListener.java

package or;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import or.keeper.TableInfoKeeper;
import or.manager.CDCEventManager;
import or.model.ColumnInfo;
import or.model.TableInfo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.code.or.binlog.BinlogEventListener;
import com.google.code.or.binlog.BinlogEventV4;
import com.google.code.or.binlog.impl.event.DeleteRowsEvent;
import com.google.code.or.binlog.impl.event.FormatDescriptionEvent;
import com.google.code.or.binlog.impl.event.QueryEvent;
import com.google.code.or.binlog.impl.event.TableMapEvent;
import com.google.code.or.binlog.impl.event.UpdateRowsEvent;
import com.google.code.or.binlog.impl.event.WriteRowsEvent;
import com.google.code.or.binlog.impl.event.XidEvent;
import com.google.code.or.common.glossary.Column;
import com.google.code.or.common.glossary.Pair;
import com.google.code.or.common.glossary.Row;
import com.google.code.or.common.util.MySQLConstants;

/**
 * Binlog listener that turns binlog events into {@link CDCEvent}s and appends them to
 * {@code CDCEventManager.queue}.
 *
 * <p>Row events (write/update/delete) yield one {@link CDCEvent} per affected row. Query
 * events whose SQL contains a {@code table} or {@code truncate} keyword yield one DDL
 * {@link CDCEvent} carrying the SQL text. Table-map events are stored in
 * {@link TableInfoKeeper} so that later row events, which only carry a table id, can be
 * resolved to database and table names.
 */
public class InstanceListener implements BinlogEventListener {
    private static final Logger logger = LoggerFactory.getLogger(InstanceListener.class);

    /**
     * Dispatches one binlog event by its header event type.
     *
     * @param be the event delivered by open-replicator; a {@code null} event is logged and ignored
     */
    @Override
    public void onEvents(BinlogEventV4 be) {
        if (be == null) {
            logger.error("binlog event is null");
            return;
        }

        int eventType = be.getHeader().getEventType();
        switch (eventType) {
            case MySQLConstants.FORMAT_DESCRIPTION_EVENT: {
                logger.trace("FORMAT_DESCRIPTION_EVENT");
                break;
            }
            // Every ROW_EVENT is preceded by a TABLE_MAP_EVENT that carries table metadata
            // (tableId, tableName, databaseName); the ROW_EVENT itself only has the tableId.
            case MySQLConstants.TABLE_MAP_EVENT: {
                TableMapEvent tme = (TableMapEvent) be;
                TableInfoKeeper.saveTableIdMap(tme);
                logger.trace("TABLE_MAP_EVENT:tableId:{}", tme.getTableId());
                break;
            }
            case MySQLConstants.DELETE_ROWS_EVENT: {
                DeleteRowsEvent dre = (DeleteRowsEvent) be;
                long tableId = dre.getTableId();
                logger.trace("DELETE_ROW_EVENT:tableId:{}", tableId);

                TableInfo tableInfo = resolveTable(tableId);
                if (tableInfo == null) {
                    break;
                }
                String databaseName = tableInfo.getDatabaseName();
                String tableName = tableInfo.getTableName();

                for (Row row : dre.getRows()) {
                    Map<String, String> beforeMap = getMap(row.getColumns(), databaseName, tableName);
                    if (beforeMap != null && !beforeMap.isEmpty()) {
                        // The event is created only when it will be published, so ids stay gap-free.
                        publishRowChange(new CDCEvent(dre, databaseName, tableName), beforeMap, null);
                    }
                }
                break;
            }
            case MySQLConstants.UPDATE_ROWS_EVENT: {
                UpdateRowsEvent upe = (UpdateRowsEvent) be;
                long tableId = upe.getTableId();
                // Same level as the other row events (the original logged this one at info).
                logger.trace("UPDATE_ROWS_EVENT:tableId:{}", tableId);

                TableInfo tableInfo = resolveTable(tableId);
                if (tableInfo == null) {
                    break;
                }
                String databaseName = tableInfo.getDatabaseName();
                String tableName = tableInfo.getTableName();

                for (Pair<Row> p : upe.getRows()) {
                    Map<String, String> beforeMap = getMap(p.getBefore().getColumns(), databaseName, tableName);
                    Map<String, String> afterMap = getMap(p.getAfter().getColumns(), databaseName, tableName);
                    if (beforeMap != null && afterMap != null && !beforeMap.isEmpty() && !afterMap.isEmpty()) {
                        publishRowChange(new CDCEvent(upe, databaseName, tableName), beforeMap, afterMap);
                    }
                }
                break;
            }
            case MySQLConstants.WRITE_ROWS_EVENT: {
                WriteRowsEvent wre = (WriteRowsEvent) be;
                long tableId = wre.getTableId();
                logger.trace("WRITE_ROWS_EVENT:tableId:{}", tableId);

                TableInfo tableInfo = resolveTable(tableId);
                if (tableInfo == null) {
                    break;
                }
                String databaseName = tableInfo.getDatabaseName();
                String tableName = tableInfo.getTableName();

                for (Row row : wre.getRows()) {
                    Map<String, String> afterMap = getMap(row.getColumns(), databaseName, tableName);
                    if (afterMap != null && !afterMap.isEmpty()) {
                        publishRowChange(new CDCEvent(wre, databaseName, tableName), null, afterMap);
                    }
                }
                break;
            }
            case MySQLConstants.QUERY_EVENT: {
                QueryEvent qe = (QueryEvent) be;
                TableInfo tableInfo = createTableInfo(qe);
                if (tableInfo == null) {
                    break;
                }
                String databaseName = tableInfo.getDatabaseName();
                String tableName = tableInfo.getTableName();
                logger.trace("QUERY_EVENT:databaseName:{},tableName:{}", databaseName, tableName);

                CDCEvent cdcEvent = new CDCEvent(qe, databaseName, tableName);
                cdcEvent.setIsDdl(true);
                cdcEvent.setSql(qe.getSql().toString());

                CDCEventManager.queue.addLast(cdcEvent);
                logger.info("cdcEvent:{}", cdcEvent);
                break;
            }
            case MySQLConstants.XID_EVENT: {
                XidEvent xe = (XidEvent) be;
                logger.trace("XID_EVENT: xid:{}", xe.getXid());
                break;
            }
            default: {
                logger.trace("DEFAULT:{}", eventType);
                break;
            }
        }
    }

    /**
     * Looks up the table registered for {@code tableId} by an earlier TABLE_MAP_EVENT.
     *
     * @return the table info, or {@code null} (after logging a warning) when the id is unknown
     */
    private TableInfo resolveTable(long tableId) {
        TableInfo tableInfo = TableInfoKeeper.getTableInfo(tableId);
        if (tableInfo == null) {
            logger.warn("no table info registered for tableId:{}, row event skipped", tableId);
        }
        return tableInfo;
    }

    /**
     * Marks {@code cdcEvent} as a DML event, attaches the row images and enqueues it.
     *
     * @param before row image before the change, or {@code null} to leave it unset
     * @param after  row image after the change, or {@code null} to leave it unset
     */
    private void publishRowChange(CDCEvent cdcEvent, Map<String, String> before, Map<String, String> after) {
        cdcEvent.setIsDdl(false);
        cdcEvent.setSql(null);
        if (before != null) {
            cdcEvent.setBefore(before);
        }
        if (after != null) {
            cdcEvent.setAfter(after);
        }
        CDCEventManager.queue.addLast(cdcEvent);
        logger.info("cdcEvent:{}", cdcEvent);
    }

    /**
     * Pairs the column values of a row with the column names cached in {@link TableInfoKeeper}.
     *
     * <p>A ROW_EVENT carries no column names, so they come from the metadata cache that is
     * loaded from MySQL. When the cached column count does not match the row, the cache is
     * refreshed once and re-read before giving up.
     *
     * @param cols         column values of one row
     * @param databaseName database of the row's table
     * @param tableName    name of the row's table
     * @return column name to value ({@code ""} for a null value), or {@code null} when
     *         {@code cols} is empty, the table is not cached, or the column counts still
     *         differ after a refresh
     */
    private Map<String, String> getMap(List<Column> cols, String databaseName, String tableName) {
        if (cols == null || cols.isEmpty()) {
            return null;
        }

        String fullName = databaseName + "." + tableName;
        List<ColumnInfo> columnInfoList = TableInfoKeeper.getColumns(fullName);
        if (columnInfoList == null) {
            return null;
        }
        if (columnInfoList.size() != cols.size()) {
            TableInfoKeeper.refreshColumnsMap();
            // Re-read after the refresh; the list fetched above belongs to the stale map.
            columnInfoList = TableInfoKeeper.getColumns(fullName);
            if (columnInfoList == null || columnInfoList.size() != cols.size()) {
                logger.warn("columnInfoList.size is not equal to cols.");
                return null;
            }
        }

        Map<String, String> map = new HashMap<>();
        for (int i = 0; i < columnInfoList.size(); i++) {
            Column col = cols.get(i);
            String value = (col.getValue() == null) ? "" : col.toString();
            map.put(columnInfoList.get(i).getName(), value);
        }
        return map;
    }

    /**
     * Builds a {@link TableInfo} from the SQL of a QUERY_EVENT.
     *
     * <p>A QUERY_EVENT is scoped to a database rather than a table, so the table name has to
     * be extracted from the SQL text itself.
     *
     * <p>NOTE(review): the SQL is lower-cased before parsing, so the extracted table name
     * is always lower-case - confirm this is acceptable for case-sensitive table names.
     *
     * @param qe the query event
     * @return the table info, or {@code null} when no table name can be extracted
     */
    private TableInfo createTableInfo(QueryEvent qe) {
        String sql = qe.getSql().toString().toLowerCase();

        String tableName;
        if (checkFlag(sql, "table")) {
            tableName = getTableName(sql, "table");
        } else if (checkFlag(sql, "truncate")) {
            tableName = getTableName(sql, "truncate");
        } else {
            logger.warn("can not find table name from sql:{}", sql);
            return null;
        }
        if (tableName == null || tableName.isEmpty()) {
            logger.warn("can not find table name from sql:{}", sql);
            return null;
        }

        String databaseName = qe.getDatabaseName().toString();
        TableInfo ti = new TableInfo();
        ti.setDatabaseName(databaseName);
        ti.setTableName(tableName);
        ti.setFullName(databaseName + "." + tableName);
        return ti;
    }

    /** Returns whether {@code flag} occurs in {@code sql} as a whole whitespace-delimited token. */
    private boolean checkFlag(String sql, String flag) {
        for (String token : sql.split("\\s+")) {
            if (token.equals(flag)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Extracts the table name from a lower-cased DDL statement.
     *
     * <p>If the SQL contains a dot, the first token after the first dot is used
     * ({@code db.table ...}); otherwise the first non-empty token following the first token
     * that contains {@code flag}. Backticks, semicolons and a trailing {@code (...} part
     * (as in {@code create table person(...)}) are stripped.
     *
     * @return the cleaned table name, or {@code null} when no candidate token exists
     */
    private String getTableName(String sql, String flag) {
        String tName = null;
        String[] dotParts = sql.split("\\.");
        if (dotParts.length > 1) {
            tName = dotParts[1].split("\\s+")[0];
        } else {
            boolean start = false;
            for (String token : sql.split("\\s+")) {
                if (token.indexOf(flag) >= 0) {
                    start = true;
                    continue;
                }
                if (start && !token.isEmpty()) {
                    tName = token;
                    break;
                }
            }
        }
        if (tName == null) {
            return null;
        }

        // Strings are immutable: the cleaned value must be assigned back.
        tName = tName.replace("`", "").replace(";", "");

        // Drop "(" and what follows, e.g. "create table person(....".
        int index = tName.indexOf('(');
        if (index > 0) {
            tName = tName.substring(0, index);
        }
        return tName;
    }
}

上面所涉及到的TableInfo.java如下:

package or.model;

public class TableInfo {

private String databaseName;
private String tableName;
private String fullName;
// 省略Getter和Setter

@Override
public boolean equals(Object o){
if(this == o)
return true;
if(o == null || this.getClass()!=o.getClass())
return false;
TableInfo tableInfo = (TableInfo)o;
if(!this.databaseName.equals(tableInfo.getDatabaseName()))
return false;
if(!this.tableName.equals(tableInfo.getTableName()))
return false;
if(!this.fullName.equals(tableInfo.getFullName()))
return false;
return true;
}

@Override
public int hashCode(){
int result = this.tableName.hashCode();
result = 31*result+this.databaseName.hashCode();
result = 31*result+this.fullName.hashCode();
return result;
}
}

接着需要有个地方保存从TABLE_MAP_EVENT中提取到的信息,TableInfoKeeper.java

package or.keeper;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import or.MysqlConnection;
import or.model.ColumnInfo;
import or.model.TableInfo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.code.or.binlog.impl.event.TableMapEvent;

/**
 * Static cache of table metadata needed to interpret row events.
 *
 * <ul>
 *   <li>table id to {@link TableInfo}, filled from TABLE_MAP_EVENTs;</li>
 *   <li>{@code database.table} to column list, loaded through
 *       {@link MysqlConnection#getColumns()} when this class is initialised and replaced
 *       wholesale by {@link #refreshColumnsMap()}.</li>
 * </ul>
 */
public class TableInfoKeeper {

    private static final Logger logger = LoggerFactory.getLogger(TableInfoKeeper.class);

    private static final Map<Long, TableInfo> tabledIdMap = new ConcurrentHashMap<>();

    // Reassigned as a whole by refreshColumnsMap() and read without a lock by getColumns(),
    // hence volatile so readers see the newly published map.
    private static volatile Map<String, List<ColumnInfo>> columnsMap = MysqlConnection.getColumns();

    /**
     * Records (or overwrites) the table that a TABLE_MAP_EVENT associates with its table id.
     *
     * @param tme the table-map event
     */
    public static void saveTableIdMap(TableMapEvent tme) {
        String databaseName = tme.getDatabaseName().toString();
        String tableName = tme.getTableName().toString();

        TableInfo table = new TableInfo();
        table.setDatabaseName(databaseName);
        table.setTableName(tableName);
        table.setFullName(databaseName + "." + tableName);

        // put() replaces an existing mapping in one step; a separate remove() first would
        // leave a moment in which getTableInfo() finds nothing for this id.
        tabledIdMap.put(tme.getTableId(), table);
    }

    /**
     * Reloads all column metadata from MySQL and swaps it in, unless the reload came back
     * empty, in which case the current cache is kept and an error is logged.
     */
    public static synchronized void refreshColumnsMap() {
        Map<String, List<ColumnInfo>> map = MysqlConnection.getColumns();
        if (map.size() > 0) {
            columnsMap = map;
        } else {
            logger.error("refresh columnsMap error.");
        }
    }

    /**
     * @param tableId table id taken from a binlog event
     * @return the table registered for that id, or {@code null} if none has been saved
     */
    public static TableInfo getTableInfo(long tableId) {
        return tabledIdMap.get(tableId);
    }

    /**
     * @param fullName table name in {@code database.table} form
     * @return the cached columns of that table, or {@code null} if the table is not cached
     */
    public static List<ColumnInfo> getColumns(String fullName) {
        return columnsMap.get(fullName);
    }
}

正如上面InstanceListener中提到的,有些信息需要直接从MySQL中读取,比如数据库表的列信息,相关的类MysqlConnection如下:

package or;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import or.model.BinlogInfo;
import or.model.BinlogMasterStatus;
import or.model.ColumnInfo;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Holds one shared JDBC connection to the MySQL server and offers helpers for the
 * metadata that cannot be read from the binlog itself (column names, binlog files,
 * master status, server id).
 *
 * <p>NOTE(review): {@link #getBinlogInfo()}, {@link #getBinlogMasterStatus()} and
 * {@link #getServerId()} close the shared connection when they finish (as the original
 * code did); {@link #getConnection()} reopens it on the next call. Confirm this is
 * intended before using these helpers from several threads.
 */
public class MysqlConnection {
    private static final Logger logger = LoggerFactory.getLogger(MysqlConnection.class);

    private static Connection conn;

    private static String host;
    private static int port;
    private static String user;
    private static String password;

    /**
     * Opens the shared connection if there is none or the current one is closed, and
     * remembers the arguments for later reconnects. Failures are logged, not thrown.
     *
     * @param hostArg     MySQL host
     * @param portArg     MySQL port
     * @param userArg     user name
     * @param passwordArg password (never logged)
     */
    public static void setConnection(String hostArg, int portArg, String userArg, String passwordArg) {
        try {
            if (conn == null || conn.isClosed()) {
                Class.forName("com.mysql.jdbc.Driver");

                host = hostArg;
                port = portArg;
                user = userArg;
                password = passwordArg;

                conn = DriverManager.getConnection("jdbc:mysql://" + host + ":" + port + "/", user, password);
                // Credentials must not end up in log files: log the endpoint and user only.
                logger.info("connected to mysql {}:{} as user {}", host, port, user);
            }
        } catch (ClassNotFoundException | SQLException e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Returns the shared connection, reconnecting with the remembered settings when it is
     * missing or closed.
     *
     * @return the connection, or {@code null} if it could not be established
     */
    public static Connection getConnection() {
        try {
            if (conn == null || conn.isClosed()) {
                setConnection(host, port, user, password);
            }
        } catch (SQLException e) {
            logger.error(e.getMessage(), e);
        }
        return conn;
    }

    /**
     * Reads the columns of every table (table type {@code TABLE}) in every catalog.
     *
     * @return map from {@code database.table} to that table's columns in the order the
     *         driver reports them; empty when no connection is available, and possibly
     *         partial when a {@link SQLException} interrupts the scan (it is logged)
     */
    public static Map<String, List<ColumnInfo>> getColumns() {
        Map<String, List<ColumnInfo>> cols = new HashMap<>();
        Connection conn = getConnection();
        if (conn == null) {
            logger.error("no mysql connection available, can not load column info");
            return cols;
        }

        String[] tableType = {"TABLE"};
        try {
            DatabaseMetaData metaData = conn.getMetaData();
            // Every metadata ResultSet is closed as soon as it has been consumed.
            try (ResultSet catalogs = metaData.getCatalogs()) {
                while (catalogs.next()) {
                    String databaseName = catalogs.getString("TABLE_CAT");
                    try (ResultSet tables = metaData.getTables(databaseName, null, null, tableType)) {
                        while (tables.next()) {
                            String tableName = tables.getString("TABLE_NAME");
                            List<ColumnInfo> columns = new ArrayList<>();
                            try (ResultSet colSet = metaData.getColumns(databaseName, null, tableName, null)) {
                                while (colSet.next()) {
                                    columns.add(new ColumnInfo(
                                            colSet.getString("COLUMN_NAME"), colSet.getString("TYPE_NAME")));
                                }
                            }
                            cols.put(databaseName + "." + tableName, columns);
                        }
                    }
                }
            }
        } catch (SQLException e) {
            logger.error(e.getMessage(), e);
        }
        return cols;
    }

    /**
     * Lists the server's binlog files by running {@code show binary logs}, e.g.
     * <pre>
     * mysql> show binary logs
     * +------------------+-----------+
     * | Log_name         | File_size |
     * +------------------+-----------+
     * | mysql-bin.000001 |       126 |
     * | mysql-bin.000002 |       126 |
     * | mysql-bin.000003 |      6819 |
     * | mysql-bin.000004 |      1868 |
     * +------------------+-----------+
     * </pre>
     *
     * @return one entry per binlog file; empty if the query fails (the error is logged)
     */
    public static List<BinlogInfo> getBinlogInfo() {
        List<BinlogInfo> binlogList = new ArrayList<>();

        try (Connection c = getConnection();
             Statement statement = c.createStatement();
             ResultSet resultSet = statement.executeQuery("show binary logs")) {
            while (resultSet.next()) {
                binlogList.add(new BinlogInfo(resultSet.getString("Log_name"), resultSet.getLong("File_size")));
            }
        } catch (Exception e) {
            // Also covers a null connection (NullPointerException) and failures while closing.
            logger.error(e.getMessage(), e);
        }

        return binlogList;
    }

    /**
     * Reads the current binlog file and position by running {@code show master status}, e.g.
     * <pre>
     * mysql> show master status;
     * +------------------+----------+--------------+------------------+
     * | File             | Position | Binlog_Do_DB | Binlog_Ignore_DB |
     * +------------------+----------+--------------+------------------+
     * | mysql-bin.000004 |     1868 |              |                  |
     * +------------------+----------+--------------+------------------+
     * </pre>
     *
     * @return the master status; its fields keep their defaults if the query fails
     */
    public static BinlogMasterStatus getBinlogMasterStatus() {
        BinlogMasterStatus binlogMasterStatus = new BinlogMasterStatus();

        try (Connection c = getConnection();
             Statement statement = c.createStatement();
             ResultSet resultSet = statement.executeQuery("show master status")) {
            while (resultSet.next()) {
                binlogMasterStatus.setBinlogName(resultSet.getString("File"));
                binlogMasterStatus.setPosition(resultSet.getLong("Position"));
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }

        return binlogMasterStatus;
    }

    /**
     * Reads the {@code server_id} variable of the MySQL server this class is connected to.
     *
     * @return the server id, or the fallback value 6789 if it could not be read
     */
    public static int getServerId() {
        int serverId = 6789;

        try (Connection c = getConnection();
             Statement statement = c.createStatement();
             ResultSet resultSet = statement.executeQuery("show variables like 'server_id'")) {
            while (resultSet.next()) {
                serverId = resultSet.getInt("Value");
            }
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }

        return serverId;
    }
}

上面代码涉及的附加类(BinlogInfo.java; BinlogMasterStatus.java; ColumnInfo.java)

package or.model;
/**
 * One binlog file as listed by {@code show binary logs}.
 * MysqlConnection.getBinlogInfo() builds instances from the Log_name and File_size columns.
 */
public class BinlogInfo {
private String binlogName; // binlog file name (Log_name column)
private Long fileSize; // size of the file (File_size column)
// Getters and setters omitted
// NOTE(review): callers use a (String, long) constructor; presumably omitted here as well.
}

package or.model;
/**
 * Current binlog coordinates of the server as reported by {@code show master status}.
 * MysqlConnection.getBinlogMasterStatus() fills it from the File and Position columns.
 */
public class BinlogMasterStatus {
private String binlogName; // current binlog file name (File column)
private long position; // current position in that file (Position column)
// Getters and setters omitted
}

package or.model;
/**
 * Name and type of one table column.
 * MysqlConnection.getColumns() creates instances from the COLUMN_NAME and TYPE_NAME
 * values of the JDBC column metadata.
 */
public class ColumnInfo {
private String name; // column name
private String type; // column type name
// Getters and setters omitted
// NOTE(review): callers use a (String, String) constructor, presumably (name, type); omitted here.
}

最后还要有个地方存储解析之后的事件信息,这里简要设计下,采用一个ConcurrentLinkedDeque好了(CDCEventManager.java)

package or.manager;
import java.util.concurrent.ConcurrentLinkedDeque;
import or.CDCEvent;
/**
 * Holds the process-wide queue of parsed CDC events.
 * InstanceListener appends events with addLast(); the printer thread in
 * OpenReplicatorTest takes them from the head with pollFirst().
 */
public class CDCEventManager {
// Shared hand-off queue between the binlog listener and event consumers.
public static final ConcurrentLinkedDeque<CDCEvent> queue = new ConcurrentLinkedDeque<>();
}

所有的准备工作都完成了,下面可以解析binlog日志了:

package or.test;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import or.CDCEvent;
import or.InstanceListener;
import or.MysqlConnection;
import or.OpenReplicatorPlus;
import or.manager.CDCEventManager;
import or.model.BinlogMasterStatus;

import com.google.code.or.OpenReplicator;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;

/**
 * Demo entry point: connects open-replicator to a MySQL server, registers
 * {@link InstanceListener} and prints every parsed {@link CDCEvent} as pretty JSON.
 */
public class OpenReplicatorTest {
    private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorTest.class);
    private static final String host = "xx.xx.xx.60";
    private static final int port = 3306;
    private static final String user = "****";
    private static final String password = "****";

    /**
     * Starts replication from position 4 of the binlog file currently reported by
     * {@code show master status}, then starts the printer thread.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        OpenReplicator or = new OpenReplicator();
        or.setUser(user);
        or.setPassword(password);
        or.setHost(host);
        or.setPort(port);
        MysqlConnection.setConnection(host, port, user, password);

        // Do not call or.setServerId(MysqlConnection.getServerId()): the configured serverId
        // is the id of open-replicator itself (acting as a slave), not the master's serverId.

        BinlogMasterStatus bms = MysqlConnection.getBinlogMasterStatus();
        // A fixed file such as "mysql-bin.000004" can be passed here instead.
        or.setBinlogFileName(bms.getBinlogName());
        or.setBinlogPosition(4);
        or.setBinlogEventListener(new InstanceListener());
        try {
            or.start();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }

        Thread thread = new Thread(new PrintCDCEvent());
        thread.start();
    }

    /**
     * Drains {@code CDCEventManager.queue} and prints each event as pretty-printed JSON
     * (nulls included). Sleeps one second when the queue is empty and stops when its
     * thread is interrupted.
     */
    public static class PrintCDCEvent implements Runnable {
        // Built once and reused for every event instead of once per event.
        private final Gson gson = new GsonBuilder().setPrettyPrinting().serializeNulls().create();

        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                // Poll and null-check in one step rather than isEmpty() followed by pollFirst().
                CDCEvent ce = CDCEventManager.queue.pollFirst();
                if (ce != null) {
                    System.out.println(gson.toJson(ce));
                    continue;
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    // Restore the flag and stop, so the thread can be shut down cleanly.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}

运行时间久了会遇到这样一个问题:

16-10-21 10:41:49.365 ERROR[binlog-parser-1 AbstractBinlogParser.run:247] failed to parse binlog
java.io.EOFException: null
at com.google.code.or.io.util.ActiveBufferedInputStream.read(ActiveBufferedInputStream.java:169) ~[open-replicator-1.0.7.jar:na]
at com.google.code.or.io.impl.XInputStreamImpl.doFill(XInputStreamImpl.java:236) ~[open-replicator-1.0.7.jar:na]
at com.google.code.or.io.impl.XInputStreamImpl.read(XInputStreamImpl.java:213) ~[open-replicator-1.0.7.jar:na]
at com.google.code.or.io.impl.XInputStreamImpl.readInt(XInputStreamImpl.java:141) ~[open-replicator-1.0.7.jar:na]
at com.google.code.or.io.impl.XInputStreamImpl.readInt(XInputStreamImpl.java:61) ~[open-replicator-1.0.7.jar:na]
at com.google.code.or.binlog.impl.ReplicationBasedBinlogParser.doParse(ReplicationBasedBinlogParser.java:91) ~[open-replicator-1.0.7.jar:na]
at com.google.code.or.binlog.impl.AbstractBinlogParser$Task.run(AbstractBinlogParser.java:244) ~[open-replicator-1.0.7.jar:na]
at java.lang.Thread.run(Unknown Source) [na:1.7.0_80]
16-10-21 10:41:49.371 INFO [binlog-parser-1 TransportImpl.disconnect:121] disconnected from xx.xx.xx.60:3306

初步解决方案(extends OpenReplicator然后添加重试机制):

package or;

import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.code.or.OpenReplicator;

/**
 * {@link OpenReplicator} that starts itself again after it has been stopped, e.g. when
 * the binlog parser fails with an EOFException and the replicator shuts down.
 *
 * <p>Auto-restart is on by default; call {@link #setAutoRestart(boolean)} with
 * {@code false} before stopping on purpose.
 *
 * <p>NOTE(review): the restart presumably resumes from the binlog file name and position
 * configured on this instance, which may replay events already delivered - confirm
 * against the open-replicator version in use.
 */
public class OpenReplicatorPlus extends OpenReplicator {
    private static final Logger logger = LoggerFactory.getLogger(OpenReplicatorPlus.class);
    private volatile boolean autoRestart = true;

    /** @return whether the replicator restarts itself after being stopped */
    public boolean isAutoRestart() {
        return autoRestart;
    }

    /** @param autoRestart {@code false} to let a stop be final */
    public void setAutoRestart(boolean autoRestart) {
        this.autoRestart = autoRestart;
    }

    /**
     * Stops the replicator and, when auto-restart is enabled, waits 10 seconds and starts
     * it again. A failed restart is logged and not retried by this method.
     *
     * @param timeout how long to wait for the stop to complete
     * @param unit    unit of {@code timeout}
     */
    @Override
    public void stopQuietly(long timeout, TimeUnit unit) {
        super.stopQuietly(timeout, unit);
        if (!autoRestart) {
            return;
        }
        try {
            TimeUnit.SECONDS.sleep(10);
            logger.error("Restart OpenReplicator");
            // Without this call the replicator would only log the restart and stay stopped.
            start();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller and skip the restart.
            Thread.currentThread().interrupt();
            logger.warn("interrupted while waiting to restart OpenReplicator", e);
        } catch (Exception e) {
            logger.error("failed to restart OpenReplicator", e);
        }
    }
}

最后只需将OpenReplicatorTest.java中的OpenReplicator or = new OpenReplicator ();改为OpenReplicator or = new OpenReplicatorPlus ();即可。

大功告成~~


参考资料

  1. 谈谈对Canal(增量数据订阅与消费)的理解
  2. MySQL主备复制原理、实现及异常处理
  3. https://github.com/whitesock/open-replicator

欢迎支持笔者的作品《深入理解Kafka: 核心设计与实践原理》和《RabbitMQ实战指南》,同时欢迎关注笔者的微信公众号:朱小厮的博客(ID: hiddenkafka)。
本文作者: 朱小厮

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×