Run Code
|
API
|
Code Wall
|
Misc
|
Feedback
|
Login
|
Theme
|
Privacy
|
Patreon
Thread-safe Interval Average Calculator
//clang 3.7.0 // deepaknettem@gmail.com, // code released in public domain // a simple interval based thread-safe avg calculator - for learning purposes. // in a concurrent (transactional) system, something like this is useful in tracking latencies, and pin-pointing // the 'epoch' / 'time' at which latencies started going up or down. #include <iostream> #include <cstdint> #include <algorithm> #include <vector> #include <functional> #include <numeric> #include <chrono> #include <thread> #include <memory> #include <mutex> #include <random> #include <cassert> #include <future> #include <queue> // A simple thread-safe class to calculate interval-based moving averages template <typename T> class IntervalAvg { public: IntervalAvg( const int64_t& intervalInSeconds) : m_intervalInSeconds(intervalInSeconds), m_currentSecond(0), m_samples(std::make_shared<std::vector<T>>()) {} void AddSample(const int64_t& sample) { // get current timestamp (outside lock) auto time_since_epoch = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()); auto currentSecond = time_since_epoch.count(); std::shared_ptr<std::vector<T>> prevSamples(nullptr); auto prevSecond = 0; // acquire mutex (lock) { std::lock_guard<std::mutex> lck(m_mtx); if (m_currentSecond < currentSecond) { // capture older timestamp (for logging purposes), and update timestamp prevSecond = m_currentSecond; m_currentSecond = currentSecond; // copy shared_ptr within lock, so we hold a ref count to it prevSamples = m_samples; // update samples pointer m_samples = std::make_shared<std::vector<T>>(); } else if (m_currentSecond > currentSecond) { std::cout << "Unexpected" << std::endl; } else { m_samples->push_back(sample); } } // inside lock, only 1 thread would have been able to update the samples store // and hence has access to prevSamples // outside lock, calculate average in previous interval if (prevSamples != nullptr) { // sanity check assert(prevSamples.use_count() == 1); double sum = 0.0; if (prevSamples->size() > 0) { for (const auto& elem : *prevSamples) { sum+= elem; } double avg = sum / prevSamples->size(); std::cout << " Time: " << prevSecond << " Average: " << avg << " Sample Count " << prevSamples->size() << std::endl; } } } private: int64_t m_intervalInSeconds; int64_t m_currentSecond; std::shared_ptr<std::vector<T>> m_samples; std::mutex m_mtx; }; void test_sequential() { IntervalAvg<int64_t> avg(1); auto threadWork = [&avg](int64_t sample, int64_t sleepDuration = 50){ std::this_thread::sleep_for(std::chrono::milliseconds(sleepDuration)); avg.AddSample(sample); }; std::cout << "-------------------Test_Sequential-------------------------------" << std::endl; for (int i = 0; i < 40; i++) { threadWork(i); } } /* Using C++11 thread-pool implementation from https://github.com/galek/ThreadPool/blob/master/ThreadPool.h */ class ThreadPool { public: explicit ThreadPool(std::size_t threads = (std::max)(2u, std::thread::hardware_concurrency() * 2)); template<class F, class... Args> auto enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type>; void wait_until_empty(); void wait_until_nothing_in_flight(); void set_queue_size_limit(std::size_t limit); ~ThreadPool(); private: // need to keep track of threads so we can join them std::vector< std::thread > workers; // the task queue std::queue< std::function<void()> > tasks; // queue length limit std::size_t max_queue_size = 100000; // stop signal bool stop = false; // synchronization std::mutex queue_mutex; std::condition_variable condition_producers; std::condition_variable condition_consumers; std::mutex in_flight_mutex; std::condition_variable in_flight_condition; std::atomic<std::size_t> in_flight; struct handle_in_flight_decrement { ThreadPool & tp; handle_in_flight_decrement(ThreadPool & tp_) : tp(tp_) { } ~handle_in_flight_decrement() { std::size_t prev = std::atomic_fetch_sub_explicit(&tp.in_flight, std::size_t(1), std::memory_order_acq_rel); if (prev == 1) { std::unique_lock<std::mutex> guard(tp.in_flight_mutex); tp.in_flight_condition.notify_all(); } } }; }; // the constructor just launches some amount of workers inline ThreadPool::ThreadPool(std::size_t threads) : in_flight(0) { for(size_t i = 0;i<threads;++i) workers.emplace_back( [this] { for(;;) { std::function<void()> task; bool notify; { std::unique_lock<std::mutex> lock(this->queue_mutex); this->condition_consumers.wait(lock, [this]{ return this->stop || !this->tasks.empty(); }); if(this->stop && this->tasks.empty()) return; task = std::move(this->tasks.front()); this->tasks.pop(); notify = this->tasks.size() + 1 == max_queue_size || this->tasks.empty(); } handle_in_flight_decrement guard(*this); if (notify) { std::unique_lock<std::mutex> lock(this->queue_mutex); condition_producers.notify_all(); } task(); } } ); } // add new work item to the pool template<class F, class... Args> auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<typename std::result_of<F(Args...)>::type> { using return_type = typename std::result_of<F(Args...)>::type; auto task = std::make_shared< std::packaged_task<return_type()> >( std::bind(std::forward<F>(f), std::forward<Args>(args)...) ); std::future<return_type> res = task->get_future(); std::unique_lock<std::mutex> lock(queue_mutex); if (tasks.size () >= max_queue_size) // wait for the queue to empty or be stopped condition_producers.wait(lock, [this] { return tasks.size () < max_queue_size || stop; }); // don't allow enqueueing after stopping the pool if (stop) throw std::runtime_error("enqueue on stopped ThreadPool"); tasks.emplace([task](){ (*task)(); }); std::atomic_fetch_add_explicit(&in_flight, std::size_t(1), std::memory_order_relaxed); condition_consumers.notify_one(); return res; } // the destructor joins all threads inline ThreadPool::~ThreadPool() { { std::unique_lock<std::mutex> lock(queue_mutex); stop = true; condition_consumers.notify_all(); condition_producers.notify_all(); } for(std::thread &worker: workers) worker.join(); assert(in_flight == 0); } inline void ThreadPool::wait_until_empty() { std::unique_lock<std::mutex> lock(this->queue_mutex); this->condition_producers.wait(lock, [this]{ return this->tasks.empty(); }); } inline void ThreadPool::wait_until_nothing_in_flight() { std::unique_lock<std::mutex> lock(this->in_flight_mutex); this->in_flight_condition.wait(lock, [this]{ return this->in_flight == 0; }); } inline void ThreadPool::set_queue_size_limit(std::size_t limit) { std::unique_lock<std::mutex> lock(this->queue_mutex); std::size_t const old_limit = max_queue_size; max_queue_size = (std::max)(limit, std::size_t(1)); if (old_limit < max_queue_size) condition_producers.notify_all(); } void test_concurrent() { ThreadPool pool(8); IntervalAvg<int64_t> avg(5); auto threadWork = [&avg](int64_t sample, int64_t sleepDuration = 50){ std::this_thread::sleep_for(std::chrono::milliseconds(sleepDuration)); avg.AddSample(sample); return 0; }; std::cout << "-------------------Test_Concurrent-------------------------------" << std::endl; std::random_device rd; std::mt19937 mt(rd()); std::uniform_int_distribution<int> dist(500, 4000); int numThreads = 50; std::vector<std::future<int>> results; for (int i = 0; i < numThreads; i++) { results.emplace_back(pool.enqueue(threadWork, i, dist(mt))); } pool.wait_until_empty(); pool.wait_until_nothing_in_flight(); for (auto && result : results) result.get(); } int main() { test_sequential(); test_concurrent(); }
run
|
edit
|
history
|
help
0
__FUNCTION__ not a preprocessor macro on clang
using directives: qualified lookup rules are different from unqualified lookup rules
What's the problem with this?
<string> No indirect include of <errno.h>
Struct memory ordering
virtual members
Test3
GetTypeName
First test
Access to temporary object