事务与并发控制
原创2026/3/18大约 8 分钟
事务(Transactions)是数据库保证数据一致性的核心机制,支持“原子性、一致性、隔离性、持久性(ACID)”;并发控制则解决多用户同时操作数据时的冲突问题,TypeORM 提供了完善的事务管理和并发控制方案,适配不同业务场景的需求。
一、事务基础概念
1. 事务的核心特性(ACID)
- 原子性(Atomicity):事务中的所有操作要么全部执行,要么全部回滚;
- 一致性(Consistency):事务执行前后,数据库数据始终保持合法状态;
- 隔离性(Isolation):多个事务并发执行时,互相不可见(隔离级别可配置);
- 持久性(Durability):事务提交后,修改永久保存到数据库。
2. TypeORM 事务适用场景
- 多表/多操作原子性:如“创建订单 + 扣减库存”需同时成功/失败;
- 数据一致性要求高:如金融交易、支付流程;
- 批量操作回滚:如批量导入数据,出错时回滚所有导入。
二、TypeORM 事务实现方式
TypeORM 提供三种事务实现方式,从简单到灵活,适配不同场景:
1. 自动事务(EntityManager.transaction)
最常用的方式,通过回调函数封装事务逻辑,TypeORM 自动管理事务的开启、提交、回滚:
import { AppDataSource } from "./data-source";
import { User } from "./entity/User";
import { Order } from "./entity/Order";
// 示例:创建订单 + 扣减用户余额(原子操作)
async function createOrder(userId: number, amount: number) {
const entityManager = AppDataSource.manager;
try {
// 开启事务:回调函数执行成功则自动提交,失败则自动回滚
const result = await entityManager.transaction(async (txManager) => {
// 1. 查询用户(必须使用 txManager,而非全局 manager)
const user = await txManager.findOne(User, { where: { id: userId } });
if (!user) {
throw new Error("用户不存在");
}
// 2. 检查余额
if (user.balance < amount) {
throw new Error("余额不足");
}
// 3. 扣减余额
user.balance -= amount;
await txManager.save(user);
// 4. 创建订单
const order = new Order();
order.userId = userId;
order.amount = amount;
order.status = "unpaid";
const savedOrder = await txManager.save(order);
// 返回结果(事务提交后,外部可获取)
return savedOrder;
});
console.log("订单创建成功:", result);
return result;
} catch (error) {
console.error("事务执行失败:", error);
throw error; // 向上抛出,由业务层处理
}
}2. 手动事务(QueryRunner)
最灵活的方式,手动控制事务的开启、提交、回滚,适合复杂业务场景(如多阶段操作、条件提交):
async function createOrderManual(userId: number, amount: number) {
// 1. 创建 QueryRunner(事务的核心载体)
const queryRunner = AppDataSource.createQueryRunner();
// 2. 连接数据库
await queryRunner.connect();
try {
// 3. 开启事务
await queryRunner.startTransaction();
// 4. 执行事务内操作(必须使用 queryRunner.manager)
const user = await queryRunner.manager.findOne(User, { where: { id: userId } });
if (!user || user.balance < amount) {
throw new Error("用户不存在或余额不足");
}
// 扣减余额
user.balance -= amount;
await queryRunner.manager.save(user);
// 创建订单
const order = new Order();
order.userId = userId;
order.amount = amount;
order.status = "unpaid";
const savedOrder = await queryRunner.manager.save(order);
// 5. 提交事务
await queryRunner.commitTransaction();
console.log("订单创建成功:", savedOrder);
return savedOrder;
} catch (error) {
// 6. 回滚事务(出错时必须回滚)
await queryRunner.rollbackTransaction();
console.error("事务执行失败,已回滚:", error);
throw error;
} finally {
// 7. 释放 QueryRunner(无论成功/失败,必须释放)
await queryRunner.release();
}
}3. NestJS 中的事务(依赖注入)
NestJS 中可通过 @Transaction() 装饰器或手动注入 QueryRunner 实现事务,推荐结合 InjectRepository 使用:
import { Injectable } from "@nestjs/common";
import { InjectRepository } from "@nestjs/typeorm";
import { Repository, QueryRunner, Connection } from "typeorm";
import { User } from "./entity/User";
import { Order } from "./entity/Order";
@Injectable()
export class OrderService {
constructor(
@InjectRepository(User) private userRepo: Repository<User>,
@InjectRepository(Order) private orderRepo: Repository<Order>,
private connection: Connection, // 注入数据库连接
) {}
// 方式1:自动事务(推荐)
async createOrder(userId: number, amount: number) {
return await this.connection.transaction(async (txManager) => {
const user = await txManager.findOne(User, { where: { id: userId } });
if (!user || user.balance < amount) {
throw new Error("用户不存在或余额不足");
}
user.balance -= amount;
await txManager.save(user);
const order = this.orderRepo.create({ userId, amount, status: "unpaid" });
return await txManager.save(order);
});
}
// 方式2:手动事务
async createOrderManual(userId: number, amount: number) {
const queryRunner = this.connection.createQueryRunner();
await queryRunner.connect();
await queryRunner.startTransaction();
try {
const user = await queryRunner.manager.findOne(User, { where: { id: userId } });
if (!user || user.balance < amount) {
throw new Error("用户不存在或余额不足");
}
user.balance -= amount;
await queryRunner.manager.save(user);
const order = this.orderRepo.create({ userId, amount, status: "unpaid" });
await queryRunner.manager.save(order);
await queryRunner.commitTransaction();
return order;
} catch (error) {
await queryRunner.rollbackTransaction();
throw error;
} finally {
await queryRunner.release();
}
}
}三、事务隔离级别
隔离级别决定了多个并发事务之间的可见性,TypeORM 支持配置事务隔离级别,适配不同的并发需求:
1. 常用隔离级别(MySQL 为例)
| 隔离级别 | 核心特性 | 适用场景 |
|---|---|---|
| READ UNCOMMITTED | 可读取未提交的数据(脏读) | 无,几乎不使用 |
| READ COMMITTED | 仅读取已提交的数据(避免脏读,可能幻读/不可重复读) | 大多数业务场景(默认) |
| REPEATABLE READ | 事务内多次读取结果一致(避免脏读/不可重复读,可能幻读) | MySQL 默认级别,适合金融场景 |
| SERIALIZABLE | 完全串行化(避免所有并发问题,性能最低) | 高一致性要求(如银行交易) |
2. 配置隔离级别
(1)自动事务中配置
await entityManager.transaction(
"REPEATABLE READ", // 隔离级别
async (txManager) => {
// 事务逻辑
}
);(2)手动事务中配置
await queryRunner.startTransaction("SERIALIZABLE"); // 开启事务时指定隔离级别四、并发控制
并发控制解决多用户同时操作同一数据的冲突问题,TypeORM 提供乐观锁、悲观锁两种核心方案:
1. 乐观锁(Optimistic Locking)
乐观锁假设并发冲突很少发生,仅在提交时检查数据是否被修改,通过 @VersionColumn 实现:
(1)实体配置乐观锁
import { Entity, PrimaryGeneratedColumn, Column, VersionColumn } from "typeorm";
@Entity()
export class Product {
@PrimaryGeneratedColumn()
id: number;
@Column()
name: string;
@Column({ type: "int", default: 0 })
stock: number;
// 版本列:每次更新自动递增,乐观锁核心
@VersionColumn()
version: number;
}(2)乐观锁使用示例
async function reduceStock(productId: number, count: number) {
const productRepo = AppDataSource.getRepository(Product);
// 事务内执行库存扣减
return await productRepo.manager.transaction(async (txManager) => {
// 1. 查询商品
const product = await txManager.findOne(Product, { where: { id: productId } });
if (!product) {
throw new Error("商品不存在");
}
// 2. 检查库存
if (product.stock < count) {
throw new Error("库存不足");
}
// 3. 扣减库存
product.stock -= count;
try {
// 4. 保存(若版本号不匹配,会抛出 OptimisticLockVersionMismatchError)
return await txManager.save(product);
} catch (error) {
if (error.name === "OptimisticLockVersionMismatchError") {
throw new Error("商品已被其他用户修改,请刷新后重试");
}
throw error;
}
});
}原理:更新时 SQL 会自动添加
WHERE version = 当前版本,若版本不匹配(数据已被修改),更新行数为 0,触发乐观锁异常。
2. 悲观锁(Pessimistic Locking)
悲观锁假设并发冲突频繁发生,读取数据时直接锁定,阻止其他事务修改,通过 QueryBuilder 实现:
async function reduceStockPessimistic(productId: number, count: number) {
const queryRunner = AppDataSource.createQueryRunner();
await queryRunner.connect();
await queryRunner.startTransaction();
try {
// 1. 查询商品并加锁(FOR UPDATE:排他锁,阻止其他事务修改/加锁)
const product = await queryRunner.manager
.createQueryBuilder(Product, "p")
.setLock("pessimistic_write") // 悲观写锁(等价于 FOR UPDATE)
// .setLock("pessimistic_read") // 悲观读锁(等价于 LOCK IN SHARE MODE)
.where("p.id = :id", { id: productId })
.getOne();
if (!product || product.stock < count) {
throw new Error("商品不存在或库存不足");
}
// 2. 扣减库存
product.stock -= count;
await queryRunner.manager.save(product);
await queryRunner.commitTransaction();
return product;
} catch (error) {
await queryRunner.rollbackTransaction();
throw error;
} finally {
await queryRunner.release();
}
}核心锁类型:
pessimistic_write:排他锁(FOR UPDATE),阻止其他事务读取/修改;pessimistic_read:共享锁(LOCK IN SHARE MODE),允许读取但阻止修改。
3. 乐观锁 vs 悲观锁
| 特性 | 乐观锁 | 悲观锁 |
|---|---|---|
| 核心思想 | 假设无冲突,提交时检查 | 假设冲突多,读取时锁定 |
| 实现方式 | @VersionColumn 版本号 | QueryBuilder 加锁 |
| 性能 | 高(无锁开销) | 低(锁等待开销) |
| 适用场景 | 并发冲突少(如商品详情) | 并发冲突多(如库存扣减) |
| 异常处理 | 需捕获版本冲突异常 | 自动等待锁释放(或超时) |
五、事务与并发控制最佳实践
1. 事务最佳实践
- 最小事务原则:事务逻辑尽量精简,减少锁持有时间(如避免在事务中调用远程API);
- 避免嵌套事务:TypeORM 不支持真正的嵌套事务,内层事务会合并到外层;
- 异常必回滚:手动事务中,
catch块必须调用rollbackTransaction(),finally块释放QueryRunner; - 统一使用 txManager:事务内所有操作必须使用
txManager``queryRunner.manager,而非全局manager。
2. 并发控制最佳实践
- 优先乐观锁:大多数场景(如商品信息修改)用乐观锁,性能更高;
- 悲观锁慎用:仅在高冲突场景(如库存扣减)使用悲观锁,且尽量缩短锁持有时间;
- 乐观锁重试机制:版本冲突时,可实现自动重试(如最多重试3次):
async function reduceStockWithRetry(productId: number, count: number, retry = 3) { try { return await reduceStock(productId, count); } catch (error) { if (error.message.includes("已被其他用户修改") && retry > 0) { // 等待 100ms 后重试 await new Promise(resolve => setTimeout(resolve, 100)); return await reduceStockWithRetry(productId, count, retry - 1); } throw error; } }
3. 常见问题排查
(1)事务未回滚
- 手动事务未调用
rollbackTransaction(); - 事务内抛出的异常未被捕获,导致
rollback未执行。
(2)乐观锁异常频繁
- 并发冲突确实过高,考虑改用悲观锁;
- 事务逻辑过长,导致版本号被其他事务修改,优化事务逻辑。
(3)悲观锁死锁
- 多事务操作表的顺序不一致(如事务1:A→B,事务2:B→A);
- 解决方案:统一所有事务操作表的顺序,缩短锁持有时间。
至此,本章节的学习就到此结束了,如有疑惑,可对接技术客服进行相关咨询。