Run Code
|
API
|
Code Wall
|
Misc
|
Feedback
|
Login
|
Theme
|
Privacy
|
Patreon
Throttle Example using circular queue (push all but 2 less than maxSize; then pop all but 2 less than current size)
// Throttle example using a circular queue.
//
// A bounded, mutex-protected ring buffer (Queue) is filled with integers and
// then drained in batches; a LeakyBucket inserts a wait between batches.
#include <queue>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <unistd.h>
#include "time.h"
#include "sys/time.h"
#include <chrono>
#include <cstddef>
#include <stdexcept>
#include <utility>
#include <vector>

/// Fixed-capacity circular FIFO guarded by a mutex.
///
/// `head` / `tail` are slot indices into the backing storage; both are -1
/// while the queue is empty. pop() blocks until an element is available;
/// push() never blocks and rejects the element when the queue is full.
template <typename T>
class Queue {
public:
    /// @param iMaxSize number of slots; must be positive.
    /// @throws std::invalid_argument if iMaxSize <= 0 (a zero-slot ring would
    ///         make the very first push write out of bounds).
    Queue(int iMaxSize)
        : maxSize_(checkedCapacity(iMaxSize)),
          data_(static_cast<std::size_t>(maxSize_)) {}

    /// Copies indices and contents of `aQueue` while holding its mutex, so the
    /// snapshot is consistent even if another thread is pushing/popping.
    /// The new queue gets its own mutex and condition variable.
    Queue(const Queue& aQueue) {
        std::lock_guard<std::mutex> guard(aQueue.mutex_);
        head_ = aQueue.head_;
        tail_ = aQueue.tail_;
        maxSize_ = aQueue.maxSize_;
        data_ = aQueue.data_;
    }

    // The mutex member already made implicit copy-assignment unavailable;
    // state that explicitly.
    Queue& operator=(const Queue&) = delete;

    /// Removes and returns the oldest element, waiting while the queue is empty.
    T pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        // Predicate form re-checks after every wake-up (spurious or not).
        cond_.wait(lock, [this] { return !emptyUnlocked(); });

        T item = std::move(slot(head_));
        if (head_ == tail_) {
            // That was the last element: return to the "empty" marker state.
            head_ = -1;
            tail_ = -1;
        } else {
            head_ = next(head_);
        }
        return item;
    }

    /// Appends `item`. If the queue is full, logs "Queue Overflow!!" to stderr
    /// and drops `item`, leaving the stored elements untouched.
    void push(const T& item) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (fullUnlocked()) {
                std::cerr << "Queue Overflow!!" << std::endl;
                return;  // nothing was added, so there is nobody to wake
            }
            if (tail_ == -1) {
                head_ = 0;
                tail_ = 0;
            } else {
                tail_ = next(tail_);
            }
            slot(tail_) = item;
        }
        // Notify after releasing the lock so the woken thread can take it at once.
        cond_.notify_one();
    }

    /// @return number of stored elements, correct also when tail has wrapped
    ///         around to a lower index than head.
    int size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        if (emptyUnlocked()) {
            return 0;
        }
        if (tail_ >= head_) {
            return tail_ - head_ + 1;
        }
        return maxSize_ - head_ + tail_ + 1;
    }

    /// @return true when no element is stored.
    bool isEmpty() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return emptyUnlocked();
    }

    /// @return true when every slot is occupied.
    bool isFull() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return fullUnlocked();
    }

    /// @return slot index of the oldest element, or -1 when empty.
    int getHead() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return head_;
    }

    /// @return slot index of the newest element, or -1 when empty.
    int getTail() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return tail_;
    }

private:
    static int checkedCapacity(int requested) {
        if (requested <= 0) {
            throw std::invalid_argument("Queue capacity must be positive");
        }
        return requested;
    }

    // Helpers below assume the caller already holds mutex_.
    bool emptyUnlocked() const { return head_ == -1 && tail_ == -1; }

    bool fullUnlocked() const {
        return (head_ == 0 && tail_ == maxSize_ - 1) || head_ == tail_ + 1;
    }

    int next(int index) const { return index == maxSize_ - 1 ? 0 : index + 1; }

    T& slot(int index) { return data_[static_cast<std::size_t>(index)]; }

    mutable std::mutex mutex_;  // mutable: const accessors must lock too
    std::condition_variable cond_;
    int head_ = -1;
    int tail_ = -1;
    int maxSize_ = 0;
    std::vector<T> data_;  // owns the slots; released automatically
};

/// Wall-clock helper.
class Timer {
public:
    /// @return the time reported by gettimeofday(), in whole milliseconds.
    static long long get_time_ms() {
        struct timeval now;
        gettimeofday(&now, nullptr);
        return static_cast<long long>(now.tv_sec) * 1000LL +
               static_cast<long long>(now.tv_usec) / 1000LL;
    }
};

/// Spaces successive consume() calls at least `maxRate` seconds apart.
///
/// Note: despite its name, the constructor argument is used as the minimum
/// interval in seconds between scheduled actions, not as a rate.
class LeakyBucket {
public:
    LeakyBucket(int maxRate)
        : maxRate_(maxRate),
          minTime_(static_cast<long>(maxRate)),
          lastSchedAction_(Timer::get_time_ms()) {}

    int getMaxRate() const { return maxRate_; }

    long getMinTime() const { return minTime_; }

    long long getLastSchedAction() const { return lastSchedAction_; }

    /// Blocks until one full interval has elapsed since the last scheduled
    /// action; returns immediately if it already has. Logs the remaining
    /// time (ms) to stderr.
    void consume() {
        const long long intervalMs = static_cast<long long>(minTime_) * 1000LL;
        const long long curTime = Timer::get_time_ms();
        const long long timeLeft = lastSchedAction_ + intervalMs - curTime;
        std::cerr << "Time Left = " << timeLeft << std::endl;

        if (timeLeft <= 0) {
            // Already late: restart the schedule from now.
            lastSchedAction_ = curTime;
            return;
        }

        // Keep the schedule on a fixed grid, then wait out the remainder with
        // millisecond precision (whole-second sleep() would cut the wait short).
        lastSchedAction_ += intervalMs;
        std::this_thread::sleep_for(std::chrono::milliseconds(timeLeft));
    }

private:
    int maxRate_;
    long minTime_;               // minimum interval between actions, seconds
    long long lastSchedAction_;  // Timer::get_time_ms() value of last slot
};

/// Fills the queue with all but two of its slots, then drains it in throttled
/// batches until exactly two elements remain.
int main() {
    const int MAXQUEUESIZE = 400;  // Maximum size of Queue
    Queue<int> queue(MAXQUEUESIZE);
    const int MAXFLOWRATE = 2;  // Time interval at which to pick up items from the queue
    const int MAXCONCURRENTREQUESTRATE = 80;  // Maximum picked out of queue for processing at any given time

    // Fill in all but 2 elements less than MAXQUEUESIZE
    for (int iCnt = 0; iCnt < MAXQUEUESIZE - 2; ++iCnt) {
        queue.push(iCnt + 1);
    }

    std::cerr << "Size of Queue = " << queue.size() << std::endl;
    std::cerr << "Head = " << queue.getHead() << std::endl;
    std::cerr << "Tail = " << queue.getTail() << std::endl;

    LeakyBucket leakyBucket(MAXFLOWRATE);
    std::cerr << "Time : " << Timer::get_time_ms() << std::endl;

    bool bForcedBreak = false;

    // Fetch all but two elements from the queue subject to its current size;
    // force break when just 2 elements are remaining
    while (!queue.isEmpty() && !bForcedBreak) {
        for (int iCnt = 0; iCnt < MAXCONCURRENTREQUESTRATE && !queue.isEmpty(); ++iCnt) {
            std::cerr << "Fetched value :" << queue.pop() << std::endl;
            if (queue.size() == 2) {
                bForcedBreak = true;
                break;
            }
        }

        // Only throttle when another batch will actually be fetched; after the
        // forced stop there is nothing left to wait for.
        if (!bForcedBreak && !queue.isEmpty()) {
            leakyBucket.consume();
            std::cerr << "Fetching next set !!" << std::endl;
            std::cerr << "Time : " << Timer::get_time_ms() << std::endl;
        }
    }

    std::cerr << "Head = " << queue.getHead() << std::endl;
    std::cerr << "Tail = " << queue.getTail() << std::endl;
    std::cerr << "Done with fetching all values !!" << std::endl;
    return 0;
}
run
|
edit
|
history
|
help
0
Graphs Iteration1 Undirected.
Derivation of the 0x9E3779B97F4A7C17u constant
Test size_t
Erase a std::unordered_map::local_iterator by key
Dat pointers
Namespace scope qualifier
Bubble Sort Example
Division by zero exception example
test string
using directives: qualified lookup rules are different from unqualified lookup rules