下面是原文链接,我是照着敲的。。。hi.baidu.com/boahegcrmdghots/item/f3ca1a3c2d47fcc52e8ec2e1
#include#include #include #include #include #include /*线程池里所有运行和等待的任务都是一个thread_job由于所有任务都在链表中,所以是一个链表结构*/typedef struct job{ /*回调函数,任务运行时会调用次函数,注意也可以声明为其他形式*/ void * (*process)(void *arg); void *arg; struct job *next;}thread_job;/*线程池结构*/typedef struct { pthread_mutex_t queue_lock; pthread_cond_t queue_ready; /*链表结构,线程池中所有等待任务*/ thread_job *queue_head; /*线程池销毁标记*/ int destroy; /**/ pthread_t *threadid; /*线程池中允许的最大线程数目*/ int max_thread_num; /*等待队列的任务数目*/ int cur_queue_size;}thread_pool;int add_job(void *(*process)(void *arg), void *arg);void *thread_runtine(void *arg);/*把线程池指针设为静态*/static thread_pool *pool = NULL;/*初始化线程池*/void pool_init(int max_thread_num){ pool = (thread_pool *)malloc(sizeof(thread_pool)); pthread_mutex_init(&(pool->queue_lock), NULL); pthread_cond_init(&(pool->queue_ready), NULL); pool->queue_head = NULL; pool->max_thread_num = max_thread_num; pool->cur_queue_size = 0; /*为线程池中的各个线程分配空间 做初始化*/ pool->threadid = (pthread_t *)malloc(max_thread_num*sizeof(pthread_t)); int i = 0; /*创建max_thread_num个线程*/ for(i=0; i threadid[i]), NULL, thread_runtine, NULL); }}/*每个线程都会执行runtine函数, 每个thread_rutine函数都是取出一个thread_job处理任务也就是说,线程池中可能有很多个thread_job需要cur_queue_size个线程去处理,处理前要先对线程池加锁,保证当前处理任务的线程处理thread_job时不会收到其他线程影响,能够互斥访问*/void *thread_runtine(void *arg){ printf("starting thread 0x%x\n", (unsigned int)pthread_self()); while(1) { /*对线程池加互斥锁*/ pthread_mutex_lock(&(pool->queue_lock)); /*如果线程当前的等待队列为0,并且没有被销毁,则处于阻塞状态 pthread_cond_wait是一个原子操作,等待前解锁,唤醒后会加锁*/ while( (pool->cur_queue_size == 0) && (!pool->destroy) ) { printf("thread 0x%x is waiting\n", (unsigned int)pthread_self()); pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)); } /*线程池销毁了*/ if(pool->destroy) { pthread_mutex_lock(&(pool->queue_lock)); printf("thread 0x%x will exit\n", (unsigned int)pthread_self()); pthread_exit(NULL); } printf("thread 0x%x is starting to work\n", (unsigned int)pthread_self()); assert(pool->cur_queue_size != 0); assert(pool->queue_head != NULL); /*等待队列长度减1,并取出线程池中的线程链表中的头元素*/ pool->cur_queue_size--; thread_job *job = pool->queue_head; pool->queue_head = job->next; pthread_mutex_unlock(&(pool->queue_lock)); /*调用回调函数,执行任务, */ (*(job->process))(job->arg); free(job); job = NULL; } /*这一句是不可达的*/ pthread_exit(NULL);}/*向线程池中添加任务, 重点理解参数, 每个任务要做的事体现在process回调函数上*/int pool_add_job(void *(*process)(void *arg), void *arg){ thread_job *newjob = (thread_job *)malloc(sizeof(thread_job)); newjob->process = process; newjob->arg = arg; newjob->next = NULL; /*对线程池加锁*/ pthread_mutex_lock(&(pool->queue_lock)); thread_job *member = pool->queue_head; if(member != NULL) { while(member->next != NULL) member = member->next; member->next = newjob; } else { pool->queue_head = newjob; } assert(pool->queue_head != NULL); pool->cur_queue_size++; pthread_mutex_unlock(&(pool->queue_lock)); /*等待队列中有任务了,唤醒一个等待线程,如果所有线程都在忙碌,这句没有任何作用*/ pthread_cond_signal(&(pool->queue_ready)); return 0;}int pool_destroy(){ if(pool->destroy) return -1; pool->destroy = 1; /*唤醒所有等待线程,线程池要销毁了*/ pthread_cond_broadcast(&(pool->queue_ready)); /*阻塞等待线程退出,否则变成僵尸了*/ int i = 0; for(i=0; i max_thread_num; i++) { pthread_join(pool->threadid[i], NULL); } free(pool->threadid); thread_job *head = NULL; while(pool->queue_head != NULL) { head = pool->queue_head; pool->queue_head = pool->queue_head->next; free(head); } /*条件变量和互斥量也要销毁*/ pthread_mutex_destroy(&(pool->queue_lock)); pthread_cond_destroy(&(pool->queue_ready)); free(pool); pool = NULL; return 0;}void* myprocess(void *arg){ printf("threadid is 0x%x, working on task %d\n", (unsigned int)pthread_self(), *(int *)arg); sleep(1); return NULL;} int main(int argc, char **argv){ pool_init(3);/*初始话含有3个线程的线程池*/ int *workingnum = (int *)malloc(sizeof(int)*10); int i = 0; for(i=0; i<10; i++) { workingnum[i] = i; pool_add_job(myprocess, &workingnum[i]); } sleep(5); pool_destroy(); free(workingnum); return 0;}