我做手机壳特好看小游戏
113.12MB · 2025-11-10
在现代分布式系统中,数据库事务与第三方服务调用的混合使用已成为架构设计的常见痛点。许多开发者在初期会写出这样的代码:
@Transactional
public void createOrder(OrderDTO orderDTO) {
// 数据库操作
orderMapper.insert(order);
inventoryMapper.deduct(order.getProductId(), order.getQuantity());
// 在事务中调用第三方接口 - 架构陷阱!
paymentService.pay(order); // 支付接口
smsService.sendConfirm(order); // 短信接口
warehouseService.notify(order); // 仓库接口
// 更多数据库操作
userMapper.updateOrderCount(order.getUserId());
}
这种设计虽然直观,却隐藏着严重的技术风险。本文将深入分析问题根源,并提供完整的解决方案。
@Transactional
public void processBusiness() {
// 获取数据库连接
orderDao.save(order); // 连接被占用
// 第三方调用期间连接无法释放
thirdPartyService.call(); // 耗时5-30秒,连接被阻塞
inventoryDao.update(stock); // 继续使用同一连接
}
// 事务提交,连接释放
影响链:
第三方接口慢 → 事务时间长 → 连接占用久 → 连接池耗尽 → 系统雪崩
@Transactional
public void refundProcess(Long orderId) {
// 阶段1:更新订单状态
orderService.updateStatus(orderId, REFUNDING); // 数据库操作
// 阶段2:调用退款接口
boolean refundSuccess = thirdPartyRefundService.refund(orderId); // 第三方调用
if (refundSuccess) {
// 阶段3:更新本地状态
orderService.updateStatus(orderId, REFUNDED);
accountService.addBalance(order.getUserId(), order.getAmount());
}
// 如果此处发生数据库异常,整个事务回滚
// 结果:第三方已退款,但本地状态显示未退款!
}
@Transactional
public void concurrentProcess() {
// 对数据行A加锁
productDao.selectForUpdate(productId);
// 第三方调用期间锁保持
thirdPartyService.validate(product); // 耗时操作
// 对数据行B加锁
inventoryDao.selectForUpdate(warehouseId);
// 其他事务可能以相反顺序加锁,导致死锁
}
核心思想:将长时间操作移出事务边界
@Service
public class OrderService {
@Autowired
private TransactionTemplate transactionTemplate;
public OrderResult createOrder(OrderDTO orderDTO) {
// 第一阶段:纯数据库事务(快速提交)
Order order = executeInTransaction(() -> {
// 验证库存
Inventory inventory = inventoryMapper.selectForUpdate(
orderDTO.getProductId());
if (inventory.getStock() < orderDTO.getQuantity()) {
throw new InventoryNotEnoughException();
}
// 创建订单
Order order = orderMapper.insert(orderDTO);
// 扣减库存
inventoryMapper.deduct(orderDTO.getProductId(), orderDTO.getQuantity());
return order;
});
// 第二阶段:调用第三方服务(不在事务中)
processThirdPartyServices(order);
return OrderResult.success(order);
}
private void processThirdPartyServices(Order order) {
try {
// 并行调用第三方服务
CompletableFuture<Void> paymentFuture = CompletableFuture
.runAsync(() -> paymentService.pay(order));
CompletableFuture<Void> notifyFuture = CompletableFuture
.runAsync(() -> warehouseService.notify(order));
// 等待所有调用完成
CompletableFuture.allOf(paymentFuture, notifyFuture)
.get(30, TimeUnit.SECONDS);
// 更新订单状态为成功
updateOrderStatus(order.getId(), OrderStatus.SUCCESS);
} catch (TimeoutException e) {
// 处理超时
updateOrderStatus(order.getId(), OrderStatus.TIMEOUT);
compensateOrder(order);
} catch (Exception e) {
// 处理失败
updateOrderStatus(order.getId(), OrderStatus.FAILED);
compensateOrder(order);
}
}
@Transactional
public void updateOrderStatus(Long orderId, OrderStatus status) {
orderMapper.updateStatus(orderId, status);
}
private void compensateOrder(Order order) {
// 执行补偿操作:恢复库存等
executeInTransaction(() -> {
inventoryMapper.recover(order.getProductId(), order.getQuantity());
});
}
private <T> T executeInTransaction(Supplier<T> supplier) {
return transactionTemplate.execute(status -> supplier.get());
}
}
核心思想:通过本地记录跟踪第三方调用状态
@Entity
@Table(name = "async_task")
public class AsyncTask {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String taskType; // 任务类型:PAYMENT、NOTIFY等
private Long businessId; // 业务ID
private String status; // 状态:PENDING, PROCESSING, SUCCESS, FAILED
private Integer retryCount; // 重试次数
private String requestData; // 请求数据
private String responseData; // 响应数据
private LocalDateTime nextRetryTime; // 下次重试时间
}
@Service
public class AsyncTaskService {
@Transactional
public void createOrderWithAsyncTask(OrderDTO orderDTO) {
// 1. 保存主订单
Order order = orderMapper.insert(orderDTO);
// 2. 扣减库存
inventoryMapper.deduct(orderDTO.getProductId(), orderDTO.getQuantity());
// 3. 创建异步任务记录(在同一事务中)
createAsyncTask("PAYMENT", order.getId(), order);
createAsyncTask("NOTIFY_WAREHOUSE", order.getId(), order);
// 事务提交,数据持久化
}
@Async("taskExecutor")
public void processPendingTasks() {
List<AsyncTask> tasks = asyncTaskMapper.findPendingTasks();
for (AsyncTask task : tasks) {
try {
processSingleTask(task);
} catch (Exception e) {
handleTaskFailure(task, e);
}
}
}
private void processSingleTask(AsyncTask task) {
task.setStatus("PROCESSING");
asyncTaskMapper.update(task);
switch (task.getTaskType()) {
case "PAYMENT":
PaymentRequest request = JSON.parseObject(
task.getRequestData(), PaymentRequest.class);
PaymentResponse response = paymentService.pay(request);
task.setResponseData(JSON.toJSONString(response));
break;
case "NOTIFY_WAREHOUSE":
// 处理仓库通知
break;
}
task.setStatus("SUCCESS");
asyncTaskMapper.update(task);
}
private void handleTaskFailure(AsyncTask task, Exception e) {
if (task.getRetryCount() < MAX_RETRY_COUNT) {
task.setRetryCount(task.getRetryCount() + 1);
task.setNextRetryTime(LocalDateTime.now().plusMinutes(
calculateRetryDelay(task.getRetryCount())));
task.setStatus("PENDING");
} else {
task.setStatus("FAILED");
// 发送告警通知
alertService.sendTaskFailedAlert(task, e);
}
asyncTaskMapper.update(task);
}
}
核心思想:通过状态转移确保业务流程的可靠性,每个调用定义状态
public enum OrderState {
INITIALIZED, // 已初始化
INVENTORY_DEDUCTED, // 库存已扣减
PAYMENT_PENDING, // 支付处理中
PAYMENT_SUCCESS, // 支付成功
PAYMENT_FAILED, // 支付失败
NOTIFIED, // 已通知仓库
COMPLETED // 已完成
}
@Service
public class StateMachineOrderService {
@Transactional
public void initializeOrder(OrderDTO orderDTO) {
// 只做最基本的初始化,快速提交事务
Order order = new Order();
order.setStatus(OrderState.INITIALIZED);
order.setAmount(orderDTO.getAmount());
orderMapper.insert(order);
}
public void processOrder(Long orderId) {
// 尝试获取处理权
if (acquireProcessingLock(orderId)) {
try {
processOrderSteps(orderId);
} finally {
releaseProcessingLock(orderId);
}
}
}
private boolean acquireProcessingLock(Long orderId) {
// 通过状态转移实现分布式锁
int rows = orderMapper.updateStatus(
orderId,
OrderState.INITIALIZED,
OrderState.INVENTORY_DEDUCTED
);
return rows > 0;
}
private void processOrderSteps(Long orderId) {
Order order = orderMapper.findById(orderId);
try {
// 步骤1:扣减库存
deductInventory(order);
orderMapper.updateStatus(orderId, OrderState.INVENTORY_DEDUCTED);
// 步骤2:调用支付
callPaymentService(order);
orderMapper.updateStatus(orderId, OrderState.PAYMENT_SUCCESS);
// 步骤3:通知仓库
notifyWarehouse(order);
orderMapper.updateStatus(orderId, OrderState.NOTIFIED);
// 完成订单
orderMapper.updateStatus(orderId, OrderState.COMPLETED);
} catch (PaymentException e) {
// 支付失败,执行补偿
orderMapper.updateStatus(orderId, OrderState.PAYMENT_FAILED);
compensateInventory(order);
} catch (Exception e) {
// 其他异常,标记为需要人工干预
alertService.requireManualIntervention(orderId, e);
}
}
@Transactional
public void deductInventory(Order order) {
inventoryMapper.deduct(order.getProductId(), order.getQuantity());
}
public void callPaymentService(Order order) {
// 支付调用,不在事务中
paymentService.pay(order);
}
@Transactional
public void compensateInventory(Order order) {
inventoryMapper.recover(order.getProductId(), order.getQuantity());
}
}
核心思想:通过消息中间件实现系统解耦
@Configuration
public class RabbitMQConfig {
@Bean
public Queue orderQueue() {
return new Queue("order.process.queue", true);
}
@Bean
public Exchange orderExchange() {
return new TopicExchange("order.exchange");
}
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue())
.to(orderExchange()).with("order.created");
}
}
@Service
public class MQOrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void createOrder(OrderDTO orderDTO) {
// 1. 数据库操作(快速)
Order order = orderMapper.insert(orderDTO);
inventoryMapper.deduct(orderDTO.getProductId(), orderDTO.getQuantity());
// 2. 事务提交后 发送消息
OrderMessage message = new OrderMessage(order.getId(), order.getAmount());
rabbitTemplate.convertAndSend(
"order.exchange",
"order.created",
message,
new CorrelationData(order.getId().toString())
);
}
}
@Component
public class OrderMessageListener {
@RabbitListener(queues = "order.process.queue")
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void handleOrderMessage(OrderMessage message) {
try {
// 处理支付
paymentService.pay(message.getOrderId(), message.getAmount());
// 通知仓库
warehouseService.notify(message.getOrderId());
// 更新订单状态
orderMapper.updateStatus(message.getOrderId(), OrderStatus.COMPLETED);
} catch (Exception e) {
// 记录失败,进入重试或死信队列
log.error("处理订单消息失败: {}", message.getOrderId(), e);
throw new AmqpRejectAndDontRequeueException(e.getMessage());
}
}
}
@Service
@Slf4j
public class CircuitBreakerPaymentService {
private final CircuitBreaker circuitBreaker;
public CircuitBreakerPaymentService() {
// 配置断路器:失败率50%,时间窗口10秒
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
.failureRateThreshold(50)
.waitDurationInOpenState(Duration.ofSeconds(30))
.slidingWindowType(SlidingWindowType.COUNT_BASED)
.slidingWindowSize(10)
.build();
this.circuitBreaker = CircuitBreaker.of("payment-service", config);
}
public PaymentResult payWithCircuitBreaker(Order order) {
return circuitBreaker.executeSupplier(() -> {
try {
return paymentService.pay(order);
} catch (Exception e) {
log.warn("支付服务调用失败,订单: {}", order.getId(), e);
throw new CallNotPermittedException("支付服务暂不可用");
}
});
}
}
@Service
public class PaymentServiceWithFallback {
public PaymentResult payWithFallback(Order order) {
try {
return paymentService.pay(order);
} catch (Exception e) {
// 主服务失败,执行降级策略
return fallbackPayment(order);
}
}
private PaymentResult fallbackPayment(Order order) {
// 降级策略1:记录到本地,后续人工处理
asyncTaskService.createPaymentTask(order);
// 降级策略2:返回中间状态,引导用户稍后重试
return PaymentResult.pending("支付处理中,请稍后查看结果");
// 或者降级策略3:使用备用支付通道
// return backupPaymentService.pay(order);
}
}
@Component
public class ThirdPartyMonitor {
@EventListener
public void monitorApiCall(ThirdPartyCallEvent event) {
// 记录指标
Metrics.counter("third_party_calls_total",
"service", event.getServiceName(),
"status", event.isSuccess() ? "success" : "failure")
.increment();
Metrics.timer("third_party_call_duration",
"service", event.getServiceName())
.record(event.getDuration());
// 日志记录
if (event.isSuccess()) {
log.info("第三方调用成功: {}, 耗时: {}ms",
event.getServiceName(), event.getDuration().toMillis());
} else {
log.error("第三方调用失败: {}, 错误: {}",
event.getServiceName(), event.getErrorMessage());
}
// 告警
if (shouldAlert(event)) {
alertService.sendAlert(createAlert(event));
}
}
}
| 方案 | 适用场景 | 优点 | 缺点 | 复杂度 |
|---|---|---|---|---|
| 事务拆分 | 业务逻辑清晰,第三方调用少 | 实现简单,性能好 | 一致性较弱 | ⭐⭐ |
| 本地事务表 | 高一致性要求,重试需求 | 数据一致性好,可靠 | 架构复杂,需要任务调度 | ⭐⭐⭐⭐ |
| 状态机 | 复杂业务流程,多状态 | 流程清晰,易于追踪 | 实现复杂,状态管理难 | ⭐⭐⭐ |
| 消息队列 | 高并发,系统解耦 | 削峰填谷,松耦合 | 引入中间件,运维复杂 | ⭐⭐⭐ |
初创项目:优先选择事务拆分方案,快速验证业务
电商系统:推荐本地事务表 + 异步处理,保证数据一致性
金融系统:建议状态机 + 消息队列,确保流程可追踪
高并发系统:采用消息队列解耦,实现系统隔离