Run Code
|
API
|
Code Wall
|
Misc
|
Feedback
|
Login
|
Theme
|
Privacy
|
Patreon
ThreadPool
//Title of this code
#include <condition_variable>
#include <cstddef>
#include <iostream>
#include <list>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

/// One execution slot of the pool.
struct Worker {
    int workerdId;       ///< Index of this slot inside the pool.
    bool working;        ///< True while a job is assigned to this slot.
    std::thread worker;  ///< Thread that runs (or last ran) a job for this slot.

    Worker(int id) : workerdId(id), working(false) {}
};

/// Unit of work executed by the pool. The pool never takes ownership of a Job.
class Job {
public:
    virtual ~Job() = default;
    virtual void work() = 0;
};

/// Pool that runs queued jobs on at most `numThreads` threads at a time.
///
/// A scheduler thread pairs the oldest queued job with a free worker slot and
/// starts a thread for it. Finished jobs are collected and handed back through
/// getDoneJobs() / waitForDoneJobs().
///
/// addJob(), getDoneJobs() and waitForDoneJobs() lock internally.
/// startJobScheduler() and stopJobScheduler() do not synchronise with each
/// other and are meant to be called from the thread that owns the pool.
class ThreadPool {
    int numThreads;
    std::vector<Worker> threads;
    std::queue<Job*> jobs;
    std::list<Job*> doneJobs;
    std::thread schedulerThread;

    std::mutex stateLock;                   // guards jobs, threads, Worker::working and stop
    std::condition_variable stateChanged;   // job queued, slot freed, workers created, or stop requested
    std::mutex doneJobsLock;                // guards doneJobs
    std::condition_variable doneJobsReady;  // a job was appended to doneJobs
    bool stop = true;                       // guarded by stateLock (replaces the former volatile flag)

public:
    /// @param threads maximum number of jobs that may run concurrently.
    ThreadPool(int threads) : numThreads(threads) {}

    /// Stops the scheduler and waits for every job that is still running, so
    /// no thread can touch this object after it has been destroyed.
    ~ThreadPool() {
        stopJobScheduler();
        // The scheduler has been joined, so nothing else assigns Worker::worker now.
        for (Worker& slot : threads) {
            if (slot.worker.joinable()) {
                slot.worker.join();
            }
        }
    }

    ThreadPool(const ThreadPool&) = delete;
    ThreadPool& operator=(const ThreadPool&) = delete;

    /// Creates the worker slots. Calls made after the slots exist are ignored,
    /// because growing the vector again could move slots that are in use.
    void initialize() {
        {
            std::lock_guard<std::mutex> guard(stateLock);
            if (!threads.empty()) {
                return;
            }
            if (numThreads > 0) {
                threads.reserve(static_cast<std::size_t>(numThreads));
            }
            for (int i = 0; i < numThreads; ++i) {
                threads.emplace_back(i);
            }
        }
        // A scheduler started before initialize() may be waiting for a slot.
        stateChanged.notify_one();
    }

    /// Queues a job. The caller keeps ownership and must keep the job alive
    /// until it has been returned as done. Null pointers are ignored.
    void addJob(Job* j) {
        if (j == nullptr) {
            return;
        }
        {
            std::lock_guard<std::mutex> guard(stateLock);
            jobs.push(j);
        }
        stateChanged.notify_one();
    }

    /// Starts the scheduler thread; does nothing if it is already running.
    void startJobScheduler() {
        if (schedulerThread.joinable()) {
            return;
        }
        {
            std::lock_guard<std::mutex> guard(stateLock);
            stop = false;
        }
        schedulerThread = std::thread(&ThreadPool::scheduler, this);
    }

    /// Stops dispatching and waits for the scheduler thread to exit.
    /// Jobs that are already running keep running; jobs still queued stay
    /// queued and are dispatched if the scheduler is started again.
    void stopJobScheduler() {
        {
            std::lock_guard<std::mutex> guard(stateLock);
            stop = true;
        }
        stateChanged.notify_all();
        if (schedulerThread.joinable()) {
            schedulerThread.join();
        }
    }

    /// Returns the jobs finished since the previous call without blocking;
    /// the result may be empty.
    std::list<Job*> getDoneJobs() {
        std::list<Job*> finished;
        std::lock_guard<std::mutex> guard(doneJobsLock);
        finished.swap(doneJobs);
        return finished;
    }

    /// Blocks until at least one job has finished, then returns every job
    /// finished so far. Only call this while unfinished jobs are outstanding
    /// and the scheduler is running, otherwise it never returns.
    std::list<Job*> waitForDoneJobs() {
        std::list<Job*> finished;
        std::unique_lock<std::mutex> lock(doneJobsLock);
        doneJobsReady.wait(lock, [this] { return !doneJobs.empty(); });
        finished.swap(doneJobs);
        return finished;
    }

private:
    /// Index of the first idle slot, or -1 if none. Caller must hold stateLock.
    int findFreeWorkerLocked() const {
        for (std::size_t i = 0; i < threads.size(); ++i) {
            if (!threads[i].working) {
                return static_cast<int>(i);
            }
        }
        return -1;
    }

    /// Scheduler loop: sleeps until a job and a free slot are both available
    /// (or stop is requested), then starts a thread for that job.
    void scheduler() {
        std::unique_lock<std::mutex> lock(stateLock);
        for (;;) {
            stateChanged.wait(lock, [this] {
                return stop || (!jobs.empty() && findFreeWorkerLocked() != -1);
            });
            if (stop) {
                return;  // queued jobs are left in the queue
            }

            Job* job = jobs.front();
            jobs.pop();

            const int threadId = findFreeWorkerLocked();
            Worker& slot = threads[threadId];
            slot.working = true;

            // The previous thread of this slot released it as its last locked
            // action, so it no longer needs stateLock and this join returns
            // promptly even though the lock is held.
            if (slot.worker.joinable()) {
                slot.worker.join();
            }
            slot.worker = std::thread(&ThreadPool::jobWrapper, this, threadId, job);
        }
    }

    /// Body of a worker thread: run the job, publish it as done, free the slot.
    void jobWrapper(int threadId, Job* j) {
        j->work();
        jobIsDone(j);
        setThreadFree(threadId);
    }

    /// Appends a finished job to the done list and wakes waitForDoneJobs().
    void jobIsDone(Job* j) {
        {
            std::lock_guard<std::mutex> guard(doneJobsLock);
            doneJobs.push_back(j);
        }
        doneJobsReady.notify_all();
    }

    /// Marks a slot idle and wakes the scheduler so it can reuse the slot.
    void setThreadFree(int threadId) {
        {
            std::lock_guard<std::mutex> guard(stateLock);
            threads[threadId].working = false;
        }
        stateChanged.notify_one();
    }
};

/// Example job that doubles the number it was constructed with.
class MyJob : public Job {
    int num;

public:
    MyJob(int i) : num(i) {}
    void work() override { num = num * 2; }
    int get() const { return num; }
};

int main() {
    std::cout << "ThreadPool:" << '\n';

    // Declared before the pool so the jobs outlive every pool thread.
    std::vector<std::unique_ptr<MyJob>> tasks;
    for (int value = 1; value <= 4; ++value) {
        tasks.push_back(std::unique_ptr<MyJob>(new MyJob(value)));
    }

    ThreadPool tp(3);
    tp.initialize();
    tp.startJobScheduler();

    for (const auto& task : tasks) {
        tp.addJob(task.get());
    }

    std::size_t remaining = tasks.size();
    while (remaining > 0) {
        const std::list<Job*> finished = tp.waitForDoneJobs();
        for (Job* done : finished) {
            // Only MyJob instances were submitted; the check guards the downcast.
            if (const MyJob* mine = dynamic_cast<const MyJob*>(done)) {
                std::cout << "Job is done: " << mine->get() << '\n';
            }
            if (remaining > 0) {
                --remaining;
            }
        }
    }

    tp.stopJobScheduler();
    return 0;
}
run
|
edit
|
history
|
help
0
Equilateral triangle
map_find
Binary Search
Wygner aprendizado
DayTempEnum
tuple_list_of
c++ car racing game
Stock buy/sell, maximum subarray problem
Inventory
Anagrams