VC++/Windows 如何使用线程池。

2024-11-05 11:05:07

1、使用信号量维护一个任务队列信号量内核对象:可以把一个信号量看作一个计数器,当信号量内的计数器为0时,等待该信号量的线程会被阻塞,当信号量内的计数器>0时,等待该信号量的线程会被激活。创建信号量:HANDLECreateSemaphore(LPSECURITY_ATTRIBUTESlpSemaphoreAttributes,//通常传入NULL即可LONGlInitialCount,//计数器初始值LONGlMaximumCount,//计数器最大值LPCTSTRlpName);增加信号量计数:BOOLReleaseSemaphore(HANDLEhSemaphore,//信号量的句柄LONGlReleaseCount,//计数器要增加的数值LPLONGlpPreviousCount);//计数器原来的值,不需要可以传入NULL等待信号量:WaitForSingleObjectWaitForMultipleObjects循环队列:循环队列可以用数组或者链表实现,当队列有“满”的情况时,使用数组是比较方便的。设置两个索引,一个用来写(nForWirte),一个用来读(nForRead),设最大容量为nMaxSize,则:如果 nForRead == nForWrite,队列为空;如果 ( nForWrite + 1 ) % nMaxSize == nForRead,队列已满。

2、CPP源码:#include"衡痕贤伎stdafx.h"#include"HXThreadPool.h"潮贾篡绐CHXWorker::CHXWorker(){}CHXWorker::~CHXWorker(){}CHXThreadPool::CHXThreadPool():m_bReady(FALSE),m_hForWriter(NULL),m_hForReader(NULL),m_phThreads(NULL),m_pWorkerQueue(NULL),m_nWriter(0),m_nReader(0),m_nMaxWorkers(0),m_nMaxThreads(0){}CHXThreadPool::~CHXThreadPool(){RelaseThreadPool();}BOOLCHXThreadPool::InitThreadPool(longnTaskQueue,intnMaxThreads){BOOLbResult=FALSE;intnThreadCreated=0;__try{if(m_bReady)__leave;//初始化临界资源InitializeCriticalSection(&m_csQueue);//创建两个信号量,一个读,一个写。m_hForReader=CreateSemaphore(NULL,0,nTaskQueue,NULL);m_hForWriter=CreateSemaphore(NULL,nTaskQueue,nTaskQueue,NULL);if(m_hForReader==NULL||m_hForWriter==NULL)__leave;//创建和初始化循环队列m_pWorkerQueue=newCHXWorker*[nTaskQueue];if(m_pWorkerQueue==NULL)__leave;for(inti=0;i<nTaskQueue;++i)m_pWorkerQueue[i]=NULL;m_nMaxWorkers=nTaskQueue;m_nReader=0;m_nWriter=0;//创建工作线程if(nMaxThreads<=0)m_nMaxThreads=GetCPUNumber()*2;elsem_nMaxThreads=nMaxThreads;m_phThreads=newHANDLE[m_nMaxThreads];if(m_phThreads==NULL)__leave;for(inti=0;i<m_nMaxThreads;++i){m_phThreads[i]=CreateThread(NULL,0,&CHXThreadPool::DoWorker,this,0,NULL);if(m_phThreads[i]==NULL)__leave;else++nThreadCreated;}bResult=TRUE;m_bReady=TRUE;}__finally{if(!bResult)//如果失败,清理资源{//DeletequeueEnterCriticalSection(&m_csQueue);if(m_pWorkerQueue!=NULL){delete[]m_pWorkerQueue;m_pWorkerQueue=NULL;m_nReader=0;m_nWriter=0;m_nMaxWorkers=0;}LeaveCriticalSection(&m_csQueue);//ClearThreads...if(m_phThreads!=NULL){ReleaseSemaphore(m_hForReader,nThreadCreated,NULL);WaitForMultipleObjects(nThreadCreated,m_phThreads,TRUE,INFINITE);for(inti=0;i<nThreadCreated;++i)CloseHandle(m_phThreads[i]);delete[]m_phThreads;m_phThreads=NULL;}//CloseSemaphores...if(m_hForReader!=NULL){CloseHandle(m_hForReader);m_hForReader=NULL;}if(m_hForWriter!=NULL){CloseHandle(m_hForWriter);m_hForWriter=NULL;}//Deletecs...DeleteCriticalSection(&m_csQueue);m_dwTimedout=INFINITE;}}returnbResult;}voidCHXThreadPool::RelaseThreadPool(){if(m_bReady){m_bReady=FALSE;EnterCriticalSection(&m_csQueue);if(m_pWorkerQueue!=NULL){delete[]m_pWorkerQueue;m_pWorkerQueue=NULL;m_nReader=0;m_nWriter=0;m_nMaxWorkers=0;}LeaveCriticalSection(&m_csQueue);//ClearThreads...if(m_phThreads!=NULL){ReleaseSemaphore(m_hForReader,m_nMaxThreads,NULL);WaitForMultipleObjects(m_nMaxThreads,m_phThreads,TRUE,INFINITE);for(inti=0;i<m_nMaxThreads;++i)CloseHandle(m_phThreads[i]);delete[]m_phThreads;m_phThreads=NULL;}//CloseSemaphores...if(m_hForReader!=NULL){CloseHandle(m_hForReader);m_hForReader=NULL;}if(m_hForWriter!=NULL){CloseHandle(m_hForWriter);m_hForWriter=NULL;}//Deletecs...DeleteCriticalSection(&m_csQueue);m_nReader=0;m_nWriter=0;m_dwTimedout=INFINITE;}}BOOLCHXThreadPool::PostAWorker(CHXWorker*pWorker)//投递一个工作{DWORDdwWaitfor;//如果队列已满,就等待,直到队列不满(被工作线程取走去处理)dwWaitfor=WaitForSingleObject(m_hForWriter,INFINITE);//将该工作放入循环队列中if(dwWaitfor==WAIT_OBJECT_0){longn;EnterCriticalSection(&m_csQueue);if(m_pWorkerQueue==NULL){LeaveCriticalSection(&m_csQueue);returnFALSE;}n=(m_nWriter+1)%m_nMaxWorkers;if(n==m_nReader){LeaveCriticalSection(&m_csQueue);returnFALSE;}else{m_pWorkerQueue[m_nWriter]=pWorker;m_nWriter=n;LeaveCriticalSection(&m_csQueue);}//增加读的信号量计数,以激活某线程去处理该工作ReleaseSemaphore(m_hForReader,1,NULL);}elsereturnFALSE;returnTRUE;}intCHXThreadPool::GetCurrentWorkers()//获取目前已经投递但尚未被处理的工作数量{intn;EnterCriticalSection(&m_csQueue);n=(m_nWriter-m_nReader+m_nMaxWorkers)%m_nMaxWorkers;LeaveCriticalSection(&m_csQueue);returnn;}DWORDCHXThreadPool::DoWorker(LPVOIDlpVoid)//工作线程{CHXThreadPool*pThreadPool=(CHXThreadPool*)lpVoid;CHXWorker*pWorker;DWORDdwWaitfor;while(TRUE){//如果队列是空的,就等待,直到有任务被投递进来dwWaitfor=WaitForSingleObject(pThreadPool->m_hForReader,INFINITE);if(dwWaitfor==WAIT_OBJECT_0){EnterCriticalSection(&(pThreadPool->m_csQueue));if(pThreadPool->m_pWorkerQueue==NULL){LeaveCriticalSection(&(pThreadPool->m_csQueue));break;}if(pThreadPool->m_nReader==pThreadPool->m_nWriter){LeaveCriticalSection(&(pThreadPool->m_csQueue));continue;}//从队列中取走工作pWorker=pThreadPool->m_pWorkerQueue[pThreadPool->m_nReader];pThreadPool->m_pWorkerQueue[pThreadPool->m_nReader]=NULL;pThreadPool->m_nReader=(pThreadPool->m_nReader+1)%pThreadPool->m_nMaxWorkers;LeaveCriticalSection(&(pThreadPool->m_csQueue));//增加写信号量计数,当队列满时,表示队列中有空余位置被腾出,可以继续投递新的工作了ReleaseSemaphore(pThreadPool->m_hForWriter,1,NULL);if(pWorker==NULL)continue;else{if(pWorker->DoWorker())//如果工作完成后返回TRUE,就在这里清理它。这个完全是自定义的deletepWorker;}}}return0;}intCHXThreadPool::GetCPUNumber()//获取系统中CPU的数量{SYSTEM_INFOsysInfo;GetSystemInfo(&sysInfo);return(int)sysInfo.dwNumberOfProcessors;}

猜你喜欢