分布式 XA 事务处理逻辑
事务在数据库中代表一系列操作要么全部都完成,要么全部都失败,ACID 规定了事务操作的原子性、一致性、隔离性和持久性。然而数据库的环境不可能只在单机上,在分布式环境下,一个事务中某个操作可能发往 A 节点,而另一个操作发往 B 节点,这就导致无法保证 ACID 的原则。
实现分布式事务常见的解决办法有以下几种:XA 两阶段提交协议、TCC 协议和 SAGA 协议。但是这些解决办法都不可能完全保证事务不出错。分布式系统中有一个 CAP 定理,说的是在分布式情况下,不可能同时满足一致性、可用性和容错性这三个条件,一般需要满足其中两个条件。
XA 两阶段提交协议
XA 协议规定了分布式事务的标准,其中 AP 代表应用程序,TM 代表事务管理器,负责协调和管理事务,而RM 代表着资源管理器。
而事务的具体处理过程就是 TM 和 RM 之间的交互,分为两个阶段:
第一阶段:事务管理器要求每个涉及到事务的数据库预提交 (precommit) 此操作,并反映是否可以提交。
第二阶段:事务管理器要求每个数据库提交数据,或者回滚数据。
以 MySQL 中的 XA 处理逻辑为例(MySQL5.7 版本实现了对 XA 协议的支持),来看下这两个阶段的逻辑处理过程。
对于一个事务:
begin;
insert into student values ('xiaoming', 18);
update test set age = 18 where name = 'xiaoming';
commit;
第一阶段
事务管理器会生成一个全局的事务 ID,比如使用 uuid 生成一个唯一的 ID,为了方便用 xid1 代替。
首先,遇到 begin,不处理。
然后是 insert 操作,事务管理器根据表中主键的值计算(hash)出应该分布在哪个节点上,比如 insert 语句被计算出应该发到节点 A 上,事务管理器就像 A 节点发送命令开始 XA 事务,同时将 insert 语句发送过去。
xa start 'xid1'; # 开启事务
insert into student values ('xiaoming', 18);
接下来 update 操作,同样的,事务管理器根据主键计算所属节点,开启 XA,发送 update 语句。
xa start 'xid1';
update test set age = 18 where name = 'xiaoming';
commit 的时候,事务管理器分别向节点 A 和 B 发送一个预提交操作:
xa end 'xid1';
xa prepare 'xid1';
第二阶段
如果节点 A 和 B 都返回就绪 ready,此时进入 第二阶段:
事务管理器分别向节点 AB 发送 commit 操作:
xa commit 'xid1';
相反的,如果有任何一个节点是 unready,事务管理器就会通知 A、B 节点的操作回滚:
xa rollback'xid1';
有一个问题,如果在进入第二阶段 commit 的时候,某个数据节点出现故障,会导致节点状态不一致。解决办法是把 XA 事务处理的过程也存入日志数据,比如 MySQL 将其写入了 binlog,这样在出现问题时还可以恢复。
整个 XA 的过程:
# 阶段一
xa start 'xid1';
insert into test values (1, 1);
xa start 'xid1';
update test set b = 1 where a = 10;
xa end 'xid1';
xa prepare 'xid1';
# 阶段二
xa commit 'xid1';
# or
xa rollback 'xid1'; # 失败回滚
EverDB 分布式事务的支持
MyCat 中的实现
EDB-Grid 组件中,借鉴了 MyCat(也是一个数据库中间件)的 XA 处理逻辑,MyCat 根据 XA 协议实现了对分布式事务的支持,具体来说:
通过数据库编程接口(比如 JDBC,也就是 XA 协议中的 AP)开启 XA 事务,然后执行 SQL 语句,预提交,最后 commit。
// 开始 XA 事务
conn.prepareStatement("set xa=on").execute();
// 插入语句
// 分别预提交
conn.prepareStatement(sql1).execute();
conn.prepareStatement(sql2).execute();
// commit
conn.commit();
过程跟 MySQL 类似,在实现上,利用 uuid 生成了一个全局的事务 ID:
public void setXATXEnabled(boolean xaTXEnabled) {
if (xaTXEnabled) {
if (this.xaTXID == null) {
xaTXID = genXATXID(); // 获得 XA 事务编号
}
} else {
this.xaTXID = null;
}
}
//......
public static String getUUID() {
String s = UUID.randomUUID().toString();
return s.substring(0, 8) + s.substring(9, 13) + s.substring(14, 18) + s.substring(19, 23) + s.substring(24);
}
然后在事务管理器向节点分发语句时,会先写入 XA START:
if (expectAutocommit == false && xaTxID != null && xaStatus == TxState.TX_INITIALIZE_STATE) {
xaCmd = "XA START " + xaTxID + ';';
this.xaStatus = TxState.TX_STARTED_STATE;
}
//......
// and our query sql to multi command at last
sb.append(rrn.getStatement() + ";");
// syn and execute others
this.sendQueryCmd(sb.toString());
MyCat 在执行事务操作是,会同时将其写入日志中,保证可恢复。
if (mysqlCon.getXaStatus() == TxState.TX_STARTED_STATE) { // XA 事务
//recovery Log
participantLogEntry[started] = new ParticipantLogEntry(xaTxId, conn.getHost(), 0, conn.getSchema(), ((MySQLConnection) conn).getXaStatus());
String[] cmds = new String[]{"XA END " + xaTxId, // XA END 命令
"XA PREPARE " + xaTxId}; // XA PREPARE 命令
mysqlCon.execBatchCmd(cmds);
同样的,commit 时也会同步写入日志。
rollback:
if (needRollback) {
for (int j = 0; j < coordinatorLogEntry.participants.length; j++) {
ParticipantLogEntry participantLogEntry = coordinatorLogEntry.participants[j];
//XA rollback
String xacmd = "XA ROLLBACK " + coordinatorLogEntry.id + ';';
OneRawSQLQueryResultHandler resultHandler = new OneRawSQLQueryResultHandler(new String[0], new XARollbackCallback());
outloop:
// ...
EverDB 中的实现
再来看下 EverDB 的处理过程:
首先是生成 xid,从 0 开始递增。
unsigned long XA_manager::generate_xid()
{
unsigned long ret = 0;
xid_mutex.acquire();
try {
//TODO: find a place to do init_max_xid
if (!init_xid)
init_max_xid();
ret = xid_next++;
if (!ret) // 0 is kept as the initial value
++ret;
//...
开始 XA 事务:
void MySQLXA_helper::init_conn_to_start_xa(Session *session,
DataSpace *space,
Connection *conn)
{
unsigned long xid = session->get_xa_id();
// clear the pending transaction
conn->execute_one_modify_sql("COMMIT;");
// ......
record_xa_redo_log(session, space, sql.c_str()); // log
}
// ...
// start xa transaction
sql += "XA START '";
sql += tmp;
sql += "';";
conn->execute_one_modify_sql(sql.c_str());
conn->set_start_xa_conn(true);
}
第二阶段:XA COMMIT 或者 ROLLBACK:
void xa_commit_or_rollback_xid(Connection *conn, string xid, int flag)
{
string sql("");
if (flag == TC_TRANSACTION_COMMIT)
sql += "XA COMMIT '"; // xa commit
else if (flag != TC_TRANSACTION_COMMIT)
sql += "XA ROLLBACK '"; // xa rollback
sql += xid.c_str();
sql += "';";
check_xa_sql_is_not_running(conn, sql);
TimeValue timeout = TimeValue(backend_sql_net_timeout);
//......
}
}
同时事务处理的过程会写入 redolog 中,比如上面的开始 XA 事务中 record_xa_redo_log 。
© CC BY-NC-SA 4.0 | Chaojie