1. 背景 一个主库和n个应用库的数据源,并且会同时操作主库和应用库的数据,需要解决以下两个问题:
如何动态管理多个数据源以及切换? 如何保证多数据源场景下的数据一致性(事务)? 本文主要探讨这两个问题的解决方案,希望能对读者有一定的启发。
基于 spring boot + mybatis plus + vue & element 实现的后台管理系统 + 用户小程序,支持 rbac 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
项目地址:https://github.com/yunaiv/ruoyi-vue-pro 视频教程:https://doc.iocoder.cn/video/ 2. 数据源切换原理 通过扩展spring提供的抽象类abstractroutingdatasource,可以实现切换数据源。其类结构如下图所示:
targetdatasources&defaulttargetdatasource 项目上需要使用的所有数据源和默认数据源。
resolveddatasources&resolveddefaultdatasource 当spring容器创建abstractroutingdatasource对象时,通过调用afterpropertiesset复制上述目标数据源。由此可见,一旦数据源实例对象创建完毕,业务无法再添加新的数据源。
determinecurrentlookupkey 此方法为抽象方法,通过扩展这个方法来实现数据源的切换。目标数据源的结构为:map其key为lookup key。
我们来看官方对这个方法的注释:
lookup key通常是绑定在线程上下文中,根据这个key去resolveddatasources中取出datasource。
根据目标数据源的管理方式不同,可以使用基于配置文件和数据库表两种方式。基于配置文件管理方案无法后续添加新的数据源,而基于数据库表方案管理,则更加灵活。
基于 spring cloud alibaba + gateway + nacos + rocketmq + vue & element 实现的后台管理系统 + 用户小程序,支持 rbac 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能
项目地址:https://github.com/yunaiv/yudao-cloud 视频教程:https://doc.iocoder.cn/video/ 3. 配置文件解决方案 根据上面的分析,我们可以按照下面的步骤去实现:
定义dynamicdatasource类继承abstractroutingdatasource,重写determinecurrentlookupkey()方法。 配置多个数据源注入targetdatasources和defaulttargetdatasource,通过afterpropertiesset()方法将数据源写入resolveddatasources和resolveddefaultdatasource。 调用abstractroutingdatasource的getconnection()方法时,determinetargetdatasource()方法返回datasource执行底层的getconnection()。 其流程如下图所示:
3.1 创建数据源 dynamicdatasource数据源的注入,目前业界主流实现步骤如下:
在配置文件中定义数据源
spring.datasource.type=com.alibaba.druid.pool.druiddatasourcespring.datasource.driverclassname=com.mysql.jdbc.driver# 主数据源spring.datasource.druid.master.url=jdbcurlspring.datasource.druid.master.username=***spring.datasource.druid.master.password=***# 其他数据源spring.datasource.druid.second.url=jdbcurlspring.datasource.druid.second.username=***spring.datasource.druid.second.password=*** 在代码中配置bean
@configurationpublic class dynamicdatasourceconfig { @bean @configurationproperties(spring.datasource.druid.master) public datasource firstdatasource(){ return druiddatasourcebuilder.create().build(); } @bean @configurationproperties(spring.datasource.druid.second) public datasource seconddatasource(){ return druiddatasourcebuilder.create().build(); } @bean @primary public dynamicdatasource datasource(datasource firstdatasource, datasource seconddatasource) { map targetdatasources = new hashmap(5); targetdatasources.put(datasourcenames.first, firstdatasource); targetdatasources.put(datasourcenames.second, seconddatasource); return new dynamicdatasource(firstdatasource, targetdatasources); }} 3.2 aop处理 通过datasourceaspect切面技术来简化业务上的使用,只需要在业务方法添加@switchdatasource注解即可完成动态切换:
@documented@retention(retentionpolicy.runtime)@target({elementtype.method})public @interface switchdatasource { string value();} datasourceaspect拦截业务方法,更新当前线程上下文datasourcecontextholder中存储的key,即可实现数据源切换。
3.3 方案不足 基于abstractroutingdatasource的多数据源动态切换,有个明显的缺点,无法动态添加和删除数据源。在我们的产品中,不能把应用数据源写死在配置文件。接下来分享一下基于数据库表的实现方案。
4. 数据库表解决方案 我们需要实现可视化的数据源管理,并实时查看数据源的运行状态。所以我们不能把数据源全部配置在文件中,应该将数据源定义保存到数据库表。参考abstractroutingdatasource的设计思路,实现自定义数据源管理。
4.1 设计数据源表 主库的数据源信息仍然配置在项目配置文件中,应用库数据源配置参数,则设计对应的数据表。表结构如下所示:
这个表主要就是datasource的相关配置参数,其相应的orm操作代码在此不再赘述,主要是实现数据源的增删改查操作。
4.2 自定义数据源管理 4.2.1 定义管理接口 通过继承abstractdatasource即可实现dynamicdatasource。为了方便对数据源进行操作,我们定义一个接口datasourcemanager,为业务提供操作数据源的统一接口。
public interface datasourcemanager { void put(string var1, datasource var2); datasource get(string var1); boolean hasdatasource(string var1); void remove(string var1); void closedatasource(string var1); collection all();} 该接口主要是对数据表中定义的数据源,提供基础管理功能。
4.2.2 自定义数据源 dynamicdatasource的实现如下图所示:
根据前面的分析,abstractroutingdatasource是在容器启动的时候,执行afterpropertiesset注入数据源对象,完成之后无法对数据源进行修改。dynamicdatasource则实现datasourcemanager接口,可以将数据表中的数据源加载到datasources。
4.2.3 切面处理 这一块的处理跟配置文件数据源方案处理方式相同,都是通过aop技术切换lookup key。
public datasource determinetargetdatasource() { string lookupkey = datasourcecontextholder.getkey(); datasource datasource = optional.ofnullable(lookupkey) .map(datasources::get) .orelse(defaultdatasource); if (datasource == null) { throw new illegalstateexception(cannot determine datasource for lookup key [ + lookupkey + ]); } return datasource; } 4.2.4 管理数据源状态 在项目启动的时候,加载数据表中的所有数据源,并执行初始化。初始化操作主要是使用springboot提供的datasourcebuilder类,根据数据源表的定义创建datasource。在项目运行过程中,可以使用定时任务对数据源进行保活,为了提升性能再添加一层缓存。
abstractroutingdatasource 只支持单库事务,切换数据源是在开启事务之前执行。 spring使用 datasourcetransactionmanager进行事务管理。开启事务,会将数据源缓存到datasourcetransactionobject对象中,后续的commit和 rollback事务操作实际上是使用的同一个数据源。
如何解决切库事务问题?借助spring的声明式事务处理,我们可以在多次切库操作时强制开启新的事务:
@switchdatasource @transactional(rollbackfor = exception.class, propagation = propagation.requires_new) 这样的话,执行切库操作的时候强制启动新事务,便可实现多次切库而且事务能够生效。但是这种事务方式,存在数据一致性问题:
假若serviceb正常执行提交事务,接着返回servicea执行并且发生异常。因为两次处理是不同的事务,servicea这个事务执行回滚,而servicea事务已经提交。这样的话,数据就不一致了。接下来,我们主要讨论如何解决多库的事务问题。
6. 多库事务处理 6.1 关于事务的理解 首先有必要理解事务的本质。
1.提到spring事务,就离不开事务的四大特性和隔离级别、七大传播特性。
事务特性和离级别是属于数据库范畴。spring事务的七大传播特性是什么呢?它是spring在当前线程内,处理多个事务操作时的事务应用策略,数据库事务本身并不存在传播特性。
2.spring事务的定义包括:begin、commit、rollback、close、suspend、resume等动作。
begin(事务开始): 可以认为存在于数据库的命令中,比如mysql的start transaction命令,但是在jdbc编程方式中不存在。 close(事务关闭): spring事务的close()方法,是把connection对象归还给数据库连接池,与事务无关。 suspend(事务挂起): spring中事务挂起的语义是:需要新事务时,将现有的connection保存起来(还有尚未提交的事务),然后创建新的connection2,connection2提交、回滚、关闭完毕后,再把connection1取出来继续执行。 resume(事务恢复): 嵌套事务执行完毕,返回上层事务重新绑定连接对象到事务管理器的过程。 实际上,只有commit、rollback、close是在jdbc真实存在的,而其他动作都是应用的语意,而非jdbc事务的真实命令。因此,事务真实存在的方法是:setautocommit()、commit()、rollback()。
close()语义为:
关闭一个数据库连接,这已经不再是事务的方法了。 使用datasource并不会执行物理关闭,只是归还给连接池。
6.2 自定义管理事务 为了保证在多个数据源中事务的一致性,我们可以手动管理connetion的事务提交和回滚。考虑到不同orm框架的事务管理实现差异,要求实现自定义事务管理不影响框架层的事务。
这可以通过使用装饰器设计模式,对connection进行包装重写commit和rolllback屏蔽其默认行为,这样就不会影响到原生connection和orm框架的默认事务行为。其整体思路如下图所示:
这里并没有使用前面提到的@switchdatasource,这是因为我们在transactionaop中已经执行了lookupkey的切换。
6.2.1 定义多事务注解 @target({elementtype.method})@retention(retentionpolicy.runtime)@documentedpublic @interface multitransaction { string transactionmanager() default multitransactionmanager; // 默认数据隔离级别,随数据库本身默认值 isolationlevel isolationlevel() default isolationlevel.default; // 默认为主库数据源 string datasourceid() default default; // 只读事务,若有更新操作会抛出异常 boolean readonly() default false; 业务方法只需使用该注解即可开启事务,datasourceid指定事务用到的数据源,不指定默认为主库。
6.2.3 包装connection 自定义事务我们使用包装过的connection,屏蔽其中的commit&rollback方法。这样我们就可以在主事务里进行统一的事务提交和回滚操作。
public class connectionproxy implements connection { private final connection connection; public connectionproxy(connection connection) { this.connection = connection; } @override public void commit() throws sqlexception { // connection.commit(); } public void realcommit() throws sqlexception { connection.commit(); } @override public void close() throws sqlexception { //connection.close(); } public void realclose() throws sqlexception { if (!connection.getautocommit()) { connection.setautocommit(true); } connection.close(); } @override public void rollback() throws sqlexception { if(!connection.isclosed()) connection.rollback(); } ...} 这里commit&close方法不执行操作,rollback执行的前提是连接执行close才生效。这样不管是使用哪个orm框架,其自身事务管理都将失效。事务的控制就交由multitransaction控制了。
6.2.4 事务上下文管理 public class transactionholder { // 是否开启了一个multitransaction private boolean isopen; // 是否只读事务 private boolean readonly; // 事务隔离级别 private isolationlevel isolationlevel; // 维护当前线程事务id和连接关系 private concurrenthashmap connectionmap; // 事务执行栈 private stack executestack; // 数据源切换栈 private stack datasourcekeystack; // 主事务id private string maintransactionid; // 执行次数 private atomicinteger transcount; // 事务和数据源key关系 private concurrenthashmap executeiddatasourcekeymap; } 每开启一个事物,生成一个事务id并绑定一个connectionproxy。事务嵌套调用,保存事务id和lookupkey至栈中,当内层事务执行完毕执行pop。这样的话,外层事务只需在栈中执行peek即可获取事务id和lookupkey。
6.2.5 数据源兼容处理 为了不影响原生事务的使用,需要重写getconnection方法。当前线程没有启动自定义事务,则直接从数据源中返回连接。
@override public connection getconnection() throws sqlexception { transactionholder transactionholder = multitransactionmanager.transaction_holder_thread_local.get(); if (objects.isnull(transactionholder)) { return determinetargetdatasource().getconnection(); } connectionproxy connectionproxy = transactionholder.getconnectionmap() .get(transactionholder.getexecutestack().peek()); if (connectionproxy == null) { // 没开跨库事务,直接返回 return determinetargetdatasource().getconnection(); } else { transactionholder.addcount(); // 开了跨库事务,从当前线程中拿包装过的connection return connectionproxy; } } 6.2.6 切面处理 切面处理的核心逻辑是:维护一个嵌套事务栈,当业务方法执行结束,或者发生异常时,判断当前栈顶事务id是否为主事务id。如果是的话这时候已经到了最外层事务,这时才执行提交和回滚。详细流程如下图所示:
package com.github.mtxn.transaction.aop; import com.github.mtxn.application.application;import com.github.mtxn.transaction.multitransactionmanager;import com.github.mtxn.transaction.annotation.multitransaction;import com.github.mtxn.transaction.context.datasourcecontextholder;import com.github.mtxn.transaction.support.isolationlevel;import com.github.mtxn.transaction.support.transactionholder;import com.github.mtxn.utils.exceptionutils;import lombok.extern.slf4j.slf4j;import org.aspectj.lang.proceedingjoinpoint;import org.aspectj.lang.annotation.around;import org.aspectj.lang.annotation.aspect;import org.aspectj.lang.annotation.pointcut;import org.aspectj.lang.reflect.methodsignature;import org.springframework.core.annotation.order;import org.springframework.stereotype.component; import java.lang.reflect.method; @aspect@component@slf4j@order(99999)public class multitransactionaop { @pointcut(@annotation(com.github.mtxn.transaction.annotation.multitransaction)) public void pointcut() { if (log.isdebugenabled()) { log.debug(start in transaction pointcut...); } } @around(pointcut()) public object aroundtransaction(proceedingjoinpoint point) throws throwable { methodsignature signature = (methodsignature) point.getsignature(); // 从切面中获取当前方法 method method = signature.getmethod(); multitransaction multitransaction = method.getannotation(multitransaction.class); if (multitransaction == null) { return point.proceed(); } isolationlevel isolationlevel = multitransaction.isolationlevel(); boolean readonly = multitransaction.readonly(); string prevkey = datasourcecontextholder.getkey(); multitransactionmanager multitransactionmanager = application.resolve(multitransaction.transactionmanager()); // 切数据源,如果失败使用默认库 if (multitransactionmanager.switchdatasource(point, signature, multitransaction)) return point.proceed(); // 开启事务栈 transactionholder transactionholder = multitransactionmanager.starttransaction(prevkey, isolationlevel, readonly, multitransactionmanager); object proceed; try { proceed = point.proceed(); multitransactionmanager.commit(); } catch (throwable ex) { log.error(execute method:{}#{},err:, method.getdeclaringclass(), method.getname(), ex); multitransactionmanager.rollback(); throw exceptionutils.api(ex, 系统异常:%s, ex.getmessage()); } finally { // 当前事务结束出栈 string transid = multitransactionmanager.gettrans().getexecutestack().pop(); transactionholder.getdatasourcekeystack().pop(); // 恢复上一层事务 datasourcecontextholder.setkey(transactionholder.getdatasourcekeystack().peek()); // 最后回到主事务,关闭此次事务 multitransactionmanager.close(transid); } return proceed; } } 7.总结 本文主要介绍了多数据源管理的解决方案(应用层事务,而非xa二段提交保证),以及对多个库同时操作的事务管理。
需要注意的是,这种方式只适用于单体架构的应用。因为多个库的事务参与者都是运行在同一个jvm进行。如果是在微服务架构的应用中,则需要使用分布式事务管理(譬如:seata)。
PCB设计的一大难题,如何确保数据既可访问又安全?
LT89101L是一款高性能的电平转换器
石化电气安全,无源无线测温保障
深度分析!五个对智能物联网强烈刚需的工业
DTA和应对不确定性的EMA简析
SpringBoot多数据源及事务解决方案
锂离子电池隔膜材料
你当真了解人工智能吗?别急着回答,先看完这篇文章
小米6最新消息:小米6至今无现货,小米Note2和小米MIX现货且不断降价,何必苦抢小米6?
智能电网的用途和应用
安卓7.0还未普及 安卓8.0即将到来 这是要累死小米华为的节奏
米家投影仪体验 没有激光电视那么震撼但更为实用
大联大“霸总”式收购文晔股权两大疑点、三大反转可能!最终能否达成?
光电晶体管的工作原理,基于光电晶体管构建的自动灯开/关开关
为机器人安全限制电机运行范围的简单方法
黄仁勋:ARM中国将是英伟达的一部分
究竟是谁走漏风声?华硕新材料《8曲面屏——无痕贴合、超窄边框》
怎样用电位器控制LED闪烁速率
努比亚视频中的“8”有亮点,究竟会有什么奥秘8.21揭晓?难道第一个安卓8.0
高线性度CMOS调幅电路技术介绍