在Linux下使用C++调用pthread API实现的一个线程池。
简介
这个线程池是在学习完《Linux/UNIX系统编程手册》中线程相关知识后用来练手的小项目,线程相关函数都是直接调用Linux的API,并且使用了C++中的queue和vector。
虽然C++中也提供了线程创建、互斥锁等函数库,但是也是对系统函数的封装。并且作为初学,先学会用原生函数比较好。
基础知识
pthread API
函数命名
1
2
3
4
5
6
7pthread_ 线程本身和各种相关函数
pthread_attr_ 线程属性对象
pthread_mutex_ 互斥量
pthread_mutexattr_ 互斥量属性对象
pthread_cond_ 条件变量
pthread_condattr_ 条件变量属性对象
pthread_key_ 线程数据键(Thread-specific data keys)线程管理相关函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30int pthread_create(pthread_t *restrict tidp,const pthread_attr_t *restrict_attr,void*(*start_rtn)(void*),void *restrict arg);
pthread_create是UNIX环境创建线程函数。
返回值:若成功则返回0,否则返回出错编号
参数:
第一个参数为指向线程标识符的指针。
第二个参数用来设置线程属性。
第三个参数是线程运行函数的地址。
最后一个参数是运行函数的参数。
int pthread_join(pthread_t thread, void **retval);
pthread_join()函数,以阻塞的方式等待thread指定的线程结束。当函数返回时,被等待线程的资源被收回。如果线程已经结束,那么该函数会立即返回。
返回值:0代表成功,失败返回错误号。
参数:
thread: 线程标识符,即线程ID,标识唯一线程。
retval: 用户定义的指针,用来存储被等待线程的返回值。
int pthread_detach(pthread_t thread);
使主线程与该线程分离,线程结束后,其退出状态不由其他线程获取,而直接自己自动释放。
返回值:0代表成功,失败返回错误号。
参数:线程标识符
int pthread_cancel(pthread_t thread);
该函数使目标线程停止执行,调用该方法后,被终止的线程并不一定立马被终止,只有在下次系统调用或调用了pthread_testcancel()方法后,才真正终止线程。
void pthread_exit(void* retval);
使当前线程结束运行。
int pthread_kill(pthread_t thread, int sig);
向指定ID的线程发送sig信号,如果线程代码内不做处理,则按照信号默认的行为影响整个进程,也就是说,如果你给一个线程发送了SIGQUIT,但线程却没有实现signal处理函数,则整个进程退出。
如果int sig是0呢,这是一个保留信号,一个作用是用来判断线程是不是还活着。
互斥锁
当不同的线程需要对同一块资源进行访问时,为了保证资源的安全,可以给其加锁。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16int pthread_mutex_init(pthread_mutex_t *restrict mutex, const pthread_mutexattr_t *restric attr);
对锁进行初始化。
PTHREAD_MUTEX_TIMED_N 这是缺省值,也就是普通锁。当一个线程加锁以后,其余请求锁的线程将形成一个等待队列,并在解锁后按优先级获得锁。这种锁策略保证了资源分配的公平性。
PTHREAD_MUTEX_RECURSIVE_NP 嵌套锁,允许同一个线程对同一个锁成功获得多次,并通过多次unlock解锁。如果是不同线程请求,则在加锁线程解锁时重新竞争。
PTHREAD_MUTEX_ERRORCHECK_NP 检错锁,如果同一个线程请求同一个锁,则返回EDEADLK,否则与PTHREAD_MUTEX_TIMED_NP类型动作相同。这样就保证当不允许多次加锁时不会出现最简单情况下的死锁。
PTHREAD_MUTEX_ADAPTIVE_NP 适应锁,动作最简单的锁类型,仅等待解锁后重新竞争。
int pthread_mutex_lock(pthread_mutex_t *mutex);
给互斥量加锁,如果该互斥量已经加锁,那么该线程将等待互斥量解锁。
返回值:成功:0,失败:错误码
int pthread_mutex_unlock(pthread_mutex_t *mutex);
返回值:成功:0,失败:错误码
int pthread_mutex_destory(pthread_mutex_t *mutex);
返回值:成功:0,失败:错误码
条件变量
与互斥锁不同,条件变量是用来等待而不是用来上锁的。条件变量用来自动阻塞一个线程,直 到某特殊情况发生为止。通常条件变量和互斥锁同时使用。
条件变量使我们可以睡眠等待某种条件出现。条件变量是利用线程间共享的全局变量进行同步 的一种机制,主要包括两个动作:1.一个线程等待”条件变量的条件成立”而挂起;2.另一个线程使 “条件成立”(给出条件成立信号)。
- 代码示例:
1
2
3
4
5
6
7
8
9In Thread1:
pthread_mutex_lock(&m_mutex);
pthread_cond_wait(&m_cond,&m_mutex);
pthread_mutex_unlock(&m_mutex);
In Thread2:
pthread_mutex_lock(&m_mutex);
pthread_cond_signal(&m_cond);
pthread_mutex_unlock(&m_mutex);
pthread_cond_wait(cond, mutex)的功能有3个:
调用者线程首先释放mutex
然后阻塞,等待被别的线程唤醒
当调用者线程被唤醒后,调用者线程会再次获取mutex
线程清理函数
- pthread_cleanup_push & pthread_cleanup_pop
pthread_cleanup_push()/pthread_cleanup_pop()的详解1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23在POSIX线程API中提供了一个pthread_cleanup_push()/pthread_cleanup_pop()函数对用于自动释放资源 --从pthread_cleanup_push()的调用点到pthread_cleanup_pop()之间的程序段中的终止动作(pthread_exit(),在执行return时不会执行清理函数)都将执行pthread_cleanup_push()所指定的清理函数。API定义如下:
void pthread_cleanup_push(void (*routine) (void *), void *arg)
void pthread_cleanup_pop(int execute)
pthread_cleanup_push()/pthread_cleanup_pop()采用先入后出的栈结构管理,void routine(void *arg)函数在调用pthread_cleanup_push()时压入清理函数栈,多次对pthread_cleanup_push()的调用将在清理函数栈中形成一个函数链,在执行该函数链时按照压栈的相反顺序弹出。execute参数表示执行到pthread_cleanup_pop()时是否在弹出清理函数的同时执行该函数,为0表示不执行,非0为执行;这个参数并不影响异常终止时清理函数的执行。
pthread_cleanup_push()/pthread_cleanup_pop()是以宏方式实现的,这是pthread.h中的宏定义:
#define pthread_cleanup_push(routine,arg)
{ struct _pthread_cleanup_buffer _buffer;
_pthread_cleanup_push (&_buffer, (routine), (arg));
#define pthread_cleanup_pop(execute)
_pthread_cleanup_pop (&_buffer, (execute)); }
可见,pthread_cleanup_push()带有一个"{",而pthread_cleanup_pop()带有一个"}",因此这两个函数必须成对出现,且必须位于程序的同一级别的代码段中才能通过编译。在下面的例子里,当线程在"do some work"中终止时,将主动调用pthread_mutex_unlock(mut),以完成解锁动作。
work"中终止时,将主动调用pthread_mutex_unlock(mut),以完成解锁动作。
pthread_cleanup_push(pthread_mutex_unlock, (void *) &mut);
pthread_mutex_lock(&mut);
/* do some work */
pthread_mutex_unlock(&mut);
pthread_cleanup_pop(0);
必须要注意的是,如果线程处于PTHREAD_CANCEL_ASYNCHRONOUS状态,上述代码段就有可能出错,因为CANCEL事件有可能在
pthread_cleanup_push()和pthread_mutex_lock()之间发生,或者在pthread_mutex_unlock()和pthread_cleanup_pop()之间发生,从而导致清理函数unlock一个并没有加锁的
mutex变量,造成错误。因此,在使用清理函数的时候,都应该暂时设置成PTHREAD_CANCEL_DEFERRED模式。
结构
为了缓存添加进来的任务,需要创建一个队列来存储任务,并且还用一个vector数组来存储线程。
使用的是生产者-消费者模型,其中addOneTask()是往队列中添加任务,是生产者。而执行任务的线程有多个,所以每一个线程都是消费者。
每次添加完任务之后,会使用条件变量通知“一个”空闲线程来执行任务
遇到的问题
惊群问题
在每次添加任务之后,需要通知一个线程来执行任务,这里如果使用pthread_cond_broadcast(),
就会唤醒所有线程,而只能有一个线程得到任务,其他线程只能回去继续等待。这样就造成了不必要的浪费。
但即使使用pthread_cond_signal(),好像有些系统的实现,也可能会唤醒不止一个线程。
为了解决这个问题,可以给每一个线程添加一个条件变量,如果有任务被添加,此时只要看哪个线程处于空闲状态,只通知那一个线程即可。静态函数访问非静态成员
在使用pthread_create()创建线程的时候,往里传的函数必须是静态函数,但是我们经常会需要在这个静态函数里访问类的非静态成员变量,那怎么办呢?
这里已知有两种方法解决这个问题:
1.创建线程时,需要用arg往里传递回调函数的参数,可以在这里把当前对象的地址封装到回调函数的参数arg里面,然后在回调函数中使用这个对象地址来调用他的非静态成员变量。
2.直接把需要访问的普通成员变量改成静态的。由于这种方法比较简单,并且已经满足当前需求,在线程池使用的这种方法。
代码
- github链接:https://github.com/TWS-YIFEI/ThreadPool
注释版代码:ThreadPool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52//ThreadPool.h
using namespace std;
class ThreadPool;
//要处理的任务以及参数
typedef struct{
void *(*function)(void *);
void *arg;
}Task;
//线程结构体
typedef struct{
pthread_t threadid;
bool state;//false休闲 true忙碌
pthread_cond_t cond;
}Thread;
class ThreadPool{
private:
static queue<Task> task_queue; //任务队列
static pthread_mutex_t task_queue_mutex; //任务队列的互斥体
int maxqueuetaskcount; //最大队列数量,暂时没有用到该变量
static vector<Thread> thread_pool; //线程池
static pthread_mutex_t thread_pool_mutex; //线程池的互斥体
static bool shutdown; //销毁线程池的标志
private:
void initThreadPool(unsigned int count); //初始化线程池
int chooseLeisureThread(); //选择空闲线程
static void *threadFunction(void *arg); //
static void cleanupFunction(void *arg); //
static void excuteAndTest(int s,string str); //测试函数,如果s==0,说明函数调用出错,然后打印错误信息。
public:
ThreadPool(const ThreadPool&)=delete; //禁用红拷贝构造函数
ThreadPool& operator=(const ThreadPool&)=delete; //禁用拷贝赋值运算符
//explicit禁用隐式类型转换
explicit ThreadPool(int threadcount,int maxtaskcount); //带参构造(要创建的线程数量以及最大任务数量)
bool addOneTask(Task task); //添加一个任务
void destroyThreadPool(); //销毁线程池
};ThreadPool.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159//ThreadPool.cpp
std::queue<Task> ThreadPool::task_queue;
pthread_mutex_t ThreadPool::task_queue_mutex=PTHREAD_MUTEX_INITIALIZER;
std::vector<Thread> ThreadPool::thread_pool;
pthread_mutex_t ThreadPool::thread_pool_mutex=PTHREAD_MUTEX_INITIALIZER;
bool ThreadPool::shutdown=false;
//带参构造
ThreadPool::ThreadPool(int threadcount,int maxtaskcount):maxqueuetaskcount(maxtaskcount){
initThreadPool(threadcount);
}
void ThreadPool::initThreadPool(unsigned int count){
Thread tmp;
//该静态变量属于类(是不是直接改成类的成员比较好?)
static int pthcrearg[100005];
tmp.state=false;
for(unsigned int i=0;i<count;i++){
pthcrearg[i]=i;
excuteAndTest(
//初始化每个线程的条件变量
pthread_cond_init(&(tmp.cond),NULL),
"pthread_cond_init(tmp,cond)"
);
cout<<i<<" pthread_cond init sucess"<<endl;
excuteAndTest(
pthread_create(&(tmp.threadid),NULL,threadFunction,(void *)&pthcrearg[i]),
"pthread_create()"
);
thread_pool.emplace_back(tmp);
}
}
//选择空闲的线程,这里state需要改为atomic的!
int ThreadPool::chooseLeisureThread(){
for(int i=0;i<(int)thread_pool.size();i++){
if(thread_pool[i].state==false) return i;
}
return -1;
}
//每个线程的函数,在里面执行回调函数arg
void * ThreadPool::threadFunction(void *arg){
while(1){
excuteAndTest(
//给队列解锁,准备访问队列
pthread_mutex_lock(&(task_queue_mutex)),
"task_queu_mutex_lock in threadFunction"
);
//如果队列中没有任务,就解锁然后阻塞,等待新任务
while(task_queue.size()==0&&shutdown==false){
excuteAndTest(
pthread_cond_wait(&(thread_pool[*(int*)arg].cond),&(task_queue_mutex)),
"pthread_cond_wait in threadFunction"
);
}
if(shutdown==true){
break;
}
Task tt;
pthread_cleanup_push(cleanupFunction,arg); //设置线程清理函数
tt=task_queue.front(); //从队列中取任务
task_queue.pop();
excuteAndTest(
//解锁
pthread_mutex_unlock(&(task_queue_mutex)),
"pthread_mutex_unlock in threadFunction"
);
//从cleanup_push到这里 如果意外退出,清理函数会将task_queue_mutex解锁,此时不需要了,直接将函数出栈即可
pthread_cleanup_pop(0);
(tt.function)(tt.arg); //执行任务
thread_pool[*(int*)arg].state=false;
//设置进程取消点,检测是否有cancel信号
//线程取消功能处于启用状态且取消状态设置为延迟状态时,pthread_testcancel()函数有效。
//如果在取消功能处处于禁用状态下调用pthread_testcancel(),则该函数不起作用。
pthread_testcancel();
}
cout<<"thread "<<*(int*)arg<<" ready to shutdown"<<endl;
excuteAndTest(
pthread_mutex_unlock(&(task_queue_mutex)),
"pthread_mutex_unlock in threadFunction"
);
excuteAndTest(
//销毁条件变量
pthread_cond_destroy(&(thread_pool[*(int*)arg].cond)),
"destroy pthread in threadFunction"
);
cout<<"thread "<<*(int*)arg<<" cond destroy sucess"<<endl;
pthread_exit(NULL);
}
//线程清理函数
void ThreadPool::cleanupFunction(void *arg){
excuteAndTest(
pthread_mutex_unlock(&(task_queue_mutex)),
"pthread_mutex_unlock in cleanupFunction"
);
}
//添加任务
bool ThreadPool::addOneTask(Task task){
excuteAndTest(
//给任务队列加锁
pthread_mutex_lock(&task_queue_mutex),
"pthread_mutex_lock in addOneTask"
);
task_queue.push(task); //任务入队
excuteAndTest(
pthread_mutex_unlock(&task_queue_mutex),
"pthread_mutex_unlock in addOneTask"
);
excuteAndTest(
//给线程池加锁
pthread_mutex_lock(&thread_pool_mutex),
"pthread_mutex_lock in addOneTask"
);
int leisurethread=chooseLeisureThread(); //选择空闲线程
if(leisurethread>=0){
thread_pool[leisurethread].state=true;
excuteAndTest(
pthread_mutex_unlock(&thread_pool_mutex),
"pthread_mutex_unlock in addOneTask"
);
excuteAndTest(
//通知空闲线程工作,此时空闲线程未处于阻塞状态怎么办!
pthread_cond_signal(&(thread_pool[leisurethread].cond)),
"pthread_cond_signal in addOneTask"
);
return true;
}else{
cout<<"no leisure thread"<<endl;
excuteAndTest(
pthread_mutex_unlock(&thread_pool_mutex),
"pthread_mutex_unlock in addOneTask"
);
return true;
}
}
void ThreadPool::destroyThreadPool(){
//设置关闭标志
shutdown=true;
for(int i=0;i<(int)thread_pool.size();i++){
excuteAndTest(
//在销毁线程池时,空闲线程都阻塞在pthread_cond_wait了,需要先将其唤醒
pthread_cond_signal(&thread_pool[i].cond),
"cond_signal in destroyThreadPool"
);
cout<<"notified "<<i<<" pthread_cond to shutdown"<<endl;
}
}
void ThreadPool::excuteAndTest(int s,string str){
if(s!=0) cout<<str<<endl;
}
待解决
- 如果销毁线程池时,给每个线程发送了pthread_cond_signal,但是有线程还没有执行pthread_cond_wait,此时信号会丢失https://www.cnblogs.com/super119/archive/2011/07/29/2120761.html。
- 在添加任务后,对空闲线程发送pthread_cond_signal时,空闲线程未处于阻塞状态怎么处理?
- bool变量需要改为原子的atomic!
- 使用RAII机制的锁。
- queue锁的细粒度(无锁队列)
STL中queue不是线程安全的,所以如果加锁的话只能给整个队列加锁,而不能给入队和出队两个操作分别加锁。
所以添加任务和执行任务两个操作并不能同时进行。 - 线程优先级
- 销毁线程池时将自己设置的shutdown改成利用cancel信号可以吗。
其他
- 关于线程数量的设置
N核服务器,通过执行业务的单线程分析出本地计算时间为x,等待时间为y,则工作线程数(线程池线程数)设置为 N*(x+y)/x,能让CPU的利用率最大化。 - 思路不难,但是真正写代码才发现有好多需要考虑的细节。
- 待学习:使用gdb调试多线程线程的查看以及利用gdb调试多线程
- 阅读jdk里的线程池源码
参考
- 线程数究竟设多少合理
- C++并发实战17:线程安全的stack和queue
- linyacool WebServer中的线程池
- 用C++写线程池是怎样一种体验?
- 基于c++11的100行实现简单线程池
- 使用C++11实现线程池的两种方法
欢迎与我分享你的看法。
转载请注明出处:http://taowusheng.cn/