最近自己写了一个线程池。
总的来说,线程池就是有一个任务队列,一个线程队列,线程队列不断地去取任务队列中的任务来执行,当任务队列中为空时,线程阻塞等待新的任务添加过来。
我是用queue来存放任务,vector存放thread*,然后用condition_variable 来设置线程阻塞和唤醒。
下面直接上代码吧。
线程池类头文件Thread_Pool.h
/********************************************
线程池头文件
Author:十面埋伏但莫慌
Time:2020/05/03
*********************************************/
#pragma once
#ifndef _THREAD_POOL_H_
#define _THREAD_POOL_H_
#include<thread>
#include<queue>
#include<mutex>
#include<atomic>
#include<vector>
#include<condition_variable>
typedef std::function<void()> Func;//定义线程执行函数类型,方便后面编码使用。
//任务类
class Task {
public:
Task() {}
~Task() {}
int push(Func func);//添加任务;
int getTaskNum();//获得当前队列中的任务数;
Func pop();//取出待执行的任务;
public:
std::mutex mx;//锁;
private:
std::queue<Func> tasks;//任务队列
};
//线程池类
class Thread_Pool {
public:
Thread_Pool() :IsStart(false) {}
~Thread_Pool();
int addTasks(Func tasks);//添加任务;
void start();//开启线程池;
void stop();//关闭线程池;
void run();//线程工作函数;
int getTaskNum();//获得当前队列中的任务数;
private:
static const int maxThreadNum = 3;//最大线程数为3;
std::condition_variable cond;//条件量;
std::vector<std::thread*> threads;//线程向量;
std::atomic<bool> IsStart;//原子变量,判断线程池是否运行;
Task tasks;//任务变量;
};
#endif
然后是线程池类成员函数定义文件Thread_Pool.cpp
/********************************************
线程池CPP文件
Author:十面埋伏但莫慌
Time:2020/05/03
*********************************************/
#include"Thread_Pool.h"
#include<iostream>
int Task::push(Func func) {
std::unique_lock<std::mutex> lock(mx);
try {
tasks.emplace(func);
}
catch (std::exception e)
{
throw e;
return -1;
}
return 0;
}
int Task::getTaskNum()
{
return tasks.size();
}
Func Task::pop() {
std::unique_lock<std::mutex> lock(mx);
Func temp;
if (tasks.empty())
return temp;
else
{
temp = tasks.front();
tasks.pop();
return temp;
}
}
int Thread_Pool::addTasks(Func func)
{
int ret = tasks.push(func);
cond.notify_one();
return ret;
}
void Thread_Pool::start() {
if (!IsStart) {
IsStart = true;
for (int i = 0; i < maxThreadNum; i++)
{
threads.emplace_back(new std::thread(std::bind(&Thread_Pool::run,this)));
}
}
}
void Thread_Pool::run()
{
while (IsStart)
{
Func f;
if (tasks.getTaskNum() == 0 && IsStart)
{
std::unique_lock<std::mutex> lock(tasks.mx);
cond.wait(lock);
}
if (tasks.getTaskNum() != 0 && IsStart)
{
f = tasks.pop();
if(f)
f();
}
}
}
int Thread_Pool::getTaskNum() {
return tasks.getTaskNum();
}
void Thread_Pool::stop() {
IsStart = false;
cond.notify_all();
for (auto T : threads) {
std::cout << "线程 " << T->get_id() << " 已停止。" << std::endl;
T->join();
if (T != nullptr)
{
delete T;
T = nullptr;
}
}
std::cout << "所有线程已停止。" << std::endl;
}
Thread_Pool::~Thread_Pool() {
if (IsStart)
{
stop();
}
}
最后是测试用的main.cpp
#include<iostream>
#include"Thread_Pool.h"
using namespace std;
void string_out_one() {
cout << "One!" << endl;
}
void string_out_two() {
cout << "Two!" << endl;
}
void string_out_three() {
cout << "Three!" << endl;
}
int main() {
{
Thread_Pool Pool;
try {
Pool.start();
}
catch (std::exception e)
{
throw e;
cout << "线程池创建失败。" << endl;
}
for (int i = 0; i < 50000 ;)
{
if (Pool.getTaskNum() < 1000) {
Pool.addTasks(string_out_one);
Pool.addTasks(string_out_two);
Pool.addTasks(string_out_three);
std::cout << i++ << std::endl;
}
}
getchar();
}
getchar();
return 0;
}
执行的效果如下:
线程唤醒和阻塞的逻辑就是在线程工作函数run函数中,判断队列是否为空,若为空则设置锁并调用condition变量的wait函数,释放这个线程中的锁并阻塞线程,等待任务队列中新的任务添加进来后,
condition变量通过notify_one()随机唤醒一个在wait的线程,取出队列中的任务执行。
写这个线程池的过程中碰到的最主要需要注意的就是锁的使用,在对队列的写和释放时要注意加锁,在需要阻塞线程时,要注意通过{}设置锁的范围。
IsStart是原子的,所以在写这个变量的时候没有另外加锁。
目前我觉得这个线程池的缺陷就是可执行函数的类型被写死了,有尝试对Task类使用模板类,但是在Thread_Pool中还是要指明Task模板类的类型参数,要是有大神指点下就好了- -。
就先记录这么多,感觉这个线程池的还是有很多可以改进的地方的,也欢迎大家指出不足。
到此这篇关于C++实现线程池的简单方法的文章就介绍到这了,更多相关C++实现线程池内容请搜索软件开发网以前的文章或继续浏览下面的相关文章希望大家以后多多支持软件开发网!
您可能感兴趣的文章:C++线程池的简单实现方法c++版线程池和任务池示例深入解析C++编程中线程池的使用基于C++11的threadpool线程池(简洁且可以带任意多的参数)c++实现简单的线程池c++实现简单的线程池c++线程池实现方法一种类似JAVA线程池的C++线程池实现方法