这里面只是简单地通过判断事务的传播属性来决定是否挂起事务或者抛出异常。继续往下跟踪beginTransaction和commitTransaction，会发现实际上是调用了DefaultGlobalTransaction.TransactionManager.begin()和DefaultGlobalTransaction.TransactionManager.commit()，而这两个方法只是简单地向TC发送了一个事务开始和事务提交的消息。所以这个切面代理的作用只是告诉TC开启事务和提交事务。TC收到消息后会告诉RM，RM会异步地提交事务。这个提交事务的意思并非本地事务的提交，而是异步地删除undo log：因为分支事务里面业务sql和undo log是在同一个本地事务里面提交的，本地事务提交之后数据已经持久化到磁盘了，它的回滚是靠undo log来执行数据修改实现的。下面看下seata是如何实现本地数据的提交、undo log的生成以及业务sql的执行的。
undo log的生成和业务sql的执行依赖于SeataAutoDataSourceProxyCreator，这个类也是在自动配置类SeataAutoConfiguration里面生成的：
public class SeataAutoDataSourceProxyCreator extends AbstractAutoProxyCreator { private static final Logger LOGGER LoggerFactory.getLogger(SeataAutoDataSourceProxyCreator.class); private final String[] excludes; //被代理类的方法在这里面被代理执行 private final Advisor advisor new DefaultIntroductionAdvisor(new SeataAutoDataSourceProxyAdvice()); public SeataAutoDataSourceProxyCreator(boolean useJdkProxy, String[] excludes) { this.excludes excludes; setProxyTargetClass(!useJdkProxy); Override protected Object[] getAdvicesAndAdvisorsForBean(Class ? beanClass, String beanName, TargetSource customTargetSource) throws BeansException { if (LOGGER.isInfoEnabled()) { LOGGER.info( Auto proxy of [{}] , beanName); return new Object[]{advisor}; Override protected boolean shouldSkip(Class ? beanClass, String beanName) { //只对datasource进行代理 return SeataProxy.class.isAssignableFrom(beanClass) || !DataSource.class.isAssignableFrom(beanClass) || Arrays.asList(excludes).contains(beanClass.getName());
SeataAutoDataSourceProxyCreator也是一样继承了AbstractAutoProxyCreator，spring会在对应bean实例化的过程中为其生成代理。DefaultIntroductionAdvisor包装的SeataAutoDataSourceProxyAdvice里面使用了DataSourceProxy来代理DataSource的执行。DataSourceProxy里面获取的connection是ConnectionProxy，ConnectionProxy的父类里面获取的statement是StatementProxy，StatementProxy使用ExecuteTemplate来执行statement，ExecuteTemplate使用Executor类执行statement。Executor有InsertExecutor、UpdateExecutor、DeleteExecutor、SelectForUpdateExecutor、PlainExecutor几种类型，下面来分析下UpdateExecutor：
public class UpdateExecutor T, S extends Statement extends AbstractDMLBaseExecutor T, S { Override protected TableRecords beforeImage() throws SQLException { ArrayList List Object paramAppenderList new ArrayList (); TableMeta tmeta getTableMeta(); String selectSQL buildBeforeImageSQL(tmeta, paramAppenderList); return buildTableRecords(tmeta, selectSQL, paramAppenderList); private String buildBeforeImageSQL(TableMeta tableMeta, ArrayList List Object paramAppenderList) { SQLUpdateRecognizer recognizer (SQLUpdateRecognizer) sqlRecognizer; StringBuilder prefix new StringBuilder( SELECT ); StringBuilder suffix new StringBuilder( FROM ).append(getFromTableInSQL()); String whereCondition buildWhereCondition(recognizer, paramAppenderList); if (StringUtils.isNotBlank(whereCondition)) { suffix.append( WHERE ).append(whereCondition); suffix.append( FOR UPDATE ); StringJoiner selectSQLJoin new StringJoiner( , , prefix.toString(), suffix.toString()); if (ONLY_CARE_UPDATE_COLUMNS) { List String updateColumns recognizer.getUpdateColumns(); if (!containsPK(updateColumns)) { selectSQLJoin.add(getColumnNamesInSQL(tableMeta.getEscapePkNameList(getDbType()))); for (String columnName : updateColumns) { selectSQLJoin.add(columnName); } else { for (String columnName : tableMeta.getAllColumns().keySet()) { selectSQLJoin.add(ColumnUtils.addEscape(columnName, getDbType())); return selectSQLJoin.toString(); Override protected TableRecords afterImage(TableRecords beforeImage) throws SQLException { TableMeta tmeta getTableMeta(); if (beforeImage null || beforeImage.size() 0) { return TableRecords.empty(getTableMeta()); String selectSQL buildAfterImageSQL(tmeta, beforeImage); ResultSet rs null; try (PreparedStatement pst statementProxy.getConnection().prepareStatement(selectSQL)) { SqlGenerateUtils.setParamForPk(beforeImage.pkRows(), getTableMeta().getPrimaryKeyOnlyName(), pst); rs pst.executeQuery(); return TableRecords.buildRecords(tmeta, rs); } finally { IOUtil.close(rs); private String 
buildAfterImageSQL(TableMeta tableMeta, TableRecords beforeImage) throws SQLException { StringBuilder prefix new StringBuilder( SELECT ); String whereSql SqlGenerateUtils.buildWhereConditionByPKs(tableMeta.getPrimaryKeyOnlyName(), beforeImage.pkRows().size(), getDbType()); String suffix FROM getFromTableInSQL() WHERE whereSql; StringJoiner selectSQLJoiner new StringJoiner( , , prefix.toString(), suffix); if (ONLY_CARE_UPDATE_COLUMNS) { SQLUpdateRecognizer recognizer (SQLUpdateRecognizer) sqlRecognizer; List String updateColumns recognizer.getUpdateColumns(); if (!containsPK(updateColumns)) { selectSQLJoiner.add(getColumnNamesInSQL(tableMeta.getEscapePkNameList(getDbType()))); for (String columnName : updateColumns) { selectSQLJoiner.add(columnName); } else { for (String columnName : tableMeta.getAllColumns().keySet()) { selectSQLJoiner.add(ColumnUtils.addEscape(columnName, getDbType())); return selectSQLJoiner.toString();
UpdateExecutor只实现了几个模板方法，用于构建业务sql执行前后对应需要修改的数据记录修改前的值和修改后的值，这些内容会写入到undo log表的rollback_info字段里面。如果查看DeleteExecutor源码，可以发现它只实现了获取前镜像的模板方法。真正执行的动作是在AbstractDMLBaseExecutor里面：
public abstract class AbstractDMLBaseExecutor T, S extends Statement extends BaseTransactionalExecutor T, S { Override public T doExecute(Object... args) throws Throwable { AbstractConnectionProxy connectionProxy statementProxy.getConnectionProxy(); if (connectionProxy.getAutoCommit()) { return executeAutoCommitTrue(args); } else { return executeAutoCommitFalse(args); * Execute auto commit false t. * param args the args * return the t * throws Exception the exception protected T executeAutoCommitFalse(Object[] args) throws Exception { if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) getTableMeta().getPrimaryKeyOnlyName().size() 1) throw new NotSupportYetException( multi pk only support mysql! ); TableRecords beforeImage beforeImage(); T result statementCallback.execute(statementProxy.getTargetStatement(), args); TableRecords afterImage afterImage(beforeImage); prepareUndoLog(beforeImage, afterImage); return result; * Execute auto commit true t. * param args the args * return the t * throws Throwable the throwable protected T executeAutoCommitTrue(Object[] args) throws Throwable { ConnectionProxy connectionProxy statementProxy.getConnectionProxy(); try { connectionProxy.setAutoCommit(false); return new LockRetryPolicy(connectionProxy).execute(() - { T result executeAutoCommitFalse(args); connectionProxy.commit(); return result; } catch (Exception e) { // when exception occur in finally,this exception will lost, so just print it here LOGGER.error( execute executeAutoCommitTrue error:{} , e.getMessage(), e); if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) { connectionProxy.getTargetConnection().rollback(); throw e; } finally { connectionProxy.getContext().reset(); connectionProxy.setAutoCommit(true);
可以看到executeAutoCommitFalse中终于把statement执行掉了，也就是业务sql在这一步执行成功了：先获取了数据修改前的镜像，再执行业务sql，然后根据执行的结果获取了数据修改后的镜像，并通过prepareUndoLog生成了undo log。但prepareUndoLog并不只是生成undo log这么简单，整个事务的流程这里面还有另一个关键动作：生成锁的key，将锁的key放到ConnectionProxy里面。ConnectionProxy在做本地事务提交前会获取锁，获取失败会有对应的重试方法。
protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException { if (beforeImage.getRows().isEmpty() afterImage.getRows().isEmpty()) { return; ConnectionProxy connectionProxy statementProxy.getConnectionProxy(); TableRecords lockKeyRecords sqlRecognizer.getSQLType() SQLType.DELETE ? beforeImage : afterImage; String lockKeys buildLockKey(lockKeyRecords); connectionProxy.appendLockKey(lockKeys); SQLUndoLog sqlUndoLog buildUndoItem(beforeImage, afterImage); connectionProxy.appendUndoLog(sqlUndoLog);
ConnectionProxy里面的提交本地事务方法
private void processLocalCommitWithGlobalLocks() throws SQLException { checkLock(context.buildLockKeys()); try { targetConnection.commit(); } catch (Throwable ex) { throw new SQLException(ex); context.reset();
到此整个本地事务执行的流程完成了。
spring-cloud-starter-alibaba-seata通过设置feign拦截器和RestTemplate拦截器来传递xid到下一个事务节点里面。下一步分析下一个节点是如何通过xid来参与到事务中来的。
本文链接: http://benaglobal.immuno-online.com/view-711791.html