commit 2c0dc2aa700262d0377992f6d5d2c467a98255a4
Author: ericek111
Date:   Sun May 29 22:41:15 2022 +0200

    initial commit

diff --git a/.clang-format b/.clang-format
new file mode 100644
index 0000000..586bd6f
--- /dev/null
+++ b/.clang-format
@@ -0,0 +1 @@
+BasedOnStyle: WebKit
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..90058ad
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+cmake-build-*
+.idea
diff --git a/CMakeLists.txt b/CMakeLists.txt
new file mode 100644
index 0000000..86e80a7
--- /dev/null
+++ b/CMakeLists.txt
@@ -0,0 +1,9 @@
+cmake_minimum_required(VERSION 3.19)
+project(tp_impl)
+
+set(CMAKE_CXX_STANDARD 17)
+
+add_subdirectory(libs/MyThreadPool)
+
+add_executable(tp_impl main.cpp)
+target_link_libraries(tp_impl PRIVATE MyThreadPool)
diff --git a/libs/MyThreadPool/CMakeLists.txt b/libs/MyThreadPool/CMakeLists.txt
new file mode 100644
index 0000000..fac1adc
--- /dev/null
+++ b/libs/MyThreadPool/CMakeLists.txt
@@ -0,0 +1,10 @@
+cmake_minimum_required(VERSION 3.19)
+project(MyThreadPool)
+
+set(CMAKE_CXX_STANDARD 17)
+
+add_library(MyThreadPool INTERFACE include/MyThreadPool.h include/SafePriorityQueue.h)
+
+target_include_directories(MyThreadPool
+    INTERFACE include/
+)
diff --git a/libs/MyThreadPool/include/MyThreadPool.h b/libs/MyThreadPool/include/MyThreadPool.h
new file mode 100644
index 0000000..daeb513
--- /dev/null
+++ b/libs/MyThreadPool/include/MyThreadPool.h
@@ -0,0 +1,173 @@
+#pragma once
+
+#include "SafePriorityQueue.h"
+#include <atomic>
+#include <cassert>
+#include <condition_variable>
+#include <functional>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <thread>
+#include <type_traits>
+#include <vector>
+
+namespace MyThreadPool {
+
+class ThreadPool {
+
+public:
+    ThreadPool()
+        : ThreadPool(0)
+    {
+    }
+
+    explicit ThreadPool(size_t workers)
+        : threadCount(workers)
+    {
+        createWorkers();
+    }
+
+    template <typename F, typename... Args,
+        typename ReturnType = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>>
+    std::future<ReturnType> enqueue(int priority, F&& f, Args&&... args)
+    {
+        // encapsulate the task so it can be copied into the queue
+        auto funcShared = std::make_shared<std::packaged_task<ReturnType()>>(
+            std::bind(std::forward<F>(f), std::forward<Args>(args)...));
+
+        auto fut = funcShared->get_future();
+
+        // wrap the packaged task into a void() lambda
+        auto wrapperFunc = [task = std::move(funcShared)]() {
+            (*task)();
+        };
+
+        enqueueAndNotify(priority, std::move(wrapperFunc));
+
+        return fut;
+    }
+
+    size_t getTasksLeft() const
+    {
+        return queue.length();
+    }
+
+    size_t getTasksRunning() const
+    {
+        return threadsWorking;
+    }
+
+    size_t getTotalWorkers() const
+    {
+        return threadCount;
+    }
+
+    bool isIdle() const
+    {
+        return queue.empty() && threadsWorking == 0;
+    }
+
+    bool isClosed() const
+    {
+        return done;
+    }
+
+    /**
+     * Wait for all tasks to finish. Other threads can still insert new tasks into the pool.
+     * With untilDone == false, returns as soon as the queue is empty, even if tasks are still running.
+     */
+    void waitForTasks(bool untilDone = true)
+    {
+        // The queue empties when the last task is dequeued, not when it finishes,
+        // so optionally also wait for the running-task counter to drop to zero.
+        std::unique_lock poolLock(poolMtx);
+        poolCond.wait(poolLock, [&]() {
+            return queue.empty() && (!untilDone || threadsWorking == 0);
+        });
+    }
+
+    ~ThreadPool()
+    {
+        /*
+         * No need to wait until all tasks are truly finished. If, for whatever reason, one of the threads crashes or
+         * doesn't finish executing its task, we can still join (wait for all threads to finish their current tasks)
+         * and gracefully (though with possible corruption?) destroy the pool.
+         */
+        waitForTasks(false);
+        destroyThreads();
+    }
+
+private:
+    void enqueueAndNotify(int priority, std::function<void()>&& func)
+    {
+        {
+            // Push while holding the task mutex so a worker can't miss the wakeup
+            // between checking its wait predicate and going to sleep.
+            std::lock_guard taskLock(taskMtx);
+            queue.push(priority, std::move(func));
+        }
+        taskCond.notify_one();
+    }
+
+    /**
+     * Worker function for each thread in the pool.
+     */
+    void worker()
+    {
+        while (!done) {
+            {
+                std::unique_lock taskLock(taskMtx);
+                taskCond.wait(taskLock, [&]() { return done || !queue.empty(); });
+            }
+
+            // Don't even attempt to do another task if we're destroying the threads. Just bail.
+            if (done) {
+                return;
+            }
+
+            auto callable = queue.pop();
+            if (callable.has_value()) {
+                threadsWorking++;
+                callable.value()();
+                {
+                    // Decrement under the pool mutex so waitForTasks() can't miss the update.
+                    std::lock_guard poolLock(poolMtx);
+                    threadsWorking--;
+                }
+                poolCond.notify_all();
+            }
+        }
+    }
+
+    void createWorkers()
+    {
+        assert(threads.empty());
+
+        if (threadCount == 0) {
+            threadCount = std::thread::hardware_concurrency();
+        }
+
+        assert(threadCount > 0);
+
+        threads.reserve(threadCount);
+        for (size_t i = 0; i < threadCount; i++) {
+            threads.emplace_back(&ThreadPool::worker, this);
+        }
+    }
+
+    void destroyThreads()
+    {
+        {
+            // Flip the flag under the task mutex so a sleeping worker can't miss the shutdown signal.
+            std::lock_guard taskLock(taskMtx);
+            done = true;
+        }
+        taskCond.notify_all();
+        for (auto& thread : threads) {
+            if (thread.joinable()) {
+                thread.join();
+            }
+        }
+    }
+
+    SafePriorityQueue<std::function<void()>> queue;
+    std::vector<std::thread> threads;
+    size_t threadCount;
+    std::atomic<size_t> threadsWorking { 0 };
+    std::atomic<bool> done { false };
+
+    std::condition_variable taskCond;
+    mutable std::mutex taskMtx;
+
+    std::condition_variable poolCond;
+    mutable std::mutex poolMtx;
+};
+
+}
diff --git a/libs/MyThreadPool/include/SafePriorityQueue.h b/libs/MyThreadPool/include/SafePriorityQueue.h
new file mode 100644
index 0000000..766547e
--- /dev/null
+++ b/libs/MyThreadPool/include/SafePriorityQueue.h
@@ -0,0 +1,90 @@
+#pragma once
+
+#include <forward_list>
+#include <mutex>
+#include <optional>
+#include <shared_mutex>
+#include <utility>
+
+namespace MyThreadPool {
+
+/**
+ * Thread-safe FIFO buffer with insertion based on priority.
+ * Elements with a higher priority value are popped first; elements with equal
+ * priority are popped in insertion order.
+ * Pop is O(1); push is O(n) in the worst case.
+ */
+template <typename T>
+class SafePriorityQueue {
+
+public:
+    void push(int priority, const T& obj)
+    {
+        // copy, then hand the temporary to the move overload
+        push(priority, T(obj));
+    }
+
+    void push(int priority, T&& obj)
+    {
+        std::unique_lock lock(mtx);
+
+        auto it = queue.cbegin();
+        auto prevIt = queue.cbefore_begin();
+        // advance the iterator until an element with lower priority is found
+        for (; it != queue.cend(); prevIt = it++) {
+            if (it->first < priority) {
+                break;
+            }
+        }
+
+        // even if nothing is found, that's okay: we just append to the end of the list
+        queue.emplace_after(prevIt, std::make_pair(priority, std::move(obj)));
+
+        queueLength++;
+    }
+
+    std::optional<T> pop()
+    {
+        std::unique_lock lock(mtx);
+
+        if (queue.empty()) {
+            return std::nullopt;
+        }
+
+        queueLength--;
+
+        auto ret = std::move(queue.front());
+        queue.pop_front();
+
+        return std::move(ret.second);
+    }
+
+    std::optional<T> peek() const
+    {
+        std::shared_lock lock(mtx);
+
+        if (queue.empty()) {
+            return std::nullopt;
+        }
+
+        return queue.front().second;
+    }
+
+    bool empty() const
+    {
+        std::shared_lock lock(mtx);
+        return queue.empty();
+    }
+
+    size_t length() const
+    {
+        std::shared_lock lock(mtx);
+        return queueLength;
+    }
+
+private:
+    std::forward_list<std::pair<int, T>> queue;
+    mutable std::shared_mutex mtx;
+    size_t queueLength { 0 };
+};
+
+} // namespace MyThreadPool
diff --git a/main.cpp b/main.cpp
new file mode 100644
index 0000000..8ab01a3
--- /dev/null
+++ b/main.cpp
@@ -0,0 +1,37 @@
+#include <MyThreadPool.h>
+#include <iostream>
+
+int main()
+{
+    MyThreadPool::ThreadPool tp(4);
+
+    auto task = [](int i) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(500));
+        std::cout << "Testing future " << i << std::endl;
+        return i / 2;
+    };
+
+    tp.enqueue(
+        0, [](int i) {
+            std::this_thread::sleep_for(std::chrono::milliseconds(500));
+            std::cout << "Test " << i << std::endl;
+            return i / 2;
+        },
+        45);
+
+    auto fut = tp.enqueue(0, task, 10);
+
+    std::cout << "Returned: " << fut.get() << std::endl;
+
+    tp.waitForTasks();
+
+    std::cout << "Done." << std::endl;
+
+    tp.enqueue(0, []() {
+        std::this_thread::sleep_for(std::chrono::milliseconds(500));
+        std::cout << "Finished with destructor." << std::endl;
+        std::this_thread::sleep_for(std::chrono::milliseconds(500));
+    });
+
+    return 0;
+}
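
The main.cpp in this commit only uses a single priority (0), so the ordering promised in the SafePriorityQueue header comment (higher priority value first, FIFO among equal priorities) is never visible. The sketch below is not part of the commit; it is a hypothetical priority_demo.cpp, assuming the headers above build as shown, that pushes into SafePriorityQueue directly and prints the pop order.

    // priority_demo.cpp -- hypothetical demo, not part of the commit above.
    // Expected output: "high", "low A", "low B", "(empty)".
    #include <SafePriorityQueue.h>
    #include <iostream>
    #include <string>

    int main()
    {
        MyThreadPool::SafePriorityQueue<std::string> q;

        q.push(1, std::string("low A"));
        q.push(5, std::string("high"));
        q.push(1, std::string("low B")); // same priority as "low A", so it stays behind it

        // pop() returns std::nullopt once the queue is empty, ending the loop
        while (auto item = q.pop()) {
            std::cout << *item << std::endl;
        }
        std::cout << (q.empty() ? "(empty)" : "(not empty)") << std::endl;

        return 0;
    }

Tasks handed to ThreadPool::enqueue() go through the same queue wrapped as std::function<void()>, so a larger priority argument simply means the task is picked up sooner once a worker is free.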