线程池模式 (Thread Pool Pattern)

"线程池像出租车调度——车在站里等着,有客单时分配一辆,送完客回站待命。"——我发现


开篇故事

你开了一个餐厅。客人点单时,如果每次都去招聘并培训一个新厨师——太慢了。更好的方式是:

  1. 提前雇佣好 N 个厨师(创建 N 个线程)
  2. 客人来了,把菜谱放到出菜口(提交任务)
  3. 厨师做完一个菜,回来等下一个菜谱(从队列取任务)
  4. 打烊时,告诉厨师「做完手上这个就下班」(优雅关闭)

这就是线程池。创建线程有成本(几微秒到几十微秒),频繁创建/销毁线程会浪费资源。线程池复用线程,适合「大量短任务」的场景。

线程池架构:

┌───────────── 主线程 (老板) ──────────────────┐
│                                              │
│        ┌── 任务 ──┐                          │
│  submit──────────→  待办队列 (Task Queue)     │
│        └── 任务 ──┘                          │
│                                              │
│   ┌────────┐  取任务  ┌────────┐  取任务     │
│   │Worker A├─────────>│Worker B├─────────    │
│   │(线程1) │          │(线程2) │             │
│   └────────┘          └────────┘             │
└──────────────────────────────────────────────┘

本章适合谁

  • 已经会手动创建/销毁线程,但觉得太麻烦
  • 需要处理大量并发任务(如网络请求)
  • 想了解服务器/框架「幕后是怎么管理线程的」

你会学到什么

  1. 线程池的组成——Worker 线程 + 任务队列 + 同步
  2. 环形队列 (Ring Buffer)——固定大小的循环任务队列
  3. 任务提交——pool_submit(func, arg) 入队
  4. 优雅关闭——标记 shutdown → worker 清空队列 → join 回收
  5. 实际应用——用线程池批量处理数组数据

前置要求

  • 已掌握:mutex + condvar 的基本用法
  • 已掌握:函数指针 (void (*)(void *))
  • 理解回调函数的概念

第一个例子

/* 简化版线程池 */
typedef void (*TaskFunc)(void *);
typedef struct { TaskFunc func; void *arg; } Task;

/* 提交任务 */
pool_submit(pool, my_task, &data);
// 某个空闲 worker 会自动执行: my_task(&data)

编译:gcc -Wall -Wextra -std=c17 -pthread -o pool_demo pool_demo.c

原理解析

环形缓冲区 (Ring Buffer)

任务队列用数组实现,头和尾两个指针,满了就绕回开头:

队列大小 = 4:

  [task0] [task1] [task2] [task3]
    ↑head            ↑tail

head == tail 且 count == 0 → 空
head == tail 且 count == MAX → 满

Worker 线程循环

void *worker_loop(void *arg) {
    ThreadPool *pool = arg;
    while (1) {
        pthread_mutex_lock(&pool->mutex);
        while (count == 0 && !shutdown)
            pthread_cond_wait(&not_empty, &mutex);
        if (shutdown && count == 0) break;
        Task task = queue[head];  // 取任务
        head = (head+1) % max;    // 环形前进
        count--;
        pthread_cond_signal(&not_full);
        pthread_mutex_unlock(&mutex);

        task.func(task.arg);  // 执行任务
    }
    return NULL;
}

优雅关闭流程

pool_shutdown(pool):
  1. 设置 shutdown = 1
  2. cond_broadcast 唤醒所有 worker
  3. worker: 执行完队列剩余任务
  4. worker: 检查 shutdown 标记 → 退出循环
  5. pool_shutdown: join 回收所有 worker 线程

常见错误

❌ 错误 1: shutdown 后还 submit

pool_shutdown(pool, 4);
pool_submit(pool, task, &data);  // ❌ pool 已经关了!
// ✅ 先提交所有任务,再调用 shutdown

❌ 错误 2: 队列满时不等待

if (count == max_queue) {
    // ❌ 直接丢弃任务?还是阻塞?
}
// ✅ 用 cond_wait 等 not_full 信号

❌ 错误 3: worker 执行长任务导致队列积压

/* ❌ 一个任务耗时 10 秒 → 其他 worker 空闲 */
pool_submit(pool, long_task, &data);   // 10 秒
pool_submit(pool, quick_task, &data);  // 等 10 秒才被执行
// ✅ 拆分长任务,或限制队列深度做背压 (backpressure)

动手练习

🟢 入门:提交 3 个打印任务

创建 2 个 worker 的线程池,提交 3 个任务(每个打印自己的 id),优雅关闭。

点击查看答案
#include <stdio.h>
#include <pthread.h>

typedef void (*TaskFunc)(void *);
typedef struct { TaskFunc func; void *arg; } Task;

static void print_task(void *arg) {
    int id = *(int *)arg;
    printf("执行任务 %d\n", id);
}

/* ... thread pool 实现 ... */
int main(void) {
    int ids[] = {1, 2, 3};
    /* pool 创建后: */
    pool_submit(pool, print_task, &ids[0]);
    pool_submit(pool, print_task, &ids[1]);
    pool_submit(pool, print_task, &ids[2]);
    pool_shutdown(pool, 2);
    return 0;
}

🟡 中级:批量平方计算

提交 10 个任务,每个计算一个数字的平方,结果存回原 struct。提交完关闭线程池,打印所有结果。

点击查看答案

思路:每个任务接收一个 IntTask {id, value, result},任务函数里 result = value * value。提交 10 个后 shutdown,shutdown 保证所有任务执行完才返回。

🔴 挑战:实现动态线程池

当队列持续积压时,自动创建更多 worker 线程(上限 N)。当队列持续为空时,减少多余的 worker。

查看答案提示

思路:维护一个活跃 worker 计数和空闲 worker 计数。定时检查队列深度,超过阈值则创建新 worker;低于阈值一定时间则释放多余 worker。需要 pthread_detach 自动回收空闲 worker。

故障排查

Q:pool_shutdown 卡住了?

A:某个 worker 在执行的任务永远不返回。用 sleep 模拟超时、或者检查是否有死锁。

Q:任务被执行了两次?

A:环形队列的「满」和「空」条件判断有误。head == tail 在空和满时都成立,需要用 count 变量来区分。

Q:任务丢失了?

A:submit 时没有检查队列是否满了,直接覆盖。正确做法:等待 not_full 信号或返回错误码。

知识扩展

实际项目中的线程池

  • Nginx: 多进程 + listen fd 共享
  • Redis: 单线程事件循环 + background worker
  • gRPC: C++ ThreadPool 实现,支持动态扩缩
  • Linux 内核: workqueue 子系统,内核态线程池

任务类型分类

类型特征推荐线程池大小
CPU 密集型大量计算CPU 核心数
I/O 密集型等待网络/磁盘CPU 核心数 × 2~4
混合型计算+等待根据测量调整

小结

  • 线程池 = N 个 worker 线程 + 任务队列 + mutex + condvar
  • 提交任务:放入队列,cond_signal 唤醒空闲 worker
  • 优雅关闭:标记 shutdown → worker 清空队列 → join 回收
  • 适用场景:大量短任务,避免频繁创建/销毁线程
  • 核心数据结构:环形缓冲区

术语表

英文中文
Thread Pool线程池
Worker Thread工作线程
Task Queue任务队列
Ring Buffer环形缓冲区
Graceful Shutdown优雅关闭
Backpressure背压(限流)
Dynamic Scaling动态扩缩
CPU-boundCPU 密集型
I/O-boundI/O 密集型
Fire-and-forget投递即忘

延伸阅读

继续学习

你已经学会了如何高效复用线程。但还有一个场景:服务器同时有 1000 个客户端连接等着读数据——每个连接开一个线程?不行。下一章介绍 I/O 多路复用——一个线程监控所有文件描述符,哪个有数据就处理哪个。