2000字范文,分享全网优秀范文,学习好帮手!
2000字范文 > TX-LCN分布式事务之LCN模式

TX-LCN分布式事务之LCN模式

时间:2023-11-16 19:46:01

相关推荐

TX-LCN分布式事务之LCN模式

什么是LCN模式

LCN模式是TX-LCN分布式事务模式的一种,L-lock-锁定事务单元、C-confirm-确认事务模块状态、notify-通知事务单元

原理

LCN模式是通过Spring AOP的方式代理Connection的方式实现对本地事务的操作,然后在由TxManager统一协调控制事务。 当本地事务提交回滚或者关闭连接时将会执行假操作,该代理的连接将由LCN连接池管理。

模式特点

该模式对代码的嵌入性为低。

该模式仅限于本地存在连接对象且可通过连接对象控制事务的模块。

该模式下的事务提交与回滚是由本地事务方控制,对于数据一致性上有较高的保障。

该模式缺陷在于代理的连接需要随事务发起方一共释放连接,增加了连接占用的时间。

源码解读

首先我们来看几个关键的类DataSourceAspect-数据源切面类、TransactionAspect事务切面类、LcnConnectionProxylcn 连接代理类、DTXLogicWeaver分布式事务调度器、DTXServiceExecutor分布式事务执行器

DataSourceAspect的作用

源码

@Aspect

@Component

publicclassDataSourceAspectimplementsOrdered{

privatestaticfinalLoggerlog=LoggerFactory.getLogger(DataSourceAspect.class);

privatefinalTxClientConfigtxClientConfig;

privatefinalDTXResourceWeaverdtxResourceWeaver;

publicDataSourceAspect(TxClientConfigtxClientConfig,DTXResourceWeaverdtxResourceWeaver){

this.txClientConfig=txClientConfig;

this.dtxResourceWeaver=dtxResourceWeaver;

}

@Around("execution(*javax.sql.DataSource.getConnection(..))")

publicObjectaround(ProceedingJoinPointpoint)throwsThrowable{

returnthis.dtxResourceWeaver.getConnection(()->{

return(Connection)point.proceed();

});

}

publicintgetOrder(){

returnthis.txClientConfig.getResourceOrder();

}

}

由该类的源码,我们能够知道,lcn模式主要对数据库的连接进行了拦截代理。获取到数据库的连接交由lcn来进行代理。

TransactionAspect 作用

源码

@Aspect

@Component

publicclassTransactionAspectimplementsOrdered{

privatestaticfinalLoggerlog=LoggerFactory.getLogger(TransactionAspect.class);

privatefinalTxClientConfigtxClientConfig;

privatefinalDTXLogicWeaverdtxLogicWeaver;

publicTransactionAspect(TxClientConfigtxClientConfig,DTXLogicWeaverdtxLogicWeaver){

this.txClientConfig=txClientConfig;

this.dtxLogicWeaver=dtxLogicWeaver;

}

@Pointcut("@annotation(com.codingapi.txlcn.tc.annotation.LcnTransaction)")

publicvoidlcnTransactionPointcut(){

}

@Around("lcnTransactionPointcut()&&!txcTransactionPointcut()&&!tccTransactionPointcut()&&!txTransactionPointcut()")

publicObjectrunWithLcnTransaction(ProceedingJoinPointpoint)throwsThrowable{

DTXInfodtxInfo=DTXInfo.getFromCache(point);

LcnTransactionlcnTransaction=(LcnTransaction)dtxInfo.getBusinessMethod().getAnnotation(LcnTransaction.class);

dtxInfo.setTransactionType("lcn");

dtxInfo.setTransactionPropagation(lcnTransaction.propagation());

DTXLogicWeavervar10000=this.dtxLogicWeaver;

point.getClass();

returnvar10000.runTransaction(dtxInfo,point::proceed);

}

publicintgetOrder(){

returnthis.txClientConfig.getDtxAspectOrder();

}

}

由该类的源码,我们能够明白,通过解析@LcnTransaction注解进行相应的操作。代码会调用到DTXLogicWeaver

DTXLogicWeaver 作用

publicObjectrunTransaction(DTXInfodtxInfo,BusinessCallbackbusiness)throwsThrowable{

if(Objects.isNull(DTXLocalContext.cur())){

DTXLocalContext.getOrNew();

log.debug("<----TxLcnstart---->");

DTXLocalContextdtxLocalContext=DTXLocalContext.getOrNew();

TxContexttxContext;

if(this.globalContext.hasTxContext()){

txContext=this.globalContext.txContext();

dtxLocalContext.setInGroup(true);

log.debug("Unit[{}]usedparent'sTxContext[{}].",dtxInfo.getUnitId(),txContext.getGroupId());

}else{

txContext=this.globalContext.startTx();

}

if(Objects.nonNull(dtxLocalContext.getGroupId())){

dtxLocalContext.setDestroy(false);

}

dtxLocalContext.setUnitId(dtxInfo.getUnitId());

dtxLocalContext.setGroupId(txContext.getGroupId());

dtxLocalContext.setTransactionType(dtxInfo.getTransactionType());

TxTransactionInfoinfo=newTxTransactionInfo();

info.setBusinessCallback(business);

info.setGroupId(txContext.getGroupId());

info.setUnitId(dtxInfo.getUnitId());

info.setPointMethod(dtxInfo.getBusinessMethod());

info.setPropagation(dtxInfo.getTransactionPropagation());

info.setTransactionInfo(dtxInfo.getTransactionInfo());

info.setTransactionType(dtxInfo.getTransactionType());

info.setTransactionStart(txContext.isDtxStart());

booleanvar15=false;

Objectvar6;

try{

var15=true;

var6=this.transactionServiceExecutor.transactionRunning(info);

var15=false;

}finally{

if(var15){

if(dtxLocalContext.isDestroy()){

synchronized(txContext.getLock()){

txContext.getLock().notifyAll();

}

if(!dtxLocalContext.isInGroup()){

this.globalContext.destroyTx();

}

DTXLocalContext.makeNeverAppeared();

TracingContext.tracing().destroy();

}

log.debug("<----TxLcnend---->");

}

}

if(dtxLocalContext.isDestroy()){

synchronized(txContext.getLock()){

txContext.getLock().notifyAll();

}

if(!dtxLocalContext.isInGroup()){

this.globalContext.destroyTx();

}

DTXLocalContext.makeNeverAppeared();

TracingContext.tracing().destroy();

}

log.debug("<----TxLcnend---->");

returnvar6;

}else{

returnbusiness.call();

}

}

以上代码是该类的核心逻辑,可以看出来TX-LCN事务的处理全部都是走的这个类的该方法,最终会调用到DTXServiceExecutor分布式事务执行器

DTXServiceExecutor 作用

/**

*事务业务执行

*

*@paraminfoinfo

*@returnObject

*@throwsThrowableThrowable

*/

publicObjecttransactionRunning(TxTransactionInfoinfo)throwsThrowable{

//1.获取事务类型

StringtransactionType=info.getTransactionType();

//2.获取事务传播状态

DTXPropagationStatepropagationState=propagationResolver.resolvePropagationState(info);

//2.1如果不参与分布式事务立即终止

if(propagationState.isIgnored()){

returninfo.getBusinessCallback().call();

}

//3.获取本地分布式事务控制器

DTXLocalControldtxLocalControl=txLcnBeanHelper.loadDTXLocalControl(transactionType,propagationState);

//4.织入事务操作

try{

//4.1记录事务类型到事务上下文

Set<String>transactionTypeSet=globalContext.txContext(info.getGroupId()).getTransactionTypes();

transactionTypeSet.add(transactionType);

dtxLocalControl.preBusinessCode(info);

//4.2业务执行前

txLogger.txTrace(

info.getGroupId(),info.getUnitId(),"prebusinesscode,unittype:{}",transactionType);

//4.3执行业务

Objectresult=dtxLocalControl.doBusinessCode(info);

//4.4业务执行成功

txLogger.txTrace(info.getGroupId(),info.getUnitId(),"businesssuccess");

dtxLocalControl.onBusinessCodeSuccess(info,result);

returnresult;

}catch(TransactionExceptione){

txLogger.error(info.getGroupId(),info.getUnitId(),"beforebusinesscodeerror");

throwe;

}catch(Throwablee){

//4.5业务执行失败

txLogger.error(info.getGroupId(),info.getUnitId(),Transactions.TAG_TRANSACTION,

"businesscodeerror");

dtxLocalControl.onBusinessCodeError(info,e);

throwe;

}finally{

//4.6业务执行完毕

dtxLocalControl.postBusinessCode(info);

}

}

通过以上代码可以看出,该类是整个事务执行关键类。

以上就是LCN模式比较核心的代码,其他的分支代码就不一一赘述了

实战

由上一篇分布式事务之TX-LCN 我们规划了俩个TC分别是lcn-order服务和lcn-pay服务,我们的思路是订单服务调用支付服务,分别在订单服务表t_order和支付服务表t_pay中插入插入数据。

订单服务核心代码和数据表脚本

代码

/**

*@author:triumphxx

*@Date:/10/24

*@Time:2:13下午

*@微信公众号:北漂码农有话说

*@网站:

*@GitHub/triumphxx

*@Desc:

**/

@RestController

publicclassLcnOrderController{

@Autowired

TOrderDaotOrderDao;

@Autowired

privateRestTemplaterestTemplate;

@PostMapping("/add-order")

@Transactional(rollbackFor=Exception.class)

@LcnTransaction

publicStringadd(){

TOrderbean=newTOrder();

bean.setTId(1);

bean.setTName("order");

restTemplate.postForEntity("http://lcn-pay/add-pay","",String.class);

//inti=1/0;

tOrderDao.insert(bean);

return"新增订单成功";

}

}

脚本

CREATETABLE`t_order`(

`t_id`int(11)NOTNULL,

`t_name`varchar(45)DEFAULTNULL

)ENGINE=InnoDBDEFAULTCHARSET=latin1

支付服务核心代码和数据表脚本

代码

/**

*@author:triumphxx

*@Date:/10/24

*@Time:2:26下午

*@微信公众号:北漂码农有话说

*@网站:

*@GitHub/triumphxx

*@Desc:

**/

@RestController

publicclassLcnPayController{

@Autowired

TPayDaotPayDao;

@PostMapping("/add-pay")

@Transactional(rollbackFor=Exception.class)

@LcnTransaction

publicStringaddPay(){

TPaytPay=newTPay();

tPay.setTId(1);

tPay.setTName("t_pay");

inti=tPayDao.insertSelective(tPay);

return"新增支付成功";

}

}

脚本

CREATETABLE`t_pay`(

`t_id`int(11)NOTNULL,

`t_name`varchar(45)DEFAULTNULL

)ENGINE=InnoDBDEFAULTCHARSET=latin1

测试流程

启动Redis

启动TM

启动注册中心eureka-server

启动服务lcn-order

启动服务lcn-pay

请求接口http://localhost:8001/add-order

代码创造异常看数据是否进行回滚

小结

本篇我们分析了TX-LCN分布式事务的lcn模式的原理及相关源码,以及搭建服务的进行测试。希望能对大家有所帮助。 源码地址源码传送门

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。