ThreadPool-Cpp/ThreadPool.h
2023-06-12 17:41:12 +08:00

273 lines
11 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//
// Created by louyu on 2023/6/12.
//
#ifndef THREADPOOL_FINAL_THREADPOOL_H
#define THREADPOOL_FINAL_THREADPOOL_H
#include <vector>
#include <queue>
#include <memory>
#include <atomic>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <unordered_map>
#include <thread>
#include <future>
#include <iostream>
const int TASK_MAX_THRESHOLD = 1024;
const int THREAD_MAX_THRESHOLD = 10;
const int THREAD_MAX_IDLE_TIME = 60; // 单位秒
// 线程池支持的模式
enum class PoolMode {
MODE_FIXED, // 固定数量线程
MODE_CACHED, // 线程数量可动态增长
};
class Thread {
public:
using ThreadFunc = std::function<void(int)>;
Thread(ThreadFunc func): func_(func), threadId_(generateId_ ++) {
}
~Thread() = default;
void start() { // 启动线程
std::thread t(func_, threadId_); // 创建一个线程执行一个线程函数
t.detach(); // 设置分离线程,否则出作用域后线程对象销毁,线程函数也会中止
}
unsigned getId() const {
return threadId_;
}
private:
ThreadFunc func_;
static unsigned generateId_;
unsigned threadId_; // 线程id会在线程池对象中建立id与线程对象的映射关系
};
unsigned Thread::generateId_ = 0;
// 线程池类型
class ThreadPool {
public:
ThreadPool()
: initThreadSize_(0)
, taskSize_(0)
, idleThreadSize_(0)
, curThreadSize_(0)
, taskQueMaxThreshold_(TASK_MAX_THRESHOLD)
, threadSizeThreshold_(THREAD_MAX_THRESHOLD)
, poolMode_(PoolMode::MODE_FIXED)
, isPoolRunning_(false) {
}
~ThreadPool() {
isPoolRunning_ = false;
// 等待线程池中所有的线程返回
std::unique_lock<std::mutex> lock(taskQueMtx_);
notEmpty_.notify_all(); // 线程池对象析构前,唤醒所有被阻塞的线程以返回结果
exitCond_.wait(lock, [&]() {return threads_.empty();});
}
void setMode(PoolMode mode) { // 设置线程池的工作模式
if(checkRunningState()) { // 若线程池已经在运行,禁止修改线程池模式
return;
}
poolMode_ = mode;
}
void setTaskQueMaxThreshold(unsigned threshold) { // 设置task任务队列任务上限
if(checkRunningState()) {
return;
}
if(poolMode_ == PoolMode::MODE_CACHED) {
taskQueMaxThreshold_ = threshold;
}
}
void setThreadSizeThreshold(unsigned threshold) { // 设置线程池cached模式下线程阈值
if(checkRunningState()) {
return;
}
threadSizeThreshold_ = threshold;
}
// 使用可变参模版编程使得submitTask可以接收任意任务函数与任意数量的参数
// pool.submitTask(sum, 10, 20);
// decltype可以根据括号内的表达式推导类型
template<class Func, class ...Args>
auto submitTask(Func &&func, Args &&...args) -> std::future<decltype(func(args...))> {
// 打包任务,放入任务队列
using RType = decltype(func(args...));
auto task = std::make_shared<std::packaged_task<RType()>>(
std::bind(std::forward<Func>(func), std::forward<Args>(args)...));
std::future<RType> result = task->get_future();
// 获取锁,任务队列不是线程安全的
std::unique_lock<std::mutex> lock(taskQueMtx_); // unique_lock构造同时会获取锁
// 用条件变量等待任务队列有空余wait一直等待直到后续条件满足 wait_for等待指定时间段 wait_until等待到某时间节点
// 用户提交任务最长阻塞不能超过1s否则判断提交任务失败返回
if(!notFull_.wait_for(lock, std::chrono::seconds(1), [&]() { return taskQue_.size() < taskQueMaxThreshold_; })) {// 不满足lambda表达式条件时wait释放锁
// 若等待1s条件依然不满足提交任务失败
std::cerr << "task queue is full, submit task fail." << std::endl;
auto emptyTask = std::make_shared<std::packaged_task<RType()>>( // 任务提交失败,运行一个空任务
[]() -> RType {return RType();});
(*emptyTask)(); // 执行空任务否则主线程调用future.get()会死锁
return emptyTask->get_future();
}
// 若有空余,将任务放入任务队列中
// using Task = std::function<void()>;
// 如何将一个带返回值的线程函数封装到std::function<void()>中用lambda表达式
taskQue_.emplace([task]() {
(*task)();
});
taskSize_ ++;
// 新放入任务任务队列必然不空notEmpty_通知
notEmpty_.notify_all();
// cached模式任务处理较为紧急适合小且快的任务多场景需要根据任务数量和空闲线程的数量决定是否动态扩充线程数
if(poolMode_ == PoolMode::MODE_CACHED
&& taskSize_ > idleThreadSize_
&& curThreadSize_ < threadSizeThreshold_) {
// 创建新线程
auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
unsigned threadId = ptr->getId();
threads_.emplace(threadId, std::move(ptr)); // unique_ptr拷贝构造被删除要强转右值
threads_[threadId]->start(); // 启动线程
// 修改线程个数相关变量
curThreadSize_ ++;
idleThreadSize_ ++;
}
// 返回Result对象
return result;
}
void start(unsigned initThreadSize = std::thread::hardware_concurrency()) { // 开启线程池默认开启cpu核心数个线程
isPoolRunning_ = true; // 设置线程池启动状态
initThreadSize_ = initThreadSize; // 初始化初始线程个数
curThreadSize_ = initThreadSize;
// 创建线程对象
for(int i = 0; i < initThreadSize_; i ++) {
auto ptr = std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc, this, std::placeholders::_1));
unsigned threadId = ptr->getId();
threads_.emplace(threadId, std::move(ptr)); // unique_ptr拷贝构造被删除要强转右值
}
// 启动所有线程
for(int i = 0; i < initThreadSize_; i ++) {
threads_[i]->start();
idleThreadSize_ ++; // 记录初始线程的数量
}
}
ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
private:
void threadFunc(unsigned threadId) { // 定义线程函数
auto lastTime = std::chrono::high_resolution_clock().now(); // 记录该线程调度时间的变量
for(;;) {
Task task;
{
// 先获取锁
std::unique_lock<std::mutex> lock(taskQueMtx_);
// 当任务队列中有任务的时候,不论线程池是否要析构,先要把任务做完
while(taskQue_.empty()) { // 任务队列为空时
if(!isPoolRunning_) { // 唤醒后如果线程池要析构了,那么停止线程执行
threads_.erase(threadId); // 线程结束前把线程对象从线程列表容器中删除
exitCond_.notify_all(); // 通知线程池的析构函数,有线程析构
return;
}
// 否则,需要等待任务到来
// cached模式下有可能需要回收之前创建的线程即超过initThreadSize_数量的线程要进行回收
// 当前时间 - 上一次线程执行时间 > 60s
if(poolMode_ == PoolMode::MODE_CACHED) {
if(std::cv_status::timeout == notEmpty_.wait_for(lock, std::chrono::seconds(1))) { // 条件变量超时返回
auto now = std::chrono::high_resolution_clock().now();
auto dur = std::chrono::duration_cast<std::chrono::seconds>(now - lastTime);
if(dur.count() >= THREAD_MAX_IDLE_TIME && curThreadSize_ > initThreadSize_) {
// 开始回收线程
threads_.erase(threadId); // 把线程对象从线程列表容器中删除
// 记录线程数量的相关变量的值修改
curThreadSize_ --;
idleThreadSize_ --;
return;
}
}
} else { // fixed模式
// 等待notEmpty_条件
notEmpty_.wait(lock);
}
}
idleThreadSize_ --; // 空闲线程--
// 从任务队列取一个任务
task = taskQue_.front();
taskQue_.pop();
taskSize_ --;
// 如果依然有剩余任务,继续通知其它线程执行任务
if(!taskQue_.empty()) {
notEmpty_.notify_all();
}
// 取出任务后必然不满,通知可以继续提交任务
notFull_.notify_all();
} // 释放锁,不能执行任务的时候还占着锁
// 当前线程负责执行这个任务
if(task != nullptr) {
task(); // 执行任务function<void()>
}
idleThreadSize_ ++; // 任务执行完,空闲线程数量增加
lastTime = std::chrono::high_resolution_clock().now(); // 更新该线程被调度执行完的时间
}
}
bool checkRunningState() const {
return isPoolRunning_;
}
private:
std::unordered_map<unsigned, std::unique_ptr<Thread>> threads_; // 线程列表
unsigned initThreadSize_; // 初始线程数量
std::atomic<unsigned int> curThreadSize_; // 记录当前线程池中线程总数量
std::atomic<unsigned int> idleThreadSize_; // 记录空闲线程的数量
unsigned threadSizeThreshold_; // 线程数量上限阈值
// 这里队列里不能存裸指针,避免用户传入一个临时对象,使用智能指针延长外部传进来对象的生命周期
using Task = std::function<void()>;
std::queue<Task> taskQue_; // 任务队列
std::atomic<unsigned int> taskSize_; // 任务队列任务数,用原子变量保证原子性
unsigned taskQueMaxThreshold_; // 任务队列任务数上限
std::mutex taskQueMtx_; // 保证任务队列的线程安全
std::condition_variable notFull_; // 任务队列不满
std::condition_variable notEmpty_; // 任务队列不空
std::condition_variable exitCond_; // 等待线程资源全部回收
PoolMode poolMode_; // 当前线程池的工作模式
std::atomic<bool> isPoolRunning_; // 表示当前线程池的启动状态
};
#endif //THREADPOOL_FINAL_THREADPOOL_H