Shardingsphere整合Narayana對XA分布式事務的支持(4)

Apache ShardingSphere 是一套開源的分布式資料庫中間件解決方案組成的生態圈,它由 JDBC、Proxy 和 Sidecar(規劃中)這 3 款相互獨立,卻又能夠混合部署配合使用的產品組成。它們均提供標準化的數據分片、分布式事務和資料庫治理功能,可適用于如 Java 同構、異構語言、云原生等各種多樣化的應用場景。

ShardingSphere 已于 2020 年 4 月 16 日成為 Apache 軟體基金會的頂級項目。

Narayana 簡單介紹

Narayana(https://narayana.io/),是由 Jboss 團隊提供的 XA 分布式事務的解決方案。

它具有以下特點:

  • 標準的基于JTA實現。

  • TransactionManager(TM) 完全去中心化設計,與業務耦合,無需單獨部署。

  • 事務日志支持資料庫存儲,支持集群模式下的事務恢復。

ShardingTransactionManager 初始化 XATransactionDataSource 流程

ShardingSphere 對 XA 的支持提供一整套的 SPI 接口,在初始化話的時候,根據事務類型,先進行 TransactionManager 的初始化。我們先進入org.apache.shardingsphere.transaction.xa.XAShardingTransactionManager。代碼如下:

 private final Map cachedDataSources = new HashMap<>();
private final XATransactionManager xaTransactionManager = XATransactionManagerLoader.getInstance().getTransactionManager();
@Override public void init(final DatabaseType databaseType, final Collection resourceDataSources) { for (ResourceDataSource each : resourceDataSources) { cachedDataSources.put(each.getOriginalName(), new XATransactionDataSource(databaseType, each.getUniqueResourceName(), each.getDataSource(), xaTransactionManager)); } // Narayana的初始化 xaTransactionManager.init(); }
復制代碼

  • 首先會根據配置的datasource將其轉換成XATransactionDataSource,具體代碼在new XATransactionDataSource(databaseType, each.getUniqueResourceName(), each.getDataSource(), xaTransactionManager))。我們跟進去,代碼如下:

public XATransactionDataSource(final DatabaseType databaseType, final String resourceName, final DataSource dataSource, final XATransactionManager xaTransactionManager) {        this.databaseType = databaseType;        this.resourceName = resourceName;        this.dataSource = dataSource;        if (!CONTAINER_DATASOURCE_NAMES.contains(dataSource.getClass().getSimpleName())) {            // 重點關注 1 ,返回了xaDatasource            xaDataSource = XADataSourceFactory.build(databaseType, dataSource);            this.xaTransactionManager = xaTransactionManager;            // 重點關注2 注冊資源            xaTransactionManager.registerRecoveryResource(resourceName, xaDataSource);        }    }
復制代碼

  • 我們重點來關注 XADataSourceFactory.build(databaseType, dataSource),從名字我們就可以看出,這應該是返回JTA規范里面的XADataSource,在ShardingSphere里面很多的功能,可以從代碼風格的命名上就能猜出來,這就是優雅代碼(吹一波)。不多逼逼,我們進入該方法。

public final class XADataSourceFactory {
public static XADataSource build(final DatabaseType databaseType, final DataSource dataSource) { return new DataSourceSwapper(XADataSourceDefinitionFactory.getXADataSourceDefinition(databaseType)).swap(dataSource); }}
復制代碼

  • 首先又是一個SPI定義的 XADataSourceDefinitionFactory,它根據不同的資料庫類型,來加載不同的方言。然后我們進入 swap方法。

 public XADataSource swap(final DataSource dataSource) {        XADataSource result = createXADataSource();        setProperties(result, getDatabaseAccessConfiguration(dataSource));        return result;    }
復制代碼

  • 很簡明,第一步創建,XADataSource,第二步給它設置屬性(包含數據的連接,用戶名密碼等),然后返回。

Narayana 初始化過程詳解

我們首先進入org.apache.shardingsphere.transaction.xa.narayana.manager.NarayanaXATransactionManager

public final class NarayanaXATransactionManager implements XATransactionManager {    //加載transactionManger    private final TransactionManager transactionManager = jtaPropertyManager.getJTAEnvironmentBean().getTransactionManager();
//獲取事務恢復模塊 private final XARecoveryModule xaRecoveryModule = XARecoveryModule.getRegisteredXARecoveryModule();
private final RecoveryManagerService recoveryManagerService = new RecoveryManagerService();
@Override public void init() { RecoveryManager.delayRecoveryManagerThread(); recoveryManagerService.create();//開啟事務恢復 recoveryManagerService.start(); }
@Override public void registerRecoveryResource(final String dataSourceName, final XADataSource xaDataSource) { xaRecoveryModule.addXAResourceRecoveryHelper(new DataSourceXAResourceRecoveryHelper(xaDataSource)); }
@Override public void removeRecoveryResource(final String dataSourceName, final XADataSource xaDataSource) { xaRecoveryModule.removeXAResourceRecoveryHelper(new DataSourceXAResourceRecoveryHelper(xaDataSource)); }
@SneakyThrows({SystemException.class, RollbackException.class}) @Override public void enlistResource(final SingleXAResource singleXAResource) { transactionManager.getTransaction().enlistResource(singleXAResource.getDelegate()); }
@Override public TransactionManager getTransactionManager() { return transactionManager; }
@Override public void close() throws Exception { recoveryManagerService.stop(); recoveryManagerService.destroy(); }}
復制代碼

  • 首先我們關注jtaPropertyManager.getJTAEnvironmentBean().getTransactionManager()獲取TransactionManager,這是整個 Narayana初始化的核心。進入代碼 com.arjuna.common.internal.util.propertyservice.BeanPopulator.getNamedInstance()

private static  T getNamedInstance(Class beanClass, String name, Properties properties) throws RuntimeException {        StringBuilder sb = new StringBuilder().append(beanClass.getName());        if (name != null)           sb.append(":").append(name);        String key = sb.toString();        // we don't mind sometimes instantiating the bean multiple times,        // as long as the duplicates never escape into the outside world.        if(!beanInstances.containsKey(key)) {            T bean = null;            try {               // 初始化 JTAEnvironmentBean 這個類                bean = beanClass.newInstance();                if (properties != null) {                    configureFromProperties(bean, name, properties);                } else {                    //初始化屬性配置                    Properties defaultProperties = PropertiesFactory.getDefaultProperties();                    configureFromProperties(bean, name, defaultProperties);                }            } catch (Throwable e) {                throw new RuntimeException(e);            }            beanInstances.putIfAbsent(key, bean);        }        return (T) beanInstances.get(key);    }
復制代碼

  • 我們重點關注 Properties defaultProperties = PropertiesFactory.getDefaultProperties(); 。最后會進入com.arjuna.common.util.propertyservice.AbstractPropertiesFactory.getPropertiesFromFile()

 public Properties getPropertiesFromFile(String propertyFileName, ClassLoader classLoader) {        String propertiesSourceUri = null;        try        {            // 文件名稱為:jbossts-properties.xml 加載順序為:This is the point where the search path is applied - user.dir (pwd), user.home, java.home, classpath            propertiesSourceUri = com.arjuna.common.util.propertyservice.FileLocator.locateFile(propertyFileName, classLoader);        }        catch(FileNotFoundException fileNotFoundException)        {            // try falling back to a default file built into the .jar            // Note the default- prefix on the name, to avoid finding it from the .jar at the previous stage            // in cases where the .jar comes before the etc dir on the classpath.            URL url = AbstractPropertiesFactory.class.getResource("/default-" propertyFileName);            if(url == null) {            commonLogger.i18NLogger.warn_could_not_find_config_file(url);            } else {                propertiesSourceUri = url.toString();            }        }        catch (IOException e)        {            throw new RuntimeException("invalid property file " propertiesSourceUri, e);        }        Properties properties = null;        try {            if (propertiesSourceUri != null) {               //加載配置文件                properties = loadFromFile(propertiesSourceUri);            }            // 疊加系統配置屬性            properties = applySystemProperties(properties);        } catch(Exception e) {            throw new RuntimeException("unable to load properties from " propertiesSourceUri, e);        }        return properties;    }
復制代碼

  • 加載文件名稱為 jbossts-properties.xml, 加載路徑優先級別為 :user.dir > user.home >java.home >classpath。最后再疊加上系統屬性,然后返回。

我們再來看一下 jbossts-properties.xml 的參考格式如下:

    YES    com.arjuna.ats.internal.arjuna.objectstore.jdbc.JDBCStore    com.arjuna.ats.internal.arjuna.objectstore.jdbc.accessors.DynamicDataSourceJDBCAccess;ClassName=com.mysql.jdbc.jdbc2.optional.MysqlDataSource;DatabaseName=jbossts;ServerName=172.25.4.62;PortNumber=3306;User=j_jbossts;Password=9MfNHoRncCi8    Action    true    com.arjuna.ats.internal.arjuna.objectstore.jdbc.JDBCStore    com.arjuna.ats.internal.arjuna.objectstore.jdbc.accessors.DynamicDataSourceJDBCAccess;ClassName=com.mysql.jdbc.jdbc2.optional.MysqlDataSource;DatabaseName=jbossts;ServerName=172.25.4.62;PortNumber=3306;User=j_jbossts;Password=9MfNHoRncCi8    stateStore    true    com.arjuna.ats.internal.arjuna.objectstore.jdbc.JDBCStore    com.arjuna.ats.internal.arjuna.objectstore.jdbc.accessors.DynamicDataSourceJDBCAccess;ClassName=com.mysql.jdbc.jdbc2.optional.MysqlDataSource;DatabaseName=jbossts;ServerName=172.25.4.62;PortNumber=3306;User=j_jbossts;Password=9MfNHoRncCi8    Communication    true    ON    1    1            com.arjuna.ats.internal.jta.recovery.arjunacore.JTATransactionLogXAResourceOrphanFilter        com.arjuna.ats.internal.jta.recovery.arjunacore.JTANodeNameXAResourceOrphanFilter        com.arjuna.ats.internal.jta.recovery.arjunacore.JTAActionStatusServiceXAResourceOrphanFilter        0            com.arjuna.ats.internal.arjuna.recovery.AtomicActionRecoveryModule        com.arjuna.ats.internal.jta.recovery.arjunacore.XARecoveryModule                com.arjuna.ats.internal.arjuna.recovery.ExpiredTransactionStatusManagerScanner        4712        0        NO    1
復制代碼

它被視為標準 java.util.Properties 文件的 XML 格式并按需加載。entry 名稱的形式為:類名.屬性名。提供的配置類都在com.arjuna.ats.arjuna.common包下,以 bean 結尾的實體類。

  • 文件加載后,它會被緩存,直到JVM重新啟動才重新讀取。對屬性文件的更改需要重新啟動才能生效

  • 在屬性加載之后,將檢查EnvironmentBean,對于每個字段,如果屬性在搜索順序中包含如下匹配的鍵,則使用屬性的值調用該字段的setter方法,或者使用不同的系統屬性調用該字段的setter方法。

  • 然后將bean返回給調用者,調用者可以通過調用setter方法進一步覆蓋值。

我們返回主線:現在已經加載了配置。接下來就是執行configureFromProperties(bean, name, defaultProperties); 。就是利用反射機制初始化對象,以及給對象的屬性賦值。代碼如下:

public static void configureFromProperties(Object bean, String instanceName, Properties properties) throws Exception {       for(Field field : bean.getClass().getDeclaredFields()) {            Class type = field.getType();            String setterMethodName = "set" capitalizeFirstLetter(field.getName());            Method setter;            try {                setter = bean.getClass().getMethod(setterMethodName, new Class[] {field.getType()});            } catch(NoSuchMethodException e) {                continue; // emma code coverage tool adds fields to instrumented classes - ignore them.            }            String getterMethodName;            Method getter = null;            if(field.getType().equals(Boolean.TYPE)) {                getterMethodName = "is" capitalizeFirstLetter(field.getName());                try {                    getter = bean.getClass().getMethod(getterMethodName, new Class[] {});                } catch (NoSuchMethodException e) {}            }            if(getter == null) {                getterMethodName = "get" capitalizeFirstLetter(field.getName());                getter = bean.getClass().getMethod(getterMethodName, new Class[] {});            }            if(field.isAnnotationPresent(ConcatenationPrefix.class) || field.getType().getName().startsWith("java.util")) {                handleGroupProperty(bean, instanceName, properties, field, setter, getter);            } else {                handleSimpleProperty(bean, instanceName, properties, field, setter, getter);            }        }    }
復制代碼

我們在回到 NarayanaXATransactionManager,分析 XARecoveryModule.getRegisteredXARecoveryModule();代碼如下 :

    public static XARecoveryModule getRegisteredXARecoveryModule () {         if (registeredXARecoveryModule == null) {//獲取事務恢復manager            RecoveryManager recMan = RecoveryManager.manager();            Vector recoveryModules = recMan.getModules();
if (recoveryModules != null) { Enumeration modules = recoveryModules.elements();
while (modules.hasMoreElements()) { RecoveryModule m = (RecoveryModule) modules.nextElement();
if (m instanceof XARecoveryModule) { registeredXARecoveryModule = (XARecoveryModule) m; break; } } } } return registeredXARecoveryModule; }
復制代碼

  • 重點關注獲取RecoveryManager.manager();, 最后會進入com.arjuna.ats.internal.arjuna.recovery.RecoveryManagerImple的構造方法,代碼如下:

       //省略了相關無用代碼      // start the activator recovery loader 加載事務恢復        _recActivatorLoader = new RecActivatorLoader();        _recActivatorLoader.startRecoveryActivators();
// start the periodic recovery thread // (don't start this until just about to go on to the other stuff) //進行初始化 _periodicRecovery = new PeriodicRecovery(threaded, useListener);
/* * Start the expiry scanner * * This has to happen after initiating periodic recovery, because periodic recovery registers record types used * by the expiry scanner */ ExpiredEntryMonitor.startUp();
復制代碼

  • 重點關注 new PeriodicRecovery(threaded, useListener);,會進行恢復模塊的加載,最后會進入com.arjuna.ats.internal.arjuna.recovery.AtomicActionRecoveryModule 的構造方法。

 public AtomicActionRecoveryModule()   {       if (tsLogger.logger.isDebugEnabled()) {           tsLogger.logger.debug("AtomicActionRecoveryModule created");       }      if (_recoveryStore == null)      {         _recoveryStore = StoreManager.getRecoveryStore();      }      _transactionStatusConnectionMgr = new TransactionStatusConnectionManager() ;   }
復制代碼

  • StoreManager.getRecoveryStore(); ,最后會進入com.arjuna.ats.arjuna.objectstore.StoreManager.initStore(),進入事務日志的初始化。代碼如下:

private static final ObjectStoreAPI initStore(String name)    {        ObjectStoreEnvironmentBean storeEnvBean = BeanPopulator.getNamedInstance(ObjectStoreEnvironmentBean.class, name);//獲取事務存儲類型,支持的類名,默認使用 ShadowNoFileLockStore 來存儲      String storeType = storeEnvBean.getObjectStoreType();        ObjectStoreAPI store;
try {//進行SPI初始化加載 store = ClassloadingUtility.loadAndInstantiateClass(ObjectStoreAPI.class, storeType, name); } catch (final Throwable ex) { throw new FatalError(tsLogger.i18NLogger.get_StoreManager_invalidtype() " " storeType, ex); } //進行初始化 store.start();
return store; }
復制代碼

  • 整個方法是比較清楚的,首先獲取事務日志存儲的類型(默認使用file模式),然后進行SPI初始化加載,最后再初始化。

  • storeType 這里如果配置的是 com.arjuna.ats.internal.arjuna.objectstore.jdbc.JDBCStore,那么就會進入這個類的構造方法,來進行初始化。代碼如下:

//省略無關代碼 try {                StringTokenizer stringTokenizer = new StringTokenizer(connectionDetails, ";");     //初始化jdbcAccess ,用來初始化                JDBCAccess jdbcAccess = (JDBCAccess) Class.forName(stringTokenizer.nextToken()).newInstance();//進行jdbc連接,datasource的初始化                jdbcAccess.initialise(stringTokenizer);
_storeName = jdbcAccess.getClass().getName() ":" tableName;
Connection connection = jdbcAccess.getConnection(); String name; int major; int minor; try { DatabaseMetaData md = connection.getMetaData(); name = md.getDriverName(); major = md.getDriverMajorVersion(); minor = md.getDriverMinorVersion(); } finally { connection.close(); }
/* * Check for spaces in the name - our implementation classes are * always just the first part of such names. */
int index = name.indexOf(' ');
if (index != -1) name = name.substring(0, index);
name = name.replaceAll("-", "_");
name = name.toLowerCase();
final String packagePrefix = JDBCStore.class.getName().substring(0, JDBCStore.class.getName().lastIndexOf('.')) ".drivers."; Class jdbcImpleClass = null; try { jdbcImpleClass = Class.forName(packagePrefix name "_" major "_" minor "_driver"); } catch (final ClassNotFoundException cnfe) { try { jdbcImpleClass = Class.forName(packagePrefix name "_" major "_driver"); } catch (final ClassNotFoundException cnfe2) { jdbcImpleClass = Class.forName(packagePrefix name "_driver"); } } _theImple = (com.arjuna.ats.internal.arjuna.objectstore.jdbc.JDBCImple_driver) jdbcImpleClass.newInstance(); //使用不同的資料庫類型來初始化 _theImple.initialise(jdbcAccess, tableName, jdbcStoreEnvironmentBean); imples.put(key, _theImple); storeNames.put(key, _storeName); } catch (Exception e) { tsLogger.i18NLogger.fatal_objectstore_JDBCStore_2(_storeName, e); throw new ObjectStoreException(e); } }
復制代碼

  • 這個方法還是比較清晰的,根據我們的jdbc的配置,首先初始化連接資訊。然后獲取連接,然后根據不同的資料庫類型,來進行初始化。我們來關心下_theImple.initialise(jdbcAccess, tableName, jdbcStoreEnvironmentBean);。代碼如下:

public void initialise(final JDBCAccess jdbcAccess, String tableName,            ObjectStoreEnvironmentBean jdbcStoreEnvironmentBean)            throws SQLException, NamingException {        this.jdbcAccess = jdbcAccess;        try (Connection connection = jdbcAccess.getConnection()) {            try (Statement stmt = connection.createStatement()) {                 // table [type, object UID, format, blob]  //初始化是否是否需要刪除表                   if (jdbcStoreEnvironmentBean.getDropTable()) {                    try {                        stmt.executeUpdate("DROP TABLE "   tableName);                    } catch (SQLException ex) {                        checkDropTableException(connection, ex);                    }                }        //是否需要創建表                if (jdbcStoreEnvironmentBean.getCreateTable()) {                    try {                        createTable(stmt, tableName);                    } catch (SQLException ex) {                        checkCreateTableError(ex);                    }                }
// This can be the case when triggering via EmptyObjectStore if (!connection.getAutoCommit()) { connection.commit(); } } }
this.tableName = tableName; }
復制代碼

  • 框架會自動的創建事務日志表來進行存儲,所以我們不需要手動創建,也不要驚訝這個表是從哪里來的。創建的表的代碼如下:

protected void createTable(Statement stmt, String tableName)            throws SQLException {        String statement = "CREATE TABLE "                  tableName                  " (StateType INTEGER NOT NULL, Hidden INTEGER NOT NULL, "                  "TypeName VARCHAR(255) NOT NULL, UidString VARCHAR(255) NOT NULL, ObjectState "                  getObjectStateSQLType()                  ", PRIMARY KEY(UidString, TypeName, StateType))";        stmt.executeUpdate(statement);    }
復制代碼

  • 我們在回到主線 PeriodicRecovery,這個類是繼承Thread,調用start就會執行run方法,他會對控制需要進行恢復的事務線程,真的當前的事務狀態進行處理,到底是阻塞,還是喚醒。

  • 初始化流程中,還有一步是進行事務恢復的,這個我們在后續的章節,單獨拿出來進行講解。

NarayanaXA 分布式事務 begin 流程

我們知道,本地的事務,都會有一個 trainsaction.begin, 對應 XA 分布式事務來說也不另外,我們再把思路切換回XAShardingTransactionManager.begin(), 會調用com.arjuna.ats.internal.jta.transaction.arjunacore.BaseTransaction.begin() 方法。代碼如下:

        //檢查事務狀態        checkTransactionState();        //獲取超時配置,超時很重要        Iteger value = http://dz.com/_timeouts.get();        int v = 0; // if not set then assume 0. What else can we do?        if (value != null)        {            v = value.intValue();        }        else            v = TxControl.getDefaultTimeout();
// TODO set default timeout //初始化事務實現 TransactionImple.putTransaction(new TransactionImple(v));
復制代碼

  • 初始化流程主要就是檢查事務狀態,獲取超時時間,最后也是最重要的創建事務實現。new TransactionImple(v)。我們進入該類的構造方法,代碼如下:

public TransactionImple(int timeout)    {//創建事務執行action        _theTransaction = new AtomicAction();//開啟事務        _theTransaction.begin(timeout);
_resources = new Hashtable(); _duplicateResources = new Hashtable(); _suspendCount = 0; _xaTransactionTimeoutEnabled = getXATransactionTimeoutEnabled();
_txLocalResources = Collections.synchronizedMap(new HashMap()); }
復制代碼

  • 這里面最重要是2步,第一步是初始化 AtomicAction,第二步是 AtomicAction.begin()。我們先來看 new AtomicAction。會對相關的父類,進行初始化。AtomicAction的繼承體系圖為:

  • 我們接下來看com.arjuna.ats.arjuna.AtomicAction.begin()。代碼如下:

public int begin (int timeout)    {               //進行start,最關鍵        int status = super.start();        if (status == ActionStatus.RUNNING)        {            /*             * Now do thread/action tracking.             *///放入threadlocal里面            ThreadActionData.pushAction(this);            _timeout = timeout;            if (_timeout == 0)                _timeout = TxControl.getDefaultTimeout();
if (_timeout > 0) //設置事務超時控制,很重要 TransactionReaper.transactionReaper().insert(this, _timeout); } return status; }
復制代碼

  • 我們先來分析super.start()。最后會進入com.arjuna.ats.arjuna.coordinator.BasicAction.begin()。代碼如下:

  //省略很多代碼//進行action的一些初始化工作  actionInitialise(parentAct);
復制代碼

XATransactionDataSource getConnection() 流程

我們都知道想要執行 SQL 語句,必須要獲取到資料庫的 connection。讓我們再回到 XAShardingTransactionManager.getConnection() 最后會調用到org.apache.shardingsphere.transaction.xa.jta.datasourceXATransactionDataSource.getConnection()。流程圖如下:

代碼 :

 public Connection getConnection() throws SQLException, SystemException, RollbackException {      //先檢查是否已經有存在的connection,這一步很關心,也是XA的關鍵,因為XA事務,必須在同一個connection        if (CONTAINER_DATASOURCE_NAMES.contains(dataSource.getClass().getSimpleName())) {            return dataSource.getConnection();        }      //獲取資料庫連接        Connection result = dataSource.getConnection();      //轉成XAConnection,其實是同一個連接        XAConnection xaConnection = XAConnectionFactory.createXAConnection(databaseType, xaDataSource, result);      //獲取JTA事務定義接口        Transaction transaction = xaTransactionManager.getTransactionManager().getTransaction();        if (!enlistedTransactions.get().contains(transaction)) {      //進行資源注冊            transaction.enlistResource(new SingleXAResource(resourceName, xaConnection.getXAResource()));            transaction.registerSynchronization(new Synchronization() {                @Override                public void beforeCompletion() {                    enlistedTransactions.get().remove(transaction);                }
@Override public void afterCompletion(final int status) { enlistedTransactions.get().clear(); } }); enlistedTransactions.get().add(transaction); } return result; }
復制代碼

  • 首先第一步很關心,尤其是對shardingsphere來說,因為在一個事務里面,會有多個SQL語句,打到相同的資料庫,所以對相同的資料庫,必須獲取同一個XAConnection,這樣才能進行XA事務的提交與回滾。

  • 我們接下來關心 transaction.enlistResource(new SingleXAResource(resourceName, xaConnection.getXAResource()));, 會進入com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionImp.enlistResource(), 代碼太長,截取一部分。

 // Pay attention now, this bit is hairy. We need to add a new AbstractRecord (XAResourceRecord)                        // to the BasicAction, which will thereafter drive its completion. However, the transaction                        // core is not directly XA aware, so it's our job to start the XAResource. Problem is, if                        // adding the record fails, BasicAction will never end the resource via the XAResourceRecord,                        // so we must do so directly.  start may fail due to dupl xid or other reason, and transactions                        // may rollback async, for which reasons we can't call add before start.                        // The xid will change on each pass of the loop, so we need to create a new record on each pass.                        // The add will fail in the case of multiple last resources being disallowed                        // see JBTM-362 and JBTM-363                        AbstractRecord abstractRecord = createRecord(xaRes, params, xid);                        if(abstractRecord != null) {                            xaRes.start(xid, xaStartNormal);                            if(_theTransaction.add(abstractRecord) == AddOutcome.AR_ADDED) {                                _resources.put(xaRes, new TxInfo(xid));                                return true; // dive out, no need to set associatedWork = true;                            } else {                                // we called start on the resource, but _theTransaction did not accept it.                                // we therefore have a mess which we must now clean up by ensuring the start is undone:                                abstractRecord.topLevelAbort();                            }                        }
復制代碼

  • 哦多尅,看見了嗎,各位,看見了 xaRes.start(xid, xaStartNormal); 了嗎????,我們進去,假設我們使用的Mysql資料庫:

 public void start(Xid xid, int flags) throws XAException {        StringBuilder commandBuf = new StringBuilder(300);        commandBuf.append("XA START ");        appendXid(commandBuf, xid);        switch(flags) {        case 0:            break;        case 2097152:            commandBuf.append(" JOIN");            break;        case 134217728:            commandBuf.append(" RESUME");            break;        default:            throw new XAException(-5);        }
this.dispatchCommand(commandBuf.toString()); this.underlyingConnection.setInGlobalTx(true); }
復制代碼

  • 組裝XA start Xid SQL語句,進行執行。

到這里,我們總結下,在獲取資料庫連接的時候,我們執行了 XA 協議接口中的 XA start xid

Narayana commit 流程源碼分析

我們進入com.arjuna.ats.internal.jta.transaction.arjunacore.BaseTransaction.commit() 方法,代碼如下:

//獲取當前事務    TransactionImple theTransaction = TransactionImple.getTransaction();
if (theTransaction == null) throw new IllegalStateException( "BaseTransaction.commit - " jtaLogger.i18NLogger.get_transaction_arjunacore_notx()); //進行事務提交 theTransaction.commitAndDisassociate();
復制代碼

  • 我們重點來關注theTransaction.commitAndDisassociate();,最后進入com.arjuna.ats.arjuna.AtomicAction.commit()代碼如下:

public int commit (boolean report_heuristics)    {                 //進行事務提交        int status = super.end(report_heuristics);        /*         * Now remove this thread from the action state.         */               //清空數據        ThreadActionData.popAction();        TransactionReaper.transactionReaper().remove(this);        return status;    }
復制代碼

  • 最后我們會進入com.arjuna.ats.arjuna.coordinator.BasicAction.End()方法,會首先判斷是否能優化成一階段提交,否則進行二階段提交(二階段提交還可以使用異步線程池方式)。代碼如下:

 if (doOnePhase())            {                onePhaseCommit(reportHeuristics);
ActionManager.manager().remove(get_uid()); } else { int prepareStatus = prepare(reportHeuristics);
if (prepareStatus == TwoPhaseOutcome.PREPARE_NOTOK || prepareStatus == TwoPhaseOutcome.ONE_PHASE_ERROR) { tsLogger.i18NLogger.warn_coordinator_BasicAction_36(get_uid());
if (heuristicDecision != TwoPhaseOutcome.PREPARE_OK) { tsLogger.i18NLogger.warn_coordinator_BasicAction_37(TwoPhaseOutcome.stringForm(heuristicDecision)); }
tsLogger.i18NLogger.warn_coordinator_BasicAction_38();
if (!reportHeuristics && TxControl.asyncCommit && (parentAction == null)) { TwoPhaseCommitThreadPool.submitJob(new AsyncCommit(this, false)); } else phase2Abort(reportHeuristics); /* first phase failed */ } else { if (!reportHeuristics && TxControl.asyncCommit && (parentAction == null)) { TwoPhaseCommitThreadPool.submitJob(new AsyncCommit(this, true)); } else phase2Commit(reportHeuristics); /* first phase succeeded */ } } }
復制代碼

一階段提交

進入方法 onePhaseCommit, 最后會調用com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord.topLevelOnePhaseCommit()。該方法首先會發起 XA end 語句,然后再執行 XA commit 語句。代碼如下:

//省略相關代碼//執行XA end語句 endAssociation(XAResource.TMSUCCESS, TxInfo.NOT_ASSOCIATED);
//執行XA commit _theXAResource.commit(_tranID, true);
復制代碼

二階段提交

  • 首先會進行進入 prepare(reportHeuristics);, 最后會調用com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord.topLevelPrepare()該方法首先會執行 XA end 語句,然后執行 XA prepare語句。代碼如下:

//省略相關代碼//執行XA end語句 endAssociation(XAResource.TMSUCCESS, TxInfo.NOT_ASSOCIATED);
//執行XA prepare theXAResource.prepare(_tranID)
復制代碼

  • 接下來進行提交,進入方法 phase2Commit, 最后會調用com.arjuna.ats.internal.jta.resources.arjunacore.XAResourceRecord.topLevelCommit()。該方法會執行XA commit語句。代碼如下:

//省略相關代碼//執行XA commit _theXAResource.commit(_tranID, fase);
復制代碼

Narayana 回滾流程

首先我們先切換回org.apache.shardingsphere.transaction.xa.XAShardingTransactionManager.rollback() 方法,然后會進入 com.arjuna.ats.internal.jta.transaction.arjunacore.BaseTransaction.rollback() 方法,代碼如下:

public void rollback() throws java.lang.IllegalStateException,            java.lang.SecurityException, javax.transaction.SystemException    {        if (jtaLogger.logger.isTraceEnabled()) {            jtaLogger.logger.trace("BaseTransaction.rollback");        }
TransactionImple theTransaction = TransactionImple.getTransaction();
if (theTransaction == null) throw new IllegalStateException( "BaseTransaction.rollback - " jtaLogger.i18NLogger.get_transaction_arjunacore_notx());
theTransaction.rollbackAndDisassociate(); }
復制代碼

  • 代碼最后后進入com.arjuna.ats.arjuna.coordinator.BasicAction.topLevelAbort()。代碼如下:

//省略代碼//先執行XA end 語句endAssociation(XAResource.TMFAIL, TxInfo.FAILED);
//然后執行XA rollback_theXAResource.rollback(_tranID);
復制代碼

  • 接下來就是清除換成,清除事務日志。代碼如下:

       ActionManager.manager().remove(get_uid());
actionStatus = ActionStatus.ABORTED;
if (TxStats.enabled()) { TxStats.getInstance().incrementAbortedTransactions();
if (applicationAbort) TxStats.getInstance().incrementApplicationRollbacks(); }
復制代碼

總結 :可以看到回滾流程會稍微畢竟簡單。先執行 XA end 語句,然后執行 XA rollback 語句。

文章到此,已經寫的很長很多了,我們分析了 ShardingSphere 對于 XA 方案,提供了一套 SPI 解決方案,對 Narayana 進行了整合,也分析了 Narayana 初始化流程,開始事務流程,獲取連接流程,提交事務流程,回滾事務流程。下一篇文章,我們來詳解 narayana 的事務恢復流程。

作者介紹

肖宇,Apache ShardingSphere Committer,開源 hmily 分布式事務框架作者,開源 soul 網關作者,熱愛開源,追求寫優雅代碼。目前就職于京東數科,參與 ShardingSphere 的開源建設,以及分布式資料庫的研發工作。

本文轉載自公眾號 ShardingSphere 官微(ID:Sharding-Sphere)。

原文鏈接

Shardingsphere整合Narayana對XA分布式事務的支持(4)

0 条回复 A文章作者 M管理員
    暫無討論,說說你的看法吧