initial commit

This commit is contained in:
Erik Bročko 2022-05-29 22:41:15 +02:00
commit 2c0dc2aa70
Signed by: ericek111
GPG Key ID: 414DED726771329C
7 changed files with 322 additions and 0 deletions

1
.clang-format Normal file
View File

@ -0,0 +1 @@
BasedOnStyle: WebKit

2
.gitignore vendored Normal file
View File

@ -0,0 +1,2 @@
cmake-build-*
.idea

9
CMakeLists.txt Normal file
View File

@ -0,0 +1,9 @@
cmake_minimum_required(VERSION 3.19)
project(tp_impl)
set(CMAKE_CXX_STANDARD 17)
add_subdirectory(libs/MyThreadPool)
add_executable(tp_impl main.cpp)
target_link_libraries(tp_impl PRIVATE MyThreadPool)

View File

@ -0,0 +1,10 @@
cmake_minimum_required(VERSION 3.19)
project(MyThreadPool)
set(CMAKE_CXX_STANDARD 17)
add_library(MyThreadPool INTERFACE include/SafePriorityQueue.h)
target_include_directories(MyThreadPool
INTERFACE include/
)

View File

@ -0,0 +1,173 @@
#pragma once
#include "SafePriorityQueue.h"
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <future>
#include <iostream>
#include <thread>
#include <utility>
#include <vector>
namespace MyThreadPool {
class ThreadPool {
public:
ThreadPool()
: ThreadPool(0)
{
}
explicit ThreadPool(size_t workers)
: threadCount(workers)
{
createWorkers();
}
template <typename F, typename... Args, typename R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>>
std::future<R> enqueue(int priority, F&& f, Args&&... args)
{
using ReturnType = decltype(f(std::forward<Args>(args)...));
// encapsulate it for copying into the queue
auto funcShared = std::make_shared<std::packaged_task<ReturnType()>>(
std::bind(std::forward<F>(f), std::forward<Args>(args)...));
auto fut = funcShared->get_future();
// wrap the packaged task into a lambda
auto wrapperFunc = [task = std::move(funcShared)]() {
(*task)();
};
enqueueAndNotify(priority, std::move(wrapperFunc));
return fut;
}
size_t getTasksLeft() const
{
return queue.length();
}
size_t getTasksRunning() const
{
return threadsWorking;
}
size_t getTotalWorkers() const
{
return threadCount;
}
bool isIdle() const
{
return queue.empty() && threadsWorking == 0;
}
bool isClosed() const
{
return done;
}
/**
* Wait for all tasks to finish. Other threads can still new insert tasks into the pool.
* Will return upon emptying the queue.
*/
void waitForTasks(bool untilDone = true)
{
// Queue will empty when the last task is being processed, not when all tasks finish.
while (!queue.empty() || (untilDone && threadsWorking > 0)) {
std::unique_lock<std::mutex> poolLock(poolMtx);
poolCond.wait(poolLock);
}
}
~ThreadPool()
{
/*
* No need to wait until all tasks are truly finished. If, for whatever reason, one of the thread crashes or
* doesn't finish execution of the task, we can still join (wait for all threads to finish their tasks) and
* gracefully (though with possible corruption?) destroy the pool.
*/
waitForTasks(false);
destroyThreads();
}
private:
void enqueueAndNotify(int priority, std::function<void()>&& func)
{
queue.push(priority, std::move(func));
taskCond.notify_one();
}
/**
* Worker function for each thread in the pool.
*/
void worker()
{
while (!done) {
{
std::unique_lock<std::mutex> taskLock(taskMtx);
taskCond.wait(taskLock, [&]() { return done || !queue.empty(); });
}
// Don't even attempt to do another task if we're destroying the threads. Just bail.
if (done) {
return;
}
auto callable = std::move(queue.pop());
if (callable.has_value()) {
threadsWorking++;
callable.value()();
threadsWorking--;
poolCond.notify_all();
}
}
}
void createWorkers()
{
assert(threads.empty());
if (threadCount == 0) {
threadCount = std::thread::hardware_concurrency();
}
assert(threadCount > 0);
threads.reserve(threadCount);
for (size_t i = 0; i < threadCount; i++) {
threads.emplace_back(std::thread(&ThreadPool::worker, this));
}
}
void destroyThreads()
{
done = true;
taskCond.notify_all();
for (auto& thread : threads) {
if (thread.joinable()) {
thread.join();
}
}
}
SafePriorityQueue<std::function<void()>> queue;
std::vector<std::thread> threads;
size_t threadCount;
std::atomic<size_t> threadsWorking { 0 };
std::atomic<bool> done { false };
std::condition_variable taskCond;
mutable std::mutex taskMtx;
std::condition_variable poolCond;
mutable std::mutex poolMtx;
};
}

View File

@ -0,0 +1,90 @@
#pragma once
#include <forward_list>
#include <iostream>
#include <mutex>
#include <optional>
#include <shared_mutex>
namespace MyThreadPool {
/**
* Thread-safe FIFO buffer with insertion based on priority.
* Elements with higher priority value are popped first.
* Pop is O(1), push is O(n), worst case.
*
*/
template <typename T>
class SafePriorityQueue {
public:
void push(int priority, T& obj)
{
push(priority, std::forward<T>(obj));
}
void push(int priority, T&& obj)
{
std::unique_lock<std::shared_mutex> lock(mtx);
auto it = queue.cbegin();
auto prevIt = queue.cbefore_begin();
// move the iterator until a task with lower priority is found
for (; it != queue.cend(); prevIt = it++) {
if (it->first < priority) {
break;
}
}
// even nothing is found, it's okay, we'll append to the list
queue.emplace_after(prevIt, std::make_pair(priority, std::forward<T>(obj)));
queueLength++;
}
std::optional<T> pop()
{
std::unique_lock<std::shared_mutex> lock(mtx);
if (queue.empty()) {
return std::nullopt;
}
queueLength--;
auto ret = std::move(queue.front());
queue.pop_front();
return ret.second;
}
std::optional<T> peek() const
{
std::shared_lock<std::shared_mutex> lock(mtx);
if (queue.empty()) {
return std::nullopt;
}
return queue.front().second;
}
bool empty() const
{
std::shared_lock<std::shared_mutex> lock(mtx);
return queue.empty();
}
size_t length() const
{
std::shared_lock<std::shared_mutex> lock(mtx);
return queueLength;
}
private:
std::forward_list<std::pair<int, T>> queue;
mutable std::shared_mutex mtx;
size_t queueLength { 0 };
};
} // namespace MyThreadPool

37
main.cpp Normal file
View File

@ -0,0 +1,37 @@
#include <MyThreadPool.h>
#include <iostream>
int main()
{
MyThreadPool::ThreadPool tp(4);
auto task = [](int i) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::cout << "Testing future " << i << std::endl;
return i / 2;
};
tp.enqueue(
0, [](int i) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::cout << "Test " << i << std::endl;
return i / 2;
},
45);
auto fut = tp.enqueue(0, task, 10);
std::cout << "Returned: " << fut.get() << std::endl;
tp.waitForTasks();
std::cout << "Done." << std::endl;
tp.enqueue(0, []() {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::cout << "Finished with destructor." << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(500));
});
return 0;
}