Linux线程池使用

02-01

一、线程池概述

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线

程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待

某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起

的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们

要等到其他线程完成后才启动。应用程序可以有多个线程,这些线程在休眠状态中需要耗费大量时间来等待事件发生。其他线程

可能进入睡眠状态,并且仅定期被唤醒以轮循更改或更新状态信息,然后再次进入休眠状态。为了简化对这些线程的管理,为每

个进程提供了一个线程池,一个线程池有若干个等待操作状态,当一个等待操作完成时,线程池中的辅助线程会执行回调函数。

线程池中的线程由系统管理,程序员不需要费力于线程管理,可以集中精力处理应用程序任务。

使用线程池的好处:

1、减少在创建和销毁线程上所花的时间以及系统资源的开销;

2、如不使用线程池,有可能造成系统创建大量线程而导致消耗完系统内存以及”过度切换”。

在什么情况下使用线程池:

1、单个任务处理的时间比较短;

2、将需处理的任务的数量大。

在什么情况下不使用线程池线程:

1、如果需要使一个任务具有特定优先级;

2、如果具有可能会长时间运行(并因此阻塞其他任务)的任务;

3、如果需要将线程放置到单线程单元中(线程池中的线程均处于多线程单元中);

4、如果需要永久标识来标识和控制线程,比如想使用专用线程来终止该线程,将其挂起或按名称发现它。

二、线程池程序

main.c

#include <stdio.h>#include <unistd.h>#include "thread_pool.h"/**********************************************************************功能: 任务处理函数*参数: 无*返回值: NULL*********************************************************************/void *task_test(void *arg){ printf("/t/tworking on task %d/n", (int)arg); sleep(1); /*休息一秒,延长任务的执行时间*/ return NULL;}int main (int argc, char *argv[]){ pool_t pool; int i = 0; pool_init(&pool, 2);//初始化线程池 sleep(1); for(i = 0; i < 5; i++){ sleep(1); pool_add_task(&pool, task_test, (void *)i);//向线程池中添加一个任务 } sleep(4); pool_uninit(&pool);//销毁线程池 return 0;}

thread_pool.c

#include <stdio.h>#include <stdlib.h>#include <pthread.h>#include <assert.h>#include "thread_pool.h"static void *pool_thread_server(void *arg);/**********************************************************************功能: 初始化线程池结构体并创建线程*参数: pool:线程池句柄* threads_limit:线程池中线程的数量*返回值: 无*********************************************************************/void pool_init(pool_t *pool, int threads_limit){ pool->threads_limit = threads_limit; pool->queue_head = NULL; pool->task_in_queue = 0; pool->destroy_flag = 0; /*创建存放线程ID的空间*/ pool->threadid = (pthread_t *)calloc(threads_limit, sizeof(pthread_t)); int i = 0; /*初始化互斥锁和条件变量*/ pthread_mutex_init(&(pool->queue_lock), NULL); pthread_cond_init(&(pool->queue_ready), NULL); /*循环创建threads_limit个线程*/ for (i = 0; i < threads_limit; i++){ pthread_create(&(pool->threadid[i]), NULL, pool_thread_server, pool); } return;}/**********************************************************************功能: 销毁线程池,等待队列中的任务不会再被执行,* 但是正在运行的线程会一直,把任务运行完后再退出*参数: 线程池句柄*返回值: 成功:0,失败非0*********************************************************************/int pool_uninit(pool_t *pool){ pool_task *head = NULL; int i; pthread_mutex_lock(&(pool->queue_lock)); if(pool->destroy_flag)/* 防止两次调用 */ return -1; pool->destroy_flag = 1; pthread_mutex_unlock(&(pool->queue_lock)); /* 唤醒所有等待线程,线程池要销毁了 */ pthread_cond_broadcast(&(pool->queue_ready)); /* 阻塞等待线程退出,否则就成僵尸了 */ for (i = 0; i < pool->threads_limit; i++) pthread_join(pool->threadid[i], NULL); free(pool->threadid); /* 销毁等待队列 */ pthread_mutex_lock(&(pool->queue_lock)); while(pool->queue_head != NULL){ head = pool->queue_head; pool->queue_head = pool->queue_head->next; free(head); } pthread_mutex_unlock(&(pool->queue_lock)); /*条件变量和互斥量也别忘了销毁*/ pthread_mutex_destroy(&(pool->queue_lock)); pthread_cond_destroy(&(pool->queue_ready)); return 0;}/**********************************************************************功能: 向任务队列中添加一个任务*参数: pool:线程池句柄* process:任务处理函数* arg:任务参数*返回值: 无*********************************************************************/static void enqueue_task(pool_t *pool, pool_task_f process, void *arg){ pool_task *task = NULL; pool_task *member = NULL; pthread_mutex_lock(&(pool->queue_lock)); if(pool->task_in_queue >= pool->threads_limit){ printf("task_in_queue > threads_limit!/n"); pthread_mutex_unlock (&(pool->queue_lock)); return; } task = (pool_task *)calloc(1, sizeof(pool_task)); assert(task != NULL); task->process = process; task->arg = arg; task->next = NULL; pool->task_in_queue++; member = pool->queue_head; if(member != NULL){ while(member->next != NULL) /* 将任务加入到任务链连的最后位置. */ member = member->next; member->next = task; }else{ pool->queue_head = task; /* 如果是第一个任务的话,就指向头 */ } printf("/ttasks %d/n", pool->task_in_queue); /* 等待队列中有任务了,唤醒一个等待线程 */ pthread_cond_signal (&(pool->queue_ready)); pthread_mutex_unlock (&(pool->queue_lock));}/**********************************************************************功能: 从任务队列中取出一个任务*参数: 线程池句柄*返回值: 任务句柄*********************************************************************/static pool_task *dequeue_task(pool_t *pool){ pool_task *task = NULL; pthread_mutex_lock(&(pool->queue_lock)); /* 判断线程池是否要销毁了 */ if(pool->destroy_flag){ pthread_mutex_unlock(&(pool->queue_lock)); printf("thread 0x%lx will be destroyed/n", pthread_self()); pthread_exit(NULL); } /* 如果等待队列为0并且不销毁线程池,则处于阻塞状态 */ if(pool->task_in_queue == 0){ while((pool->task_in_queue == 0) && (!pool->destroy_flag)){ printf("thread 0x%lx is leisure/n", pthread_self()); /* 注意:pthread_cond_wait是一个原子操作,等待前会解锁,唤醒后会加锁 */ pthread_cond_wait(&(pool->queue_ready), &(pool->queue_lock)); } }else{ /* 等待队列长度减去1,并取出队列中的第一个元素 */ pool->task_in_queue--; task = pool->queue_head; pool->queue_head = task->next; printf("thread 0x%lx received a task/n", pthread_self()); } pthread_mutex_unlock(&(pool->queue_lock)); return task;}/**********************************************************************功能: 向线程池中添加一个任务*参数: pool:线程池句柄* process:任务处理函数* arg:任务参数*返回值: 0*********************************************************************/int pool_add_task(pool_t *pool, pool_task_f process, void *arg){ enqueue_task(pool, process, arg); return 0;}/**********************************************************************功能: 线程池服务程序*参数: 略*返回值: 略*********************************************************************/static void *pool_thread_server(void *arg){ pool_t *pool = NULL; pool = (pool_t *)arg; while(1){ pool_task *task = NULL; task = dequeue_task(pool); /*调用回调函数,执行任务*/ if(task != NULL){ printf ("thread 0x%lx is busy/n", pthread_self()); task->process(task->arg); free(task); task = NULL; } } /*这一句应该是不可达的*/ pthread_exit(NULL); return NULL;}

thread_pool.h

#ifndef __THREAD_POOL_H__#define __THREAD_POOL_H__#include <pthread.h>/********************************************************************** 任务回调函数,也可根据需要自行修改*********************************************************************/typedef void *(*pool_task_f)(void *arg);/********************************************************************** 任务句柄*********************************************************************/typedef struct _task{ pool_task_f process;/*回调函数,任务运行时会调用此函数,注意也可声明成其它形式*/ void *arg; /*回调函数的参数*/ struct _task *next;}pool_task;/********************************************************************** 线程池句柄*********************************************************************/typedef struct{ pthread_t *threadid; /* 线程号 */ int threads_limit; /* 线程池中允许的活动线程数目 */ int destroy_flag; /* 是否销毁线程池 , 0销毁,1不销毁*/ pool_task *queue_head; /* 链表结构,线程池中所有等待任务 */ int task_in_queue; /* 当前等待队列的任务数目 */ pthread_mutex_t queue_lock; /* 锁 */ pthread_cond_t queue_ready; /* 条件变量 */}pool_t;/**********************************************************************功能: 初始化线程池结构体并创建线程*参数: pool:线程池句柄* threads_limit:线程池中线程的数量*返回值: 无*********************************************************************/void pool_init(pool_t *pool, int threads_limit);/**********************************************************************功能: 销毁线程池,等待队列中的任务不会再被执行,* 但是正在运行的线程会一直,把任务运行完后再退出*参数: 线程池句柄*返回值: 成功:0,失败非0*********************************************************************/int pool_uninit(pool_t *pool);/**********************************************************************功能: 向线程池中添加一个任务*参数: pool:线程池句柄* process:任务处理函数* arg:任务参数*返回值: 0*********************************************************************/int pool_add_task(pool_t *pool, pool_task_f process, void *arg);#endif