从“逐一处理”到“批量处理”的优化方案
场景
我们经常会遇到:有 N 个数据需要处理,而每个数据都需要查询一些数据,计算后写入一些数据。
简单总结是:逐一处理代码清晰,但是性能稍差。批量处理整体性能好,但是事务时间长带来风险。
当然有些项目一开始就明确了需要处理的数据量,我们根据实际情况可以在初期确定处理的策略,
但是有些项目一开始是没有那么清晰的,项目在发展,业务在变化,
代码既要考虑前期的快,也要考虑后期的稳,有没有方式可以更好适配项目的发展。
常见代码
逐一处理
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
for _, item := range items {
err := processSingleItem(item)
if err != nil {
return err
}
}
func processSingleItem(item Item) error {
// 查询相关表数据
data, err := tx.Query("SELECT ... WHERE ...", item.ID)
// 业务逻辑判断
if !shouldProcess(data) {
return nil
}
// 插入数据
return tx.Exec("INSERT ...", item.Data)
}
这样的代码很清晰,非常贴合这个功能逻辑的口头描述。 但是很快发现数据量稍微大一点时,运行起来性能不佳,因为在 for 循环里面做了数据库操作。
我们可以再分析一下优缺点:
- 优点
- 代码简单清晰,易于理解和维护
- 事务粒度小,锁持有时间短
- 内存占用稳定,不会因数据量大而 OOM
- 部分失败不影响其他记录处理
- 缺点
- 网络往返次数多(N+1 查询问题)
- 总体执行时间长
- 数据库连接利用率低
批量处理
于是很快就有了第二版代码,把数据库的操作变成批量的, 其优缺点基本和逐一处理相反,比如事务长,内存占用大,但是总体耗时短等等。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
func processBatch(items []Item) error {
// 批量查询所有需要的数据
allData, err := batchQueryData(tx, items)
if err != nil {
return err
}
// 批量计算和判断
var toInsert []InsertData
for i, item := range items {
if shouldProcess(allData[i]) {
toInsert = append(toInsert, buildInsertData(item, allData[i]))
}
}
// 批量插入
if len(toInsert) > 0 {
err = batchInsert(tx, toInsert)
if err != nil {
return err
}
}
return nil
}
分批处理
分批处理,结合两者优点
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func processInBatches(items []Item, batchSize int) error {
for i := 0; i < len(items); i += batchSize {
end := i + batchSize
if end > len(items) {
end = len(items)
}
batch := items[i:end]
err := processBatch(batch)
if err != nil {
return fmt.Errorf("batch %d failed: %w", i/batchSize, err)
}
}
return nil
}
考虑因素
- 数据一致性要求
- 单个数据处理失败,需要整批数据回滚吗?
- 顺便考虑部分数据失败,如何返回错误信息?
- 数据量大小
- 小数据量(几十到几百条):逐一处理足够
- 中等数据量(几百到几千条):需要考虑批量处理
- 大数据量(上万条):需要更复杂的分批处理
- 逻辑复杂程度
- 数据计算复杂度远大于数据操作量时,为了保持代码的可读性,而性能上可以妥协,应该考虑逐一处理,让代码清晰表达单条数据的处理过程。
- 反之,如果逻辑简单,或者封装后代码清晰,那么数据的批量操作不至于导致可读性差,则可以选择批量处理。
遇到的困难
整个代码的演变过程很大可能是逐步优化,从“逐一处理”到“批量处理”再到“分批处理”。
先简单讨论从“批量处理”到“分批处理”, 这个过程是相对简单的,很大可能是找到分批的切入点, 可能是入参直接分批处理, 也可能是给某个 select 语句加上 offset 和 limit。
再来讨论从“逐一处理”到“批量处理”,我认为这个优化有时候是困难的。 尤其是我们会对处理逻辑进行封装和复用。考虑以下代码(从上面逐一处理稍微修改而来)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
for _, item := range items {
err := processSingleItem(item)
if err != nil {
return err
}
}
func processSingleItem(item Item) error {
// 业务逻辑判断
if !shouldProcess(item) {
return nil
}
// 插入数据
return tx.Exec("INSERT ...", item.Data)
}
func shouldProcess(item Item) bool {
// 查询相关表数据
data, err := tx.Query("SELECT ... WHERE ...", item.ID)
return err != nil || data.Status != 0 // 有对应的数据并且状态不为零,表示待处理
}
shouldProcess 方法可能被其他代码复用了,考虑这是一个更早存在的代码,并且有一定的复杂度,也有不少引用。 那么把它改成“批量处理”的模式,难度将暴增。
- 是否应该复用该方法
- 复用,则考虑是否提取对应的数据操作代码到外部
- 提取,则需要修改现存的引用代码,引入新 BUG 的风险也很大。
- 不提取,不修改这份代码,只能通过其他方式(下面继续讨论这个思路)
- 不复用,重新写一份功能一致,但是符合批量处理模式的代码
- 开发成本和维护成本都增加
- 开发成本:要先深入理解现有代码逻辑
- 维护成本:后续增减功能点都需要维护这两份代码,需要逐步废弃一份
- 这个方案的执行大概是:
- 新写一个批量处理的代码,然后把旧代码,则逐一处理的标记为废弃
- 新代码经过足够的测试,甚至生产验证,再替换掉旧代码的引用,或者逐一处理的内部调用批量处理代码,只是入参列表只有一个元素而已。
- 最后彻底删掉旧代码
- 开发成本和维护成本都增加
- 复用,则考虑是否提取对应的数据操作代码到外部
利用缓存
利用缓存可以保持复用又能批量处理,这里说的缓存并不只是指 redis 这样的特定技术,我们有很多可以做缓存的切面。
核心思路:
- 对于读操作,可以先批量读取并缓存,后续读取直接从缓存读取,
- 对于写操作,可以先缓存起来,最后批量做一次批量写操作。
示例代码,只对读操作做了缓存:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
var idList []int
for _, item := range items {
idList = append(idList, item.ID)
}
// 根据业务逻辑,先把所有需要用到的数据,都加载到缓存中
_,err := selectByIdList(idList)
for _, item := range items {
err := processSingleItem(item)
if err != nil {
return err
}
}
func processSingleItem(item Item) error {
// 业务逻辑判断
if !shouldProcess(item) {
return nil
}
// 插入数据
return tx.Exec("INSERT ...", item.Data)
}
func shouldProcess(item Item) bool {
// 查询相关表数据
data, err := selectById(item.ID) // 复用代码里,先读取缓存,减少数据读写操作次数
return err != nil || data.Status != 0 // 有对应的数据并且状态不为零,表示待处理
}
// 一般是DAO层代码,可以直接在这里做缓存,需要一定的管理逻辑,也可以考虑配合redis等做更加完善的缓存方案
var cacheMap = make(map[id]Data)
func selectByIdList(idList []int)([]Data, error){
results, err := tx.Query("SELECT ... WHERE id in ...", id)
for _, res := range results{
cacheMap[res.ID] = res // 批量查询时写入缓存
}
return results, err
}
func selectById(id int)(Data, error){
cacheData, ok := cacheMap[id]
if ok {// 单个查询时先找缓存,如果有则直接用
return cacheData, nil
}
return tx.Query("SELECT ... WHERE ...", id)
}
注意:
- 依然需要对旧代码做一次基本审查,确保新增的缓存逻辑不会引发 BUG
- 比如某个逻辑多次读写同一个表甚至同一行数据的情况,可能会出现 BUG
- 要求代码有一定的封装层次,常见的数据库访问,网络请求等等都会有一层自己的封装
让 AI 评价一下这个方案
直接贴 AI 的回答:
这个方案在实践中被称为 “预加载缓存模式” 或 “批量预取模式”。
优点:
- 代码复用性极高
- 渐进式优化,风险可控
- 现有代码完全不受影响(AI 说没有,其实还是有,如果现有代码又改又查还多次操作,而缓存又没有更新)
- 可以逐步为高频场景添加批量版本
- 出现问题时可以快速回退到单条处理
- 保持接口简洁
- 缓存生命周期清晰
缺点:
- 数据访问层的复杂性增加
- 预加载逻辑的准确性要求高
- 内存使用可能较高
- 事务管理的挑战
- 错误处理边界模糊
建议:
- 使用依赖注入明确数据源,则旧代码保持常规 dao 读取,新代码提供带有缓存功能的 dao
- 分层预加载策略,提供更加准确的预加载方式
- 缓存有效性检查,优化缓存的失效机制,避免数据滞后等等问题
适用场景
- 这个方案特别适合:
- 业务逻辑复杂,难以重构为纯批量处理
- 需要保持现有接口的稳定性
- 数据依赖关系相对明确和固定
- 团队对现有代码熟悉,希望最小化修改
- 不太适合:
- 数据依赖关系动态变化,难以预测
- 内存资源非常紧张
- 需要极致的性能(额外的缓存查找仍有开销)
总结 你提出的这个方案是一个非常实用的折中方案,它:
✅ 保持了代码复用性和一致性 ✅ 最小化了对现有代码的侵入 ✅ 提供了显著的性能提升 ⚠️ 但需要仔细设计缓存策略和数据预加载逻辑
其他方案
最后列举一下 AI 到的一些方案: 简单/片面的总结两句,我希望达成的是“避免两份实现”,如果抛开这个限制,那下面的方案都值得考虑。
- 适配器模式:调用一个方法,该方法会判断调用“逐一处理”还是“批量处理”
- 策略模式:思路同上,只是代码封装结构上的区别
- 装饰器模式 + 批量缓冲:以“逐一处理”作为方法,内部缓冲到一定的阈值再“批量处理”