300字范文,内容丰富有趣,生活中的好帮手!
300字范文 > 可伸缩多线程任务队列

可伸缩多线程任务队列

时间:2024-03-01 11:54:47

相关推荐

可伸缩多线程任务队列

在我们的工作中,我们经常需要异步执行一些任务,下面介绍的这个可伸缩多线程队列,可满足我们的需求。

出自:/Articles/4148/Multithreaded-Job-Queue,主要有以下几个功能:

1、任务队列是多线程,许多任务可以异步进行,任务队列使用线程池来执行任务。

2、任务队列支持优先级,优先级高的任务优先执行(即使是后来添加的)

3、任务队列可以被暂停,但是用户还是可以添加任务,当任务队列被唤醒时,任务可以继续执行下去

4、在运行过程中,任务队列使用的线程池,用户可以自行增加和减少

大体框架主要由3个类构成

1、CJob,任务类,用户需要从该类派生来实现自身需要完成的任务

2、CJobExecuter,任务执行类,任务均由该类来调用执行,每一个类相当于对应一个线程

3、CMThreadedJobQ,多线程任务队列,添加任务已经任务的分发均由该类完成,该类维护一个任务队列和一个完成队列的线程池。

类图如下:

该例子中,CJobExecuter和CMThreadJobQ这两个类的调用关系是非常值得我们学习的,同时,CJob作为一个基类,子类派生可以实现不同的任务,可扩展性也不错。源代码解析如下:

Job.h文件:

class CJob{public:CJob();virtual ~CJob();BOOL m_Completed; //任务是否完成:TRUE 完成,FALSE 未完成static long lastUsedID; //最后的ID//================================================================================================//函数名: setPriority//函数描述:设置任务优先级//输入:[in] priority 优先级别//输出:无//返回:无//================================================================================================void setPriority(int priority);//================================================================================================//函数名: getPriority//函数描述:返回任务优先级//输入:无//输出:无//返回:任务优先级//================================================================================================int getPriority();//================================================================================================//函数名: getID//函数描述:返回任务ID//输入:无//输出:无//返回:任务ID//================================================================================================long getID();//================================================================================================//函数名: setAutoDelete//函数描述:设置完成任务后是否删除任务//输入:[in] autoDeleteFlag//输出:无//返回:无//================================================================================================void setAutoDelete(BOOL autoDeleteFlag = TRUE);//================================================================================================//函数名: AutoDelete//函数描述:返回删除任务标记//输入:无//输出:无//返回:任务标记//================================================================================================ BOOL AutoDelete();//================================================================================================//函数名: execute//函数描述:任务真正工作的函数,纯虚函数,需要子类化实现//输入:无//输出:无//返回:任务ID//================================================================================================virtual void execute() = 0; private:long m_ID;//任务IDBOOL m_autoDeleteFlag; //是否自动删除任务标记,TRUE 删除,FALSE 不删除,默认为TRUEint m_priority;//任务优先级,默认为5};

Job.cpp文件:

long CJob::lastUsedID = 0;CJob::CJob(){this->m_ID = InterlockedIncrement(&lastUsedID);this->m_autoDeleteFlag = TRUE;this->m_priority = 5;this->m_Completed= FALSE;}CJob::~CJob(){}BOOL CJob::AutoDelete(){return m_autoDeleteFlag;}void CJob::setAutoDelete(BOOL autoDeleteFlag){m_autoDeleteFlag = autoDeleteFlag;}long CJob::getID(){return this->m_ID;}int CJob::getPriority(){return this->m_priority; }void CJob::setPriority(int priority){this->m_priority = priority;}

JobExecuter.h文件:

//一个对象对应一个线程,执行任务Jobclass CJobExecuter{public:CJobExecuter(CMThreadedJobQ *pJobQ);virtual ~CJobExecuter();//================================================================================================//函数名: stop//函数描述:停止执行任务//输入:无//输出:无//返回:无//================================================================================================void stop();//================================================================================================//函数名: execute//函数描述:执行一个任务//输入:[in] pJob 任务指针//输出:无//返回:无//================================================================================================void execute(CJob* pJob);static UINT ThreadFunction(LPVOID pParam); //线程函数 CMThreadedJobQ* m_pJobQ; //指向线程任务队列指针CJob* m_pJob2Do; //指向正在执行任务的指针int m_flag; //线程执行标记CWinThread* m_pExecuterThread; //线程标识符};

JobExecuter.cpp文件:

#define STOP_WORKING -1#define KEEP_WORKING 0CJobExecuter::CJobExecuter(CMThreadedJobQ *pJobQ){this->m_pJobQ= pJobQ;this->m_pExecuterThread= AfxBeginThread(ThreadFunction,this);this->m_pJob2Do = NULL;this->m_flag = KEEP_WORKING;}CJobExecuter::~CJobExecuter(){if(this->m_pExecuterThread!= NULL ) {this->m_pExecuterThread->ExitInstance();delete m_pExecuterThread; }}UINT CJobExecuter::ThreadFunction(LPVOID pParam){ CJobExecuter *pExecuter = (CJobExecuter *)pParam;pExecuter->m_flag = 1;::Sleep(1);CSingleLock singleLock(&pExecuter->m_pJobQ->m_cs); while(pExecuter->m_flag !=STOP_WORKING ){if(pExecuter->m_pJob2Do!= NULL){pExecuter->m_pJob2Do->execute();pExecuter->m_pJob2Do->m_Completed = TRUE; if(pExecuter->m_pJob2Do->AutoDelete())delete pExecuter->m_pJob2Do;pExecuter->m_pJob2Do = NULL;}if(pExecuter->m_pJobQ == NULL) break;CSingleLock singleLock(&pExecuter->m_pJobQ->m_cs); singleLock.Lock();if(pExecuter->m_pJobQ->getNoOfExecuter() > pExecuter->m_pJobQ->getMaxNoOfExecuter()) //CJobExecuter个数大于最大值,自动销毁 {pExecuter->stop(); singleLock.Unlock(); }else{pExecuter->m_pJobQ->addFreeJobExecuter(pExecuter);//完成任务后,添加到CMThreadedJobQ的空闲队列中 singleLock.Unlock(); pExecuter->m_pJobQ->m_pObserverThread->ResumeThread(); pExecuter->m_pExecuterThread->SuspendThread(); }}if(pExecuter->m_pJobQ != NULL){pExecuter->m_pJobQ->deleteJobExecuter(pExecuter);}else{delete pExecuter;}return 0;}void CJobExecuter::execute(CJob* pJob){this->m_pJob2Do = pJob;::Sleep(0);this->m_pExecuterThread->ResumeThread();}void CJobExecuter::stop(){this->m_flag = STOP_WORKING;this->m_pExecuterThread->ResumeThread();}

MThreadedJobQ.h文件:

typedef CTypedPtrList< CPtrList ,CJob*>CJobQList;//线程池任务队列class CMThreadedJobQ{public:typedef struct THNODE{CJobExecuter* pExecuter;THNODE * pNext ;} THNODE;CMThreadedJobQ();virtual ~CMThreadedJobQ();//================================================================================================//函数名: deleteJobExecuter//函数描述:删除一个JobExecuter对象//输入:[in] pEx//输出:无//返回:无//================================================================================================void deleteJobExecuter(CJobExecuter *pEx);//================================================================================================//函数名: setMaxNoOfExecuter//函数描述:设置CJobExecuter的个数//输入:[in] value//输出:无//返回:无//================================================================================================void setMaxNoOfExecuter(int value);//================================================================================================//函数名: addJobExecuter//函数描述:添加一个CJobExecuter//输入:[in] pEx//输出:无//返回:无//================================================================================================void addJobExecuter(CJobExecuter *pEx);//================================================================================================//函数名: getJobExecuter//函数描述:返回一个CJobExecuter//输入:无//输出:无//返回:处理任务的指针//================================================================================================CJobExecuter* getJobExecuter();//================================================================================================//函数名: addFreeJobExecuter//函数描述:添加一个CJobExecuter//输入:[in] pEx//输出:无//返回:无//================================================================================================void addFreeJobExecuter(CJobExecuter *pEx);//================================================================================================//函数名: addJob//函数描述:添加一个任务//输入:[in] pJob//输出:无//返回:无//================================================================================================void addJob(CJob *pJob);//================================================================================================//函数名: getMaxNoOfExecuter//函数描述:获取CJobExecuter个数的最大值//输入:无//输出:无//返回:无//================================================================================================int getMaxNoOfExecuter();//================================================================================================//函数名: getNoOfExecuter//函数描述:获取当前CJobExecuter的个数//输入:无//输出:无//返回:无//================================================================================================int getNoOfExecuter();static UINT JobObserverThreadFunction(LPVOID);//================================================================================================//函数名: pause//函数描述:挂起JobObserverThread线程//输入:无//输出:无//返回:无//================================================================================================void pause();//================================================================================================//函数名: resume//函数描述:唤醒JobObserverThread线程//输入:无//输出:无//返回:无//================================================================================================void resume(); CWinThread* m_pObserverThread; //向空闲的executer线程添加任务的线程CCriticalSection m_cs; //关键代码段,用于互斥CJobQList m_jobQList;//任务队列private :BOOL m_pause; //JobObserverThread线程运行标记int m_MaxNoOfExecuter; //CJobExecuter最大个数int m_NoOfExecuter; //当前CJobExecuter个数THNODE* m_pFreeEList;//维护空闲处理任务线程的队列THNODE* m_pAllEList; //维护所有处理任务线程的队列};

MThreadedJobQ.cpp文件:

CMThreadedJobQ::CMThreadedJobQ(){m_MaxNoOfExecuter = 2;m_pause = FALSE;m_pObserverThread = AfxBeginThread(JobObserverThreadFunction,this);m_pFreeEList =NULL;m_NoOfExecuter =0;m_pAllEList = NULL;}CMThreadedJobQ::~CMThreadedJobQ(){THNODE* pTempNode;while (m_pAllEList != NULL) { pTempNode = m_pAllEList->pNext;delete m_pAllEList->pExecuter; delete m_pAllEList; m_pAllEList = pTempNode; } while (m_pFreeEList != NULL) { pTempNode = m_pFreeEList->pNext; delete m_pFreeEList; m_pFreeEList = pTempNode; } m_pObserverThread->ExitInstance(); delete m_pObserverThread;}void CMThreadedJobQ::pause(){this->m_pause = TRUE;}void CMThreadedJobQ::resume(){this->m_pause = FALSE;this->m_pObserverThread->ResumeThread();}UINT CMThreadedJobQ::JobObserverThreadFunction(LPVOID pParam){CMThreadedJobQ *pMTJQ = (CMThreadedJobQ *)pParam;CJobExecuter *pJExecuter;while(TRUE){Sleep(100);if(pMTJQ->m_pause != TRUE){while(!pMTJQ->m_jobQList.IsEmpty() ){pJExecuter = pMTJQ->getJobExecuter();if( pJExecuter!=NULL){pMTJQ->m_cs.Lock();pJExecuter->execute(pMTJQ->m_jobQList.GetHead());pMTJQ->m_jobQList.RemoveHead();AfxGetApp()->m_pMainWnd->PostMessage(REFRESH_LIST);pMTJQ->m_cs.Unlock();}else{break;}if(pMTJQ->m_pause == TRUE)break;}}pMTJQ->m_pObserverThread->SuspendThread();}return 0;}int CMThreadedJobQ::getNoOfExecuter(){return this->m_NoOfExecuter;}int CMThreadedJobQ::getMaxNoOfExecuter(){return this->m_MaxNoOfExecuter;}void CMThreadedJobQ::addJob(CJob *pJob){CJob * pTempJob;CSingleLock sLock(&this->m_cs);sLock.Lock(); POSITION pos,lastPos;pos = this->m_jobQList.GetHeadPosition(); lastPos = pos;if(pos != NULL)pTempJob =this->m_jobQList.GetHead();while(pos != NULL ){ if( pJob->getPriority() > pTempJob->getPriority())break;lastPos = pos;pTempJob =this->m_jobQList.GetNext(pos); } if(pos == NULL) this->m_jobQList.AddTail(pJob);elsethis->m_jobQList.InsertBefore(lastPos,pJob);this->m_pObserverThread->ResumeThread();sLock.Unlock();}void CMThreadedJobQ::addFreeJobExecuter(CJobExecuter *pEx){m_cs.Lock();THNODE* node = new THNODE;node->pExecuter = pEx;node->pNext = this->m_pFreeEList;this->m_pFreeEList = node;m_cs.Unlock();}CJobExecuter* CMThreadedJobQ::getJobExecuter(){THNODE *pTemp;CJobExecuter *pEx=NULL;m_cs.Lock();if(this->m_pFreeEList != NULL) //有空闲CJobExecuter,就返回 {pTemp = this->m_pFreeEList;this->m_pFreeEList = this->m_pFreeEList->pNext;pEx = pTemp->pExecuter;delete pTemp ;m_cs.Unlock();return pEx;}if(this->m_NoOfExecuter < this->m_MaxNoOfExecuter) //没有空闲CJobExecuter,并且当前CJobExecuter小于最大值,就生成一个新的CJobExecuter {pEx = new CJobExecuter(this);this->addJobExecuter(pEx);this->m_NoOfExecuter++;m_cs.Unlock();return pEx;}m_cs.Unlock();return NULL;}void CMThreadedJobQ::addJobExecuter(CJobExecuter *pEx){m_cs.Lock();THNODE* node = new THNODE;node->pExecuter= pEx;node->pNext = this->m_pAllEList;this->m_pAllEList = node;m_cs.Unlock();}void CMThreadedJobQ::setMaxNoOfExecuter(int value){this->m_cs.Lock();if(value >1 && value <11)this->m_MaxNoOfExecuter = value;m_pObserverThread->ResumeThread();this->m_cs.Unlock();}void CMThreadedJobQ::deleteJobExecuter(CJobExecuter *pEx){THNODE* pNode,*pNodeP;CSingleLock singleLock(&m_cs); singleLock.Lock(); if(this->m_pAllEList != NULL){pNode = this->m_pAllEList;if(pNode->pExecuter == pEx ) {this->m_pAllEList = pNode->pNext;delete pNode;}else{pNodeP =pNode;pNode = pNode->pNext ; while(pNode != NULL ){if(pNode->pExecuter== pEx ) break;pNodeP = pNode;pNode = pNode->pNext ; }if(pNode!= NULL){pNodeP->pNext = pNode->pNext;delete pNode;}}}this->m_NoOfExecuter--;singleLock.Unlock();pEx->stop();Sleep(1);delete pEx;}

以上,就是该可伸缩多线程任务的主体框架,当我们工作需要实现类似这样的需要:异步执行多个不同的任务时,这个例子就是一个很好的参考例子,我研究这些代码只是为了让我在遇到这种问题的时候,可以有一个思路去思考,而不至于无从下手,仅此而已。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。