DelegateMQ
Loading...
Searching...
No Matches
stdlib/Thread.h
Go to the documentation of this file.
1#ifndef _THREAD_STD_H
2#define _THREAD_STD_H
3
27
28#include "delegate/IThread.h"
29#include "./predef/util/Timer.h"
30#include "ThreadMsg.h"
31#include <thread>
32#include <queue>
33#include <atomic>
34#include <condition_variable>
35#include <future>
36#include <optional>
37
38// Comparator for priority queue
40 bool operator()(const std::shared_ptr<ThreadMsg>& a, const std::shared_ptr<ThreadMsg>& b) const {
41 return static_cast<int>(a->GetPriority()) < static_cast<int>(b->GetPriority());
42 }
43};
44
48class Thread : public dmq::IThread
49{
50public:
55 Thread(const std::string& threadName, size_t maxQueueSize = 0);
56
59
64 bool CreateThread(std::optional<dmq::Duration> watchdogTimeout = std::nullopt);
65
67 void ExitThread();
68
70 std::thread::id GetThreadId();
71
73 static std::thread::id GetCurrentThreadId();
74
76 virtual bool IsCurrentThread() override;
77
79 std::string GetThreadName() { return THREAD_NAME; }
80
82 size_t GetQueueSize();
83
87 virtual void DispatchDelegate(std::shared_ptr<dmq::DelegateMsg> msg);
88
89private:
90 Thread(const Thread&) = delete;
91 Thread& operator=(const Thread&) = delete;
92
94 void Process();
95
96 void SetThreadName(std::thread::native_handle_type handle, const std::string& name);
97
102 void WatchdogCheck();
103
108 void ThreadCheck();
109
110 std::optional<std::thread> m_thread;
111#ifdef DMQ_ALLOCATOR
112 std::priority_queue<std::shared_ptr<ThreadMsg>,
113 std::vector<std::shared_ptr<ThreadMsg>, stl_allocator<std::shared_ptr<ThreadMsg>>>,
114 ThreadMsgComparator> m_queue;
115#else
116 std::priority_queue<std::shared_ptr<ThreadMsg>,
117 std::vector<std::shared_ptr<ThreadMsg>>,
118 ThreadMsgComparator> m_queue;
119#endif
120 std::mutex m_mutex;
121 std::condition_variable m_cv;
122
123 // Condition variable to wake up blocked producers when space is available
124 std::condition_variable m_cvNotFull;
125
126 const std::string THREAD_NAME;
127
128 // Max queue size for back pressure (0 = unlimited)
129 const size_t MAX_QUEUE_SIZE;
130
131 // Promise and future to synchronize thread start (constructed lazily in CreateThread)
132 std::optional<std::promise<void>> m_threadStartPromise;
133 std::optional<std::future<void>> m_threadStartFuture;
134
135 std::atomic<bool> m_exit;
136
137 // Watchdog related members
138 std::atomic<dmq::TimePoint> m_lastAliveTime;
139 std::unique_ptr<Timer> m_watchdogTimer;
140 dmq::ScopedConnection m_watchdogTimerConn;
141 std::unique_ptr<Timer> m_threadTimer;
142 dmq::ScopedConnection m_threadTimerConn;
143 std::atomic<dmq::Duration> m_watchdogTimeout;
144};
145
146#endif
Interface for cross-thread delegate dispatching.
Cross-platform thread for any system supporting C++11 std::thread (e.g. Windows, Linux).
Definition cmsis-rtos2/Thread.h:33
std::string GetThreadName()
Get thread name.
Definition stdlib/Thread.h:79
virtual void DispatchDelegate(std::shared_ptr< dmq::DelegateMsg > msg)
std::thread::id GetThreadId()
Get the ID of this thread instance.
static std::thread::id GetCurrentThreadId()
Get the ID of the currently executing thread.
~Thread()
Destructor.
virtual bool IsCurrentThread() override
Returns true if the calling thread is this thread.
bool CreateThread()
Definition cmsis-rtos2/Thread.cpp:42
Thread(const std::string &threadName, size_t maxQueueSize=0)
size_t GetQueueSize()
Get size of thread message queue.
Definition stdlib/Thread.cpp:107
void ExitThread()
Called once at program exit to shut down the worker thread.
A base class for a delegate enabled execution thread. Implemented by application code if asynchronous...
Definition IThread.h:22
RAII handle to a single Signal subscription. Disconnects automatically on destruction....
Definition Signal.h:104
Definition stl_allocator.h:29
Definition stdlib/Thread.h:39
bool operator()(const std::shared_ptr< ThreadMsg > &a, const std::shared_ptr< ThreadMsg > &b) const
Definition stdlib/Thread.h:40