东方异域见闻~反乌托邦的欲望免安装绿色中文版
8.61G · 2025-11-18
在 Spring Boot 等生态中,多数据源切换是一种常用的基础组件,虽然功能简单但要实现一个并发稳定、鲁棒性好、集成容易的多数元切换组件也需要花费一点功夫。这里给出一些代码示例分析隐藏问题并给出优化建议,不论是面试候选还是代码评审都是一个不错的素材。
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Clickhouse {
String value() default "";
}
@Configuration
@MapperScan(basePackages = {"com.xxx.anomaly.**.mapper"}, sqlSessionFactoryRef = "SqlSessionFactory")
public class ClickhouseDatasource {
@Bean(name = "defaultDatasource")
@ConfigurationProperties(prefix = "spring.datasource")
@Primary
public DataSource getDefault() {
return DataSourceBuilder.create().build();
}
@Bean(name = "clickhouse")
@ConfigurationProperties(prefix = "spring.clickhouse")
public DataSource clickhouse(@Qualifier("defaultDatasource") DataSource defaultDatasource) {
if(StringUtils.isNotEmpty(dbUrl)) {
// 获取Clickhouse连接参数
return balancedClickhouseDataSource;
} else {
LOGGER.info("Clickhouse数据源未配置");
return null;
}
}
}
@Aspect
@Order(-1)
@Component
public class DatasourceAop {
private static final String PACKAGE = "com.xxx.anomaly";
@Pointcut("execution(* com.xxx.anomaly..*.*(..))")
public void pointCut(){};
@Before(value = "pointCut()")
public void beforeInvoke(JoinPoint joinpoint) {
try {
String clazzName = joinpoint.getTarget().getClass().getName();
String methodName = joinpoint.getSignature().getName();
if(clazzName.startsWith(PACKAGE)) { // 防止第三方jar包的动态代理影响(如mybatis)
Class targetClazz = Class.forName(clazzName);
Method[] methods = targetClazz.getMethods();
for(Method method : methods) {
if(method.getName().equals(methodName)) {
if(method.isAnnotationPresent(Clickhouse.class)) {
DatasourceType.setDataBaseType(DatasourceType.DataBaseType.CLICKHOUSE);
} else {
DatasourceType.setDataBaseType(DatasourceType.DataBaseType.DEFAULT);
}
break;
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public class DatasourceType {
public enum DataBaseType {
CLICKHOUSE,DEFAULT
}
// 使用ThreadLocal保证线程安全
private static final ThreadLocal<DataBaseType> TYPE = new ThreadLocal<DataBaseType>();
// 往当前线程里设置数据源类型
public static void setDataBaseType(DataBaseType dataBaseType) {
if (dataBaseType == null) {
throw new NullPointerException();
}
//System.err.println("[将当前数据源改为]:" + dataBaseType);
TYPE.set(dataBaseType);
}
// 获取数据源类型
public static DataBaseType getDataBaseType() {
DataBaseType dataBaseType = TYPE.get() == null ? DataBaseType.DEFAULT : TYPE.get();
//System.err.println("[获取当前数据源的类型为]:" + dataBaseType);
return dataBaseType;
}
// 清空数据类型
public static void clearDataBaseType() {
TYPE.remove();
}
}
@Service("dynamicDataSource")
public class DynamicDataSource extends AbstractRoutingDataSource {
@PostConstruct
public void init() {
targetDataSource.put(DatasourceType.DataBaseType.DEFAULT, defaultDatasource);
if(clickhouse != null) {
targetDataSource.put(DatasourceType.DataBaseType.CLICKHOUSE, clickhouse);
}
}
@Override
protected DataSource determineTargetDataSource() {
// 获取数据源名称
Object dbName = (Object) determineCurrentLookupKey();
if(dbName == null) {
return defaultDatasource;
}
if(targetDataSource.get(dbName) == null) {
// 获取Clickhouse连接参数
return balancedClickhouseDataSource;
} else {
LOGGER.error("Clickhouse数据源未配置");
return null;
}
} else {
return (DataSource) targetDataSource.get(dbName);
}
}
@Override
protected Object determineCurrentLookupKey() {
DatasourceType.DataBaseType dataBaseType = DatasourceType.getDataBaseType();
return dataBaseType;
}
@Override
public void afterPropertiesSet() {
}
public void removeDatasouce(Object dbName) {
if(targetDataSource.containsKey(dbName)) {
targetDataSource.remove(dbName);
}
}
public DataSource getDefaultDatasource() {
try {
DataSource dataSource = (DataSource) targetDataSource.get(DatasourceType.DataBaseType.DEFAULT);
return dataSource;
} catch (Exception e) {
LOGGER.error(e.getMessage(), e);
return null;
}
}
}
@Configuration
@MapperScan(basePackages = {"com.xxx.anomaly.**.mapper"}, sqlSessionFactoryRef = "SqlSessionFactory")
public class SessionFactory {
@Autowired
private DynamicDataSource dynamicDataSource;
@Bean("defaultTransactionManager")
@Primary
public DataSourceTransactionManager defaultTransactionManager() {
return new DataSourceTransactionManager(dynamicDataSource);
}
@Bean(name = "SqlSessionFactory")
public MybatisSqlSessionFactoryBean sqlSessionFactory()
throws Exception {
MybatisSqlSessionFactoryBean sessionFactory = new MybatisSqlSessionFactoryBean();
sessionFactory.setDataSource(dynamicDataSource);
MybatisConfiguration configuration = new MybatisConfiguration();
configuration.setMapUnderscoreToCamelCase(true);
configuration.setCallSettersOnNulls(true);
sessionFactory.setConfiguration(configuration);
if (DatabaseUtil.isGuanEnv()) {
sessionFactory.setPlugins(new Interceptor[]{new PaginationInterceptor().setDialectType("postgresql"),new MybatisLikeSqlInterceptor()}); // 分页插件
} else {
sessionFactory.setPlugins(new Interceptor[]{new PaginationInterceptor(),new MybatisLikeSqlInterceptor()}); // 分页插件
}
sessionFactory.setPlugins(new Interceptor[]{new PaginationInterceptor()}); // 分页插件
sessionFactory.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath*:config/dao/**/*.xml"));
return sessionFactory;
}
}
本项目采用 动态数据源路由 架构,支持在运行时根据业务需求自动切换 MySQL(默认)和 ClickHouse 数据源。该架构通过 AOP 切面和 Spring 的 AbstractRoutingDataSource 实现透明的数据源切换,无需业务代码显式处理数据源选择逻辑。
ClickhouseDatasource (配置类)defaultDatasource:主数据源(MySQL/PostgreSQL),通过 spring.datasource.* 配置clickhouse:ClickHouse 数据源,优先从数据库表 ums_sys_datasource_config 读取配置DynamicDataSource (继承 AbstractRoutingDataSource)@PostConstruct 阶段将已创建的数据源缓存到 targetDataSource Map 中determineCurrentLookupKey() 方法读取 ThreadLocal 中的数据源类型标识DatasourceAop (AOP 切面) + @Clickhouse (注解)com.xxx.anomaly..* 包下的所有方法@Clickhouse 注解 → 设置 ThreadLocal 为 CLICKHOUSEDEFAULTThreadLocal 保证多线程环境下数据源选择互不干扰SessionFactory (配置类)DynamicDataSource 注入到 MyBatis 的 SqlSessionFactoryDataSourceTransactionManager 事务管理器此阶段完成默认数据源(MySQL/PostgreSQL)的初始化:
Spring Boot 应用启动
↓
EnvironmentPostProcessor 处理配置文件(application.yml)
↓
Binder 绑定 spring.datasource.* 属性到数据源配置对象
↓
DataSourceBuilder 自动创建 defaultDatasource Bean
(默认优先选择 HikariDataSource 作为连接池实现)
关键源码位置:
ClickhouseDatasource.java 第 63-68 行技术说明:
此阶段根据数据库配置表或配置文件创建 ClickHouse 数据源:
Spring 容器初始化 @Configuration 类
↓
执行 clickhouse() Bean 方法
↓
使用 defaultDatasource 查询配置表
↓
SELECT * FROM ums_sys_datasource_config
WHERE moudle_name='general' AND status=1
↓
根据配置优先级决定数据源配置来源
↓
创建 BalancedClickhouseDataSource 实例
├─ 设置连接池参数(最大连接数、超时等)
├─ scheduleActualization(10s) - 定期刷新节点状态
└─ withConnectionsCleaning(10s) - 定期清理无效连接
关键源码位置:
ClickhouseDatasource.java 第 70-113 行配置优先级(从高到低):
ums_sys_datasource_config 中的配置application.yml 中的 spring.clickhouse.* 静态配置null(ClickHouse 数据源不可用)设计优势:
此阶段将所有数据源注册到 DynamicDataSource 的内部缓存中:
DynamicDataSource Bean 创建
↓
@PostConstruct init() 方法执行
↓
targetDataSource.put(DEFAULT, defaultDatasource) - 注册默认数据源
↓
if (clickhouse != null)
targetDataSource.put(CLICKHOUSE, clickhouse) - 注册 ClickHouse 数据源
↓
setTargetDataSources(targetDataSource) - 设置到父类
↓
afterPropertiesSet() - 完成初始化
关键源码位置:
DynamicDataSource.java 第 61-67 行技术细节:
targetDataSource 是一个 Map,key 为数据源类型枚举,value 为实际的 DataSource 对象此阶段是核心业务逻辑,每次方法调用时都会执行数据源路由判断:
业务方法调用
↓
DatasourceAop 切面拦截(@Before 通知)
↓
检查方法是否标注 @Clickhouse 注解
├─ 有注解:DatasourceType.set(CLICKHOUSE) → 写入 ThreadLocal
└─ 无注解:DatasourceType.set(DEFAULT) → 写入 ThreadLocal
↓
MyBatis 执行 Mapper 方法
↓
SqlSessionFactory 需要获取数据库连接
↓
调用 DynamicDataSource.determineCurrentLookupKey()
└─ 从 ThreadLocal 读取数据源类型(CLICKHOUSE 或 DEFAULT)
↓
调用 DynamicDataSource.determineTargetDataSource()
└─ 根据数据源类型从 targetDataSource 缓存查找
↓
缓存查找结果判断
├─ 缓存命中:直接返回已缓存的数据源对象
└─ 缓存未命中且类型为 CLICKHOUSE(懒加载场景):
├─ 查询数据库配置表 ums_sys_datasource_config
├─ 创建新的 BalancedClickhouseDataSource 实例
├─ 放入 targetDataSource 缓存
└─ 返回新创建的数据源
↓
从目标数据源获取 Connection 对象
↓
MyBatis 通过 Connection 执行 SQL 语句
↓
SQL 路由到对应数据库(MySQL 或 ClickHouse)
关键源码位置:
DatasourceAop.java 第 31-54 行(AOP 拦截逻辑)DynamicDataSource.java 第 70-120 行(数据源路由与懒加载逻辑)技术要点:
sequenceDiagram
autonumber
participant Boot as Spring Boot
participant Cfg as ClickhouseDatasource
participant Dyn as DynamicDataSource
participant AOP as DatasourceAop
participant MyB as MyBatis
participant DB as MySQL
participant CH as ClickHouse
Boot->>Cfg: 加载 @Configuration
Cfg->>DB: 查询 ums_sys_datasource_config
DB-->>Cfg: 返回 CH 配置
Cfg->>Cfg: 创建 BalancedClickhouseDataSource
Cfg-->>Boot: 注册 clickhouse Bean
Boot->>Dyn: 创建 DynamicDataSource
Dyn->>Dyn: @PostConstruct 初始化缓存
Dyn-->>Boot: 初始化完成
Note over AOP,MyB: === 运行时调用 ===
AOP->>AOP: 拦截方法调用
alt 方法有 @Clickhouse
AOP->>Dyn: ThreadLocal.set(CLICKHOUSE)
else 无注解
AOP->>Dyn: ThreadLocal.set(DEFAULT)
end
MyB->>Dyn: getConnection()
Dyn->>Dyn: determineTargetDataSource()
alt 缓存命中
Dyn-->>MyB: 返回已缓存数据源
else 缓存未命中 (懒加载)
Dyn->>DB: 再次查询配置
Dyn->>Dyn: 创建数据源并缓存
Dyn-->>MyB: 返回新数据源
end
alt 使用 DEFAULT
MyB->>DB: 执行 SQL
else 使用 CLICKHOUSE
MyB->>CH: 执行 SQL
end
阶段一:Spring Boot 应用启动与初始化(步骤 1-8)
@Configuration 注解的 ClickhouseDatasource 配置类ClickhouseDatasource Bean 方法执行,使用默认数据源(MySQL)查询配置表 ums_sys_datasource_config,获取 ClickHouse 的连接参数BalancedClickhouseDataSource 实例,并设置连接池参数(最大连接数、超时时间等)clickhouse)DynamicDataSource BeanDynamicDataSource 的 @PostConstruct 方法执行,将 defaultDatasource 和 clickhouse 两个数据源放入内部 targetDataSource 缓存 Map 中阶段二:运行时动态数据源路由(步骤 9-22)
DatasourceAop 切面拦截 com.xxx.anomaly..* 包下的所有方法
10-12. 判断注解并设置数据源类型:
@Clickhouse 注解 → 通过 ThreadLocal 设置数据源类型为 CLICKHOUSEDEFAULT(MySQL)DynamicDataSource 请求获取数据库连接DynamicDataSource.determineTargetDataSource() 方法根据 ThreadLocal 中的数据源类型标识查找实际数据源
15-17. 缓存命中场景:targetDataSource 缓存中查找对应数据源DEFAULT 数据源 → SQL 语句路由到 MySQL 执行CLICKHOUSE 数据源 → SQL 语句路由到 ClickHouse 执行架构设计亮点:
removeDatasouce() 方法清除缓存,下次访问时自动加载最新配置,无需重启应用查看当前 DatasourceAop.java 的实现:
@Before(value = "pointCut()")
public void beforeInvoke(JoinPoint joinpoint) {
// ... 省略其他代码
if (method.isAnnotationPresent(Clickhouse.class)) {
DatasourceType.setDataBaseType(DataBaseType.CLICKHOUSE);
} else {
DatasourceType.setDataBaseType(DataBaseType.DEFAULT);
}
}
代码缺陷分析:
@Before 通知在方法执行前设置数据源类型@After 或 @AfterReturning / @AfterThrowing 清理 ThreadLocalDatasourceType.clearDataBaseType() 方法已定义,但从未被调用在 Web 应用的线程池环境(如 Tomcat 线程池)中,线程会被复用,导致以下问题:
场景时序:
时刻 T1:线程 Thread-1 执行标注 @Clickhouse 的方法
→ ThreadLocal 被设置为 CLICKHOUSE
→ SQL 正确路由到 ClickHouse 执行
→ 方法执行完毕,但 ThreadLocal 未清理
→ 线程返回线程池
时刻 T2:线程 Thread-1 被复用,执行未标注 @Clickhouse 的正常方法
→ AOP 拦截,将 ThreadLocal 设置为 DEFAULT
→ SQL 正确路由到 MySQL
→ 看似正常运行
时刻 T3:线程 Thread-1 再次被复用,执行未标注注解的方法
→ 但前一次请求因异常中断,AOP 的 @Before 未执行
→ ThreadLocal 中残留上次的 CLICKHOUSE 标识
→ 本应路由到 MySQL 的业务请求误路由到 ClickHouse
→ 导致严重问题:
查询失败(ClickHouse 中不存在对应的业务表)
数据写入错误的数据库
事务管理异常
数据一致性被破坏
风险等级:高 - 可能导致数据错误和业务异常
查看 DynamicDataSource.determineTargetDataSource() 懒加载逻辑(第 77-111 行):
if(targetDataSource.get(dbName) == null) {
// 通过数据库获取 ClickHouse 数据源的配置并创建数据源
JdbcTemplate jdbcTemplate = new JdbcTemplate();
// ... 查询数据库配置
BalancedClickhouseDataSource balancedClickhouseDataSource = new BalancedClickhouseDataSource(dbUrl, properties);
targetDataSource.put(DatasourceType.DataBaseType.CLICKHOUSE, balancedClickhouseDataSource);
return balancedClickhouseDataSource;
}
代码缺陷分析:
targetDataSource 使用普通的 HashMap(非线程安全容器)get(dbName) == null 判断与 put() 操作之间非原子并发问题场景:
时刻 T0:ClickHouse 数据源缓存为空
并发线程 A:
T1: if(targetDataSource.get(CLICKHOUSE) == null) → true
T2: 开始创建 BalancedClickhouseDataSource A
并发线程 B:
T1: if(targetDataSource.get(CLICKHOUSE) == null) → true (同时判断为 null)
T2: 开始创建 BalancedClickhouseDataSource B
T3: 线程 B 先完成,put(CLICKHOUSE, dataSourceB)
T4: 线程 A 后完成,put(CLICKHOUSE, dataSourceA) → 覆盖 B
导致的问题:
null 后都创建数据源实例BalancedClickhouseDataSource 对象未正确关闭,连接资源无法释放风险等级:中高 - 在高并发场景下可能导致资源耗尽
Spring 的 DataSourceTransactionManager 在事务开始时获取并绑定数据库连接,事务期间无法切换数据源。
在同一个事务内尝试混用两种数据源:
@Transactional
public void mixedTransaction() {
// 1. 事务开始,获取 MySQL 连接并绑定到当前线程
userMapper.insert(user); // 路由到 MySQL
// 2. 调用标注 @Clickhouse 的方法
logToClickhouse(); // 期望路由到 ClickHouse,但实际仍使用 MySQL 连接
// 3. 继续 MySQL 操作
orderMapper.insert(order); // 仍使用同一个 MySQL 连接
}
@Clickhouse
public void logToClickhouse() {
// ThreadLocal 被设置为 CLICKHOUSE
// 但事务已绑定 MySQL 连接,无法切换
logMapper.insert(log); // 实际仍在 MySQL 执行!
}
问题根本原因:
DataSourceTransactionManager 在事务开始时调用 DataSource.getConnection() 获取连接TransactionSynchronizationManager 绑定到当前线程导致的问题:
风险等级:中 - 业务代码设计不当时会触发
AOP 切面中通过反射匹配方法,仅使用方法名判断:
for(Method method : methods) {
if(method.getName().equals(methodName)) { // ️ 仅按名称匹配,未比较参数类型
if(method.isAnnotationPresent(Clickhouse.class)) {
// 设置数据源类型
}
break; // 找到第一个同名方法即退出
}
}
代码缺陷分析:
break 语句,如果目标方法是第二个重载版本,会匹配到错误的方法@Clickhouse 注解问题场景示例:
public class UserService {
// 方法 1:无注解,路由到 MySQL
public List<User> query(String id) {
return userMapper.selectById(id);
}
// 方法 2:有注解,路由到 ClickHouse
@Clickhouse
public List<User> query(String id, String type) {
return userMapper.selectByIdAndType(id, type);
}
}
错误流程:
业务调用:query("user123", "VIP")
↓
AOP 拦截:methodName = "query"
↓
反射遍历:找到第一个名为 "query" 的方法(方法 1)
↓
检查注解:方法 1 没有 @Clickhouse 注解
↓
设置数据源:DatasourceType.set(DEFAULT) 错误!应该是 CLICKHOUSE
↓
结果:ClickHouse 查询被误路由到 MySQL
风险等级:中 - 使用方法重载时会触发
查看当前 SessionFactory.java 的事务管理器配置(第 31-35 行):
@Bean("defaultTransactionManager")
@Primary
public DataSourceTransactionManager defaultTransactionManager() {
return new DataSourceTransactionManager(dynamicDataSource); // ️ 使用动态数据源
}
核心问题:
DynamicDataSource(包含 MySQL 和 ClickHouse 两种数据源)1. ClickHouse 的事务特性与 OLTP 数据库的本质差异
ClickHouse 是 OLAP(Online Analytical Processing,联机分析处理)数据库,设计目标:
MySQL 是 OLTP(Online Transaction Processing,联机事务处理)数据库,设计目标:
ClickHouse 的事务支持情况:
| 特性 | MySQL(OLTP) | ClickHouse(OLAP) |
|---|---|---|
| BEGIN/COMMIT/ROLLBACK | 完全支持 | 不支持 |
| 行级锁 | 支持 | 仅支持表级和分区级锁 |
| 即时一致性 | 支持 | 最终一致性模型 |
| 单语句原子性 | 支持 | 支持(INSERT 是原子的) |
| 跨语句事务 | 支持 | 不支持 |
| 幂等写入 | 需应用层保证 | 通过 ReplicatedMergeTree 支持 |
2. 事务管理器对 ClickHouse 的副作用
当 DataSourceTransactionManager 管理 ClickHouse 连接时:
@Transactional
public void queryClickhouseData() {
// 事务管理器会尝试执行(但 ClickHouse 不支持):
// 1. connection.setAutoCommit(false) ← ClickHouse JDBC 驱动会忽略
// 2. 执行业务 SQL
// 3. connection.commit() ← 无实际作用,数据已立即写入
// 4. 异常时 connection.rollback() ← 无法回滚已执行的查询/写入
}
导致的问题:
@Transactional 注解无法保证 ClickHouse 操作的原子性3. 实际使用场景分析
典型的 ClickHouse 使用模式:
// 场景 1:纯查询操作(只读,不需要事务)
@Clickhouse
public List<LogEntry> queryLogs(String userId) {
return logMapper.selectByUserId(userId); // 查询操作,无需事务保护
}
// 场景 2:批量写入(INSERT 本身是原子的)
@Clickhouse
public void batchInsertLogs(List<LogEntry> logs) {
logMapper.batchInsert(logs); // 单个 INSERT 语句是原子操作
}
// 场景 3:混合操作(错误示例 - 不应在同一事务中混用)
@Transactional
public void processOrder(Order order) {
orderMapper.insert(order); // MySQL - 需要事务
logToClickhouse(order.getId()); // ClickHouse - 不需要事务,且无法参与 MySQL 事务
}
风险等级:中低 - 影响性能和资源利用,但通常不会导致功能性错误
目标:解决 ThreadLocal 内存泄漏和线程污染问题
修改文件:DatasourceAop.java
@Aspect
@Order(-1) // 保证优先级在 AOP 前
@Component
public class DatasourceAop {
private static final String PACKAGE = "com.xxx.anomaly";
@Pointcut("execution(* com.xxx.anomaly..*.*(..))")
public void pointCut(){};
// 将 @Before 改为 @Around,确保清理
@Around(value = "pointCut()")
public Object aroundInvoke(ProceedingJoinPoint joinpoint) throws Throwable {
// 保存原数据源类型(支持嵌套调用场景)
DatasourceType.DataBaseType originalType = DatasourceType.getDataBaseType();
try {
// 获取目标类和方法信息
String clazzName = joinpoint.getTarget().getClass().getName();
String methodName = joinpoint.getSignature().getName();
// 仅处理指定包下的方法
if(clazzName.startsWith(PACKAGE)) {
Class targetClazz = Class.forName(clazzName);
Method[] methods = targetClazz.getMethods();
// 遍历方法,查找匹配的方法并检查 @Clickhouse 注解
for(Method method : methods) {
if(method.getName().equals(methodName)) {
if(method.isAnnotationPresent(Clickhouse.class)) {
// 设置为 ClickHouse 数据源
DatasourceType.setDataBaseType(DatasourceType.DataBaseType.CLICKHOUSE);
} else {
// 设置为默认数据源(MySQL)
DatasourceType.setDataBaseType(DatasourceType.DataBaseType.DEFAULT);
}
break;
}
}
}
// 执行目标方法
return joinpoint.proceed();
} finally {
// 【关键修复】:方法执行完毕后恢复原数据源类型
// 如果是顶层调用(originalType == null),则清理 ThreadLocal
// 如果是嵌套调用,则恢复为上层的数据源类型
if (originalType == null) {
DatasourceType.clearDataBaseType(); // 清理 ThreadLocal,防止内存泄漏
} else {
DatasourceType.setDataBaseType(originalType); // 恢复嵌套调用的数据源
}
}
}
}
修复效果:
finally 块确保 ThreadLocal 一定会被清理,即使方法抛出异常目标:解决并发场景下的竞态条件,防止重复创建数据源和连接泄漏
修改文件:DynamicDataSource.java
@Service("dynamicDataSource")
public class DynamicDataSource extends AbstractRoutingDataSource {
private static final Logger LOGGER = LoggerFactory.getLogger(DynamicDataSource.class);
// 【修复1】:改为线程安全的 ConcurrentHashMap
private Map<Object, Object> targetDataSource = new ConcurrentHashMap<>();
// 【修复2】:添加锁对象,用于懒加载的同步控制
private final Object clickhouseLock = new Object();
@Override
protected DataSource determineTargetDataSource() {
Object dbName = determineCurrentLookupKey();
if(dbName == null) {
return defaultDatasource;
}
// 先尝试从缓存获取(快速路径,无锁)
DataSource cachedDs = (DataSource) targetDataSource.get(dbName);
if (cachedDs != null) {
return cachedDs;
}
// 缓存未命中且需要 ClickHouse,进入懒加载流程
if (DatasourceType.DataBaseType.CLICKHOUSE.equals(dbName)) {
return getOrCreateClickhouseDataSource();
}
// 默认返回 MySQL 数据源
return defaultDatasource;
}
// 【修复3】:使用双重检查锁定(Double-Checked Locking)模式创建 ClickHouse 数据源
private DataSource getOrCreateClickhouseDataSource() {
// 第一次检查(无锁,提高性能)
DataSource ds = (DataSource) targetDataSource.get(DatasourceType.DataBaseType.CLICKHOUSE);
if (ds != null) {
return ds;
}
// 加锁创建数据源
synchronized (clickhouseLock) {
// 第二次检查(防止重复创建)
ds = (DataSource) targetDataSource.get(DatasourceType.DataBaseType.CLICKHOUSE);
if (ds != null) {
return ds;
}
// 通过数据库获取 ClickHouse 数据源的配置并创建数据源
// ... 查询配置、创建数据源的代码
// BalancedClickhouseDataSource newDs = new BalancedClickhouseDataSource(dbUrl, properties);
// targetDataSource.put(DatasourceType.DataBaseType.CLICKHOUSE, newDs);
// return newDs;
......
}
}
}
修复效果:
ConcurrentHashMap 替代 HashMap,保证基本的线程安全目标:正确关闭旧数据源,防止资源泄漏
修改文件:DynamicDataSource.java
public void removeDatasouce(Object dbName) {
// 使用与懒加载相同的锁,确保线程安全
synchronized (clickhouseLock) {
if(targetDataSource.containsKey(dbName)) {
// 从缓存中移除数据源
DataSource oldDs = (DataSource) targetDataSource.remove(dbName);
// 如果是 BalancedClickhouseDataSource,需要正确关闭以释放资源
if (oldDs instanceof BalancedClickhouseDataSource) {
try {
((BalancedClickhouseDataSource) oldDs).close();
LOGGER.info("已关闭旧的 ClickHouse 数据源,释放连接池资源");
} catch (Exception e) {
LOGGER.error("关闭 ClickHouse 数据源失败,可能导致连接泄漏", e);
}
}
}
}
}
修复效果:
目标:解决方法重载场景下的注解匹配错误
修改文件:DatasourceAop.java
@Aspect
@Order(-1)
@Component
public class DatasourceAop {
private static final String PACKAGE = "com.xxx.anomaly";
@Pointcut("execution(* com.xxx.anomaly..*.*(..))")
public void pointCut(){};
@Around(value = "pointCut()")
public Object aroundInvoke(ProceedingJoinPoint joinPoint) throws Throwable {
try {
// 1. 切换数据源
switchDataSource(joinPoint);
// 2. 执行目标方法
return joinPoint.proceed();
} finally {
// 3. 清理 ThreadLocal(防止内存泄漏)
DatasourceType.clearDataBaseType();
}
}
private void switchDataSource(ProceedingJoinPoint joinPoint) {
try {
// 【优化】:直接通过 MethodSignature 获取方法对象(避免复杂的反射遍历)
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
// 检查方法是否标注 @Clickhouse 注解
if (method.isAnnotationPresent(Clickhouse.class)) {
DatasourceType.setDataBaseType(DatasourceType.DataBaseType.CLICKHOUSE);
} else {
DatasourceType.setDataBaseType(DatasourceType.DataBaseType.DEFAULT);
}
} catch (Exception e) {
// 异常时降级到默认数据源,保证系统可用性
log.error("数据源切换失败,降级使用默认数据源(MySQL)", e);
DatasourceType.setDataBaseType(DatasourceType.DataBaseType.DEFAULT);
}
}
}
修复效果:
finally 确保 ThreadLocal 一定会被清理MethodSignature 获取方法对象,避免复杂的反射遍历和方法重载问题printStackTrace())注意:此方案使用 MethodSignature.getMethod() 直接获取实际调用的方法对象,自动解决了方法重载的匹配问题,比手动遍历 getMethods() 更可靠。
目标:将 ClickHouse 排除在事务管理之外,避免不必要的事务开销
修改文件:SessionFactory.java
@Configuration
@MapperScan(basePackages = {"com.xxx.anomaly.**.mapper"}, sqlSessionFactoryRef = "SqlSessionFactory")
public class SessionFactory {
// 注入单独的默认数据源(仅 MySQL)
@Autowired
@Qualifier("defaultDatasource")
private DataSource defaultDatasource;
/**
* 事务管理器仅管理默认数据源(MySQL)
* ClickHouse 作为 OLAP 数据库不需要事务管理
*/
@Bean("defaultTransactionManager")
@Primary
public DataSourceTransactionManager defaultTransactionManager() {
// 【关键修改】:仅使用 MySQL 数据源,不使用 DynamicDataSource
return new DataSourceTransactionManager(defaultDatasource);
}
}
修复效果:
使用建议:
@Transactional 注解@Transactional 注解,让其自动提交@Transactional 方法中混用 MySQL 和 ClickHouse 操作鉴于 MySQL 和 ClickHouse 在业务场景中通常同时使用而非互斥使用,可以考虑更彻底的架构调整:为 MySQL 和 ClickHouse 分别配置独立的数据源和 MyBatis SqlSessionFactory,完全避免动态切换带来的实现复杂性和运行时风险。
1. 架构清晰,职责分离
2. 消除已知风险
3. 开发体验更好
*.mysql.mapper vs *.clickhouse.mapper)@Clickhouse 注解对于需要同时操作 MySQL 和 ClickHouse 的场景,推荐以下一致性保证方案:
方案 A:MySQL 事务 + 异步事件通知 ClickHouse(推荐)
@Transactional
public void createOrder(Order order) {
// 1. MySQL 事务内完成业务操作
orderMapper.insert(order);
// 2. 发布领域事件(事务提交后触发)
applicationEventPublisher.publishEvent(new OrderCreatedEvent(order));
}
@EventListener
@Async
public void syncToClickhouse(OrderCreatedEvent event) {
// 3. 异步写入 ClickHouse(最终一致性)
clickhouseLogMapper.insert(event.getOrder());
}
方案 B:补偿机制(适用于对一致性要求不高的场景)
方案 C:分布式事务(不推荐)
8.61G · 2025-11-18
11.1G · 2025-11-18
40.2G · 2025-11-18