更新時間:2021-09-17 10:29:56 來源:動力節(jié)點 瀏覽2588次
Mycat的事務(wù)相關(guān)的代碼邏輯,目前的實現(xiàn)方式如下:
用戶會話Session中設(shè)定autocommit=false,開啟一個事務(wù)過程,這個會話中隨后的所有SQL語句進(jìn)入事務(wù)模式,ServerConnection(前端連接)中有一個變量txInterrupted控制是否事務(wù)異常需要回滾
當(dāng)某個SQL執(zhí)行過程中發(fā)生錯誤,則設(shè)置txInterrupted=true,表明此事務(wù)需要回滾
當(dāng)用戶提交事務(wù)(commit指令)的時候,Session會檢查事務(wù)回滾變量,若發(fā)現(xiàn)事務(wù)需要回滾,則取消Commit指令在相關(guān)節(jié)點上的執(zhí)行過程,返回錯誤信息,Transaction need rollback,用戶只能回滾事務(wù),若所有節(jié)點都執(zhí)行成功,則向每個節(jié)點發(fā)送Commit指令,事務(wù)結(jié)束。
從上面的邏輯來看,當(dāng)前Mycat的事務(wù)是一種弱XA的事務(wù),與XA事務(wù)相似的地方是,只有所有節(jié)點都執(zhí)行成功(Prepare階段都成功),才開始提交事務(wù),與XA不同的是,在提交階段,若某個節(jié)點宕機,沒有手段讓此事務(wù)在故障節(jié)點恢復(fù)以后繼續(xù)執(zhí)行,從實際的概率來說,這個概率也是很小很小的,因此,當(dāng)前事務(wù)的方式還是能滿足絕大數(shù)系統(tǒng)對事務(wù)的要求。
另外,Mycat當(dāng)前若XA的事務(wù)模式,相對XA還是比較輕量級,性能更好,雖然如此,也不建議一個事務(wù)中存在跨多個節(jié)點的SQL操作問題,這樣鎖定的資源更多,并發(fā)性降低很多。
前端連接中關(guān)于事務(wù)標(biāo)記txInterrupted的方法片段:
public class ServerConnection extends FrontendConnection {
/** * 設(shè)置是否需要中斷當(dāng)前事務(wù) */
public void setTxInterrupt(String txInterrputMsg) {
if (!autocommit && !txInterrupted) {
txInterrupted = true;
this.txInterrputMsg = txInterrputMsg;
}
}
public boolean isTxInterrupted() {
return txInterrupted;
}
/** * 提交事務(wù) */
public void commit() {
if (txInterrupted) {
writeErrMessage(ErrorCode.ER_YES,"Transaction error, need to rol lback.");
}else{
session.commit();
}
}
}
SQL出錯時候設(shè)置事務(wù)回滾標(biāo)志:
public class SingleNodeHandler implements ResponseHandler, Terminatable,LoadDataResponseHandler {
private void backConnectionErr(ErrorPacket errPkg, BackendConnection conn) {
endRunning();
String errmgs = " errno:" + errPkg.errno + " "+ new String(errPkg.message);
LOGGER.warn("execute sql err :" + errmgs + " con:" + conn);
session.releaseConnectionIfSafe(conn, LOGGER.isDebugEnabled(), false);
ServerConnection source = session.getSource();
source.setTxInterrupt(errmgs);
errPkg.write(source);
recycleResources();
}
}
Session提交事務(wù)的關(guān)鍵代碼:
public class NonBlockingSession implements Session {
public void commit() {
final int initCount = target.size();
if (initCount <= 0) {
ByteBuffer buffer = source.allocate();
buffer = source.writeToBuffer(OkPacket.OK, buffer); source.write(buffer);
return;
} else if (initCount == 1) {
BackendConnection con = target.elements().nextElement(); commitHandler.commit(con);
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("multi node commit to send ,total " + initCount);
}
multiNodeCoordinator.executeBatchNodeCmd(SQLCmdConstant.COMMIT_CMD);
}
}
}
以上就是對“Mycat的事務(wù)管理機制”的介紹,大家如果想了解更多相關(guān)知識,可以關(guān)注動力節(jié)點Mycat教程,文檔當(dāng)中的內(nèi)容很詳細(xì),當(dāng)然也有配套的視頻教程可以免費下載學(xué)習(xí),希望對大家能夠有所幫助。
初級 202925
初級 203221
初級 202629
初級 203743