r-type  0.0.0
R-Type main
Loading...
Searching...
No Matches
EventBus.hpp
Go to the documentation of this file.
1///
2/// @file EventBus.hpp
3/// @brief Thread-safe event bus implementation for inter-component communication
4/// @namespace utl
5///
6
7#pragma once
8
#include "Utils/Event.hpp"
#include "Utils/Logger.hpp"
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include <queue>
#include <type_traits>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>
23
24namespace utl
25{
26
27 ///
28 /// @brief Thread-safe event bus for decoupled component communication
29 ///
31 {
32 private:
33 ///
34 /// @brief Internal event queue structure
35 ///
37 {
38 std::queue<Event> events;
39 mutable std::mutex mutex;
40 std::condition_variable cv;
41 std::atomic<std::uint64_t> totalEvents{0};
42 };
43
44 ///
45 /// @brief Priority queue for events
46 ///
48 {
49 std::priority_queue<Event, std::vector<Event>, std::function<bool(const Event &, const Event &)>>
51 mutable std::mutex mutex;
52
54 : events([](const Event &a, const Event &b)
55 { return static_cast<int>(a.priority) < static_cast<int>(b.priority); })
56 {
57 }
58 };
59
62 std::unordered_map<EventType, std::unordered_set<std::uint32_t>> m_subscribers;
63 mutable std::mutex m_subscribersMutex;
65 mutable std::mutex m_statsMutex;
66 std::atomic<bool> m_running{true};
67 std::atomic<std::uint64_t> m_maxQueueSize{10000};
68
69 // Event filtering
70 std::unordered_set<EventType> m_filteredTypes;
71 mutable std::mutex m_filterMutex;
72
73 // Component tracking
74 std::unordered_map<std::uint32_t, std::string> m_componentNames;
75 mutable std::mutex m_componentMutex;
76
77 public:
78 ///
79 /// @brief Get singleton instance of EventBus
80 /// @return Reference to the singleton EventBus instance
81 ///
83 {
84 static EventBus instance;
85 return instance;
86 }
87
88 ///
89 /// @brief Constructor
90 ///
92 {
93 m_stats.reset();
94 utl::Logger::log("EventBus: Initialized", utl::LogLevel::INFO);
95 }
96
97 ///
98 /// @brief Destructor
99 ///
101 {
102 m_running.store(false);
103 m_eventQueue.cv.notify_all();
104 utl::Logger::log("EventBus: Destroyed", utl::LogLevel::INFO);
105 }
106
107 // Delete copy constructor and assignment operator
108 EventBus(const EventBus &) = delete;
109 EventBus &operator=(const EventBus &) = delete;
110 EventBus(EventBus &&) = delete;
112
113 ///
114 /// @brief Publish an event to the bus
115 /// @param event Event to publish
116 /// @return True if event was published successfully
117 ///
118 bool publish(const Event &event)
119 {
120 if (!m_running.load())
121 {
122 return false;
123 }
124
125 // Check if event type is filtered
126 {
127 std::lock_guard<std::mutex> lock(m_filterMutex);
128 if (m_filteredTypes.find(event.type) != m_filteredTypes.end())
129 {
130 return false; // Event type is filtered out
131 }
132 }
133
134 // Check queue size limit
135 {
136 std::lock_guard<std::mutex> lock(m_eventQueue.mutex);
137 if (m_eventQueue.events.size() >= m_maxQueueSize.load())
138 {
139 utl::Logger::log("EventBus: Queue full, dropping event type " +
140 std::to_string(static_cast<std::uint32_t>(event.type)),
142 return false;
143 }
144
145 m_eventQueue.events.push(event);
147 }
148
149 // Notify waiting consumers
150 m_eventQueue.cv.notify_all();
151
152 // Update statistics
153 updateStats(event, true);
154
155 utl::Logger::log("EventBus: Published event type " +
156 std::to_string(static_cast<std::uint32_t>(event.type)) + " from source " +
157 std::to_string(event.sourceId),
159
160 return true;
161 }
162
163 ///
164 /// @brief Publish event with serialized data
165 /// @tparam T Type of data to serialize
166 /// @param type Event type
167 /// @param data Data to serialize
168 /// @param sourceId Source component ID
169 /// @param targetId Target component ID (0 for broadcast)
170 /// @param priority Event priority
171 /// @return True if event was published successfully
172 ///
173 template <typename T>
174 bool publish(EventType type, const T &data, std::uint32_t sourceId = 0, std::uint32_t targetId = 0,
176 {
177 Event event(type, sourceId, targetId, priority);
178
179 // Serialize data using existing Serializer
180 try
181 {
182 rnp::Serializer serializer;
183 if constexpr (std::is_same_v<T, rnp::PacketWorldState>)
184 {
185 serializer.serializeWorldState(data);
186 }
187 else if constexpr (std::is_same_v<T, std::vector<rnp::EventRecord>>)
188 {
189 serializer.serializeEntityEvents(data);
190 }
191 else if constexpr (std::is_same_v<T, rnp::PacketLobbyCreate>)
192 {
193 serializer.serializeLobbyCreate(data);
194 }
195 else if constexpr (std::is_same_v<T, rnp::PacketLobbyCreateResponse>)
196 {
197 serializer.serializeLobbyCreateResponse(data);
198 }
199 else if constexpr (std::is_same_v<T, rnp::PacketLobbyListResponse>)
200 {
201 serializer.serializeLobbyListResponse(data);
202 }
203 else if constexpr (std::is_same_v<T, rnp::PacketLobbyJoin>)
204 {
205 serializer.serializeLobbyJoin(data);
206 }
207 else if constexpr (std::is_same_v<T, rnp::PacketLobbyJoinResponse>)
208 {
209 serializer.serializeLobbyJoinResponse(data);
210 }
211 else if constexpr (std::is_same_v<T, rnp::PacketLobbyUpdate>)
212 {
213 serializer.serializeLobbyUpdate(data);
214 }
215 else if constexpr (std::is_same_v<T, rnp::PacketGameStart>)
216 {
217 serializer.serializeGameStart(data);
218 }
219 else if constexpr (std::is_same_v<T, std::string>)
220 {
221 serializer.writeString(data);
222 }
223 else if constexpr (std::is_arithmetic_v<T>)
224 {
225 if constexpr (sizeof(T) == 1)
226 {
227 serializer.writeByte(static_cast<std::uint8_t>(data));
228 }
229 else if constexpr (sizeof(T) == 2)
230 {
231 serializer.writeUInt16(static_cast<std::uint16_t>(data));
232 }
233 else if constexpr (sizeof(T) == 4)
234 {
235 if constexpr (std::is_floating_point_v<T>)
236 {
237 serializer.writeFloat(static_cast<float>(data));
238 }
239 else
240 {
241 serializer.writeUInt32(static_cast<std::uint32_t>(data));
242 }
243 }
244 }
245 else
246 {
247 // For raw byte vectors, use directly
248 static_assert(std::is_same_v<T, std::vector<std::uint8_t>>,
249 "Unsupported data type for EventBus serialization");
250 event.data = data;
251 return publish(event);
252 }
253
254 event.data = serializer.getData();
255 return publish(event);
256 }
257 catch (const std::exception &e)
258 {
259 utl::Logger::log("EventBus: Failed to serialize data for event type " +
260 std::to_string(static_cast<std::uint32_t>(type)) + " - " + e.what(),
262 return false;
263 }
264 }
265
266 ///
267 /// @brief Consume events from the bus (non-blocking)
268 /// @param maxEvents Maximum number of events to consume
269 /// @return Vector of consumed events
270 ///
271 std::vector<Event> consume(std::uint32_t maxEvents = 100)
272 {
273 std::vector<Event> events;
274
275 std::lock_guard<std::mutex> lock(m_eventQueue.mutex);
276
277 std::uint32_t count = 0;
278 while (!m_eventQueue.events.empty() && count < maxEvents)
279 {
280 Event event = m_eventQueue.events.front();
281 m_eventQueue.events.pop();
282
283 // Check if event has expired
284 if (event.hasExpired())
285 {
286 updateStats(event, false, true);
287 continue;
288 }
289
290 events.push_back(event);
291 count++;
292 }
293
294 // Update statistics
295 for (const auto &event : events)
296 {
297 updateStats(event, false);
298 }
299
300 return events;
301 }
302
303 ///
304 /// @brief Consume events with timeout (blocking)
305 /// @param timeout Maximum time to wait for events
306 /// @param maxEvents Maximum number of events to consume
307 /// @return Vector of consumed events
308 ///
309 std::vector<Event> waitForEvents(std::chrono::milliseconds timeout = std::chrono::milliseconds(100),
310 std::uint32_t maxEvents = 100)
311 {
312 std::unique_lock<std::mutex> lock(m_eventQueue.mutex);
313
314 // Wait for events or timeout
315 if (m_eventQueue.cv.wait_for(lock, timeout, [this] { return !m_eventQueue.events.empty(); }))
316 {
317 return consume(maxEvents);
318 }
319
320 return {}; // Timeout occurred
321 }
322
323 ///
324 /// @brief Consume events of specific type
325 /// @param type Event type to filter
326 /// @param maxEvents Maximum number of events to consume
327 /// @return Vector of events of the specified type
328 ///
329 std::vector<Event> consumeType(EventType type, std::uint32_t maxEvents = 100)
330 {
331 std::vector<Event> allEvents = consume(maxEvents);
332 std::vector<Event> filteredEvents;
333
334 for (const auto &event : allEvents)
335 {
336 if (event.type == type)
337 {
338 filteredEvents.push_back(event);
339 }
340 else
341 {
342 // Put other events back in queue
343 std::lock_guard<std::mutex> lock(m_eventQueue.mutex);
344 m_eventQueue.events.push(event);
345 }
346 }
347
348 return filteredEvents;
349 }
350
351 ///
352 /// @brief Consume events targeted to specific component
353 /// @param targetId Target component ID
354 /// @param maxEvents Maximum number of events to consume
355 /// @return Vector of events targeted to the component
356 ///
357 std::vector<Event> consumeForTarget(std::uint32_t targetId, std::uint32_t maxEvents = 100)
358 {
359 std::vector<Event> allEvents = consume(maxEvents);
360 std::vector<Event> filteredEvents;
361
362 for (const auto &event : allEvents)
363 {
364 if (event.targetId == targetId || event.isBroadcast())
365 {
366 filteredEvents.push_back(event);
367 }
368 else
369 {
370 // Put other events back in queue
371 std::lock_guard<std::mutex> lock(m_eventQueue.mutex);
372 m_eventQueue.events.push(event);
373 }
374 }
375
376 return filteredEvents;
377 }
378
379 ///
380 /// @brief Subscribe component to specific event types
381 /// @param componentId Component ID
382 /// @param type Event type to subscribe to
383 ///
384 void subscribe(std::uint32_t componentId, EventType type)
385 {
386 std::lock_guard<std::mutex> lock(m_subscribersMutex);
387 m_subscribers[type].insert(componentId);
388
389 utl::Logger::log("EventBus: Component " + std::to_string(componentId) + " subscribed to event type " +
390 std::to_string(static_cast<std::uint32_t>(type)),
392 }
393
394 ///
395 /// @brief Unsubscribe component from specific event type
396 /// @param componentId Component ID
397 /// @param type Event type to unsubscribe from
398 ///
399 void unsubscribe(std::uint32_t componentId, EventType type)
400 {
401 std::lock_guard<std::mutex> lock(m_subscribersMutex);
402 auto it = m_subscribers.find(type);
403 if (it != m_subscribers.end())
404 {
405 it->second.erase(componentId);
406 if (it->second.empty())
407 {
408 m_subscribers.erase(it);
409 }
410 }
411
412 utl::Logger::log("EventBus: Component " + std::to_string(componentId) +
413 " unsubscribed from event type " +
414 std::to_string(static_cast<std::uint32_t>(type)),
416 }
417
418 ///
419 /// @brief Register component name for better debugging
420 /// @param componentId Component ID
421 /// @param name Component name
422 ///
423 void registerComponent(std::uint32_t componentId, const std::string &name)
424 {
425 std::lock_guard<std::mutex> lock(m_componentMutex);
426 m_componentNames[componentId] = name;
427
428 utl::Logger::log("EventBus: Registered component '" + name + "' with ID " + std::to_string(componentId),
430 }
431
432 ///
433 /// @brief Unregister component
434 /// @param componentId Component ID
435 ///
436 void unregisterComponent(std::uint32_t componentId)
437 {
438 {
439 std::lock_guard<std::mutex> lock(m_componentMutex);
440 auto it = m_componentNames.find(componentId);
441 if (it != m_componentNames.end())
442 {
443 utl::Logger::log("EventBus: Unregistered component '" + it->second +
444 "' (ID: " + std::to_string(componentId) + ")",
446 m_componentNames.erase(it);
447 }
448 }
449
450 // Remove from all subscriptions
451 std::lock_guard<std::mutex> lock(m_subscribersMutex);
452 for (auto &pair : m_subscribers)
453 {
454 pair.second.erase(componentId);
455 }
456 }
457
458 ///
459 /// @brief Add event type to filter (filtered events will be dropped)
460 /// @param type Event type to filter
461 ///
463 {
464 std::lock_guard<std::mutex> lock(m_filterMutex);
465 m_filteredTypes.insert(type);
466
467 utl::Logger::log("EventBus: Added filter for event type " +
468 std::to_string(static_cast<std::uint32_t>(type)),
470 }
471
472 ///
473 /// @brief Remove event type from filter
474 /// @param type Event type to unfilter
475 ///
477 {
478 std::lock_guard<std::mutex> lock(m_filterMutex);
479 m_filteredTypes.erase(type);
480
481 utl::Logger::log("EventBus: Removed filter for event type " +
482 std::to_string(static_cast<std::uint32_t>(type)),
484 }
485
486 ///
487 /// @brief Set maximum queue size
488 /// @param maxSize Maximum number of events in queue
489 ///
490 void setMaxQueueSize(std::uint64_t maxSize) { m_maxQueueSize.store(maxSize); }
491
492 ///
493 /// @brief Get current queue size
494 /// @return Current number of events in queue
495 ///
496 std::uint64_t getQueueSize() const
497 {
498 std::lock_guard<std::mutex> lock(m_eventQueue.mutex);
499 return m_eventQueue.events.size();
500 }
501
502 ///
503 /// @brief Get event bus statistics
504 /// @return Copy of current statistics
505 ///
507 {
508 std::lock_guard<std::mutex> lock(m_statsMutex);
509 EventStats stats = m_stats;
511 return stats;
512 }
513
514 ///
515 /// @brief Clear all statistics
516 ///
518 {
519 std::lock_guard<std::mutex> lock(m_statsMutex);
520 m_stats.reset();
521 }
522
523 ///
524 /// @brief Clear all events from queue
525 ///
526 void clear()
527 {
528 std::lock_guard<std::mutex> lock(m_eventQueue.mutex);
529 while (!m_eventQueue.events.empty())
530 {
531 m_eventQueue.events.pop();
532 }
533 utl::Logger::log("EventBus: Cleared all events from queue", utl::LogLevel::INFO);
534 }
535
536 ///
537 /// @brief Stop the event bus
538 ///
539 void stop()
540 {
541 m_running.store(false);
542 m_eventQueue.cv.notify_all();
543 utl::Logger::log("EventBus: Stopped", utl::LogLevel::INFO);
544 }
545
546 ///
547 /// @brief Check if event bus is running
548 /// @return True if running
549 ///
550 bool isRunning() const { return m_running.load(); }
551
552 private:
553 ///
554 /// @brief Update statistics for an event
555 /// @param event Event to update stats for
556 /// @param isPublish True if this is a publish operation
557 /// @param isExpired True if event expired
558 ///
559 void updateStats(const Event &event, bool isPublish, bool isExpired = false)
560 {
561 std::lock_guard<std::mutex> lock(m_statsMutex);
562
563 if (isPublish)
564 {
566 }
567 else
568 {
570 }
571
572 if (isExpired)
573 {
575 }
576
577 m_stats.eventTypeCount[event.type]++;
578 m_stats.priorityCount[event.priority]++;
579 m_stats.sourceCount[event.sourceId]++;
580
581 if (event.targetId != 0)
582 {
583 m_stats.targetCount[event.targetId]++;
584 }
585
586 // Note: We don't update max queue size here to avoid deadlock
587 // (getQueueSize() would try to lock m_eventQueue.mutex which may already be locked)
588 }
589 };
590
591} // namespace utl
Event structures and types for event-driven communication.
This file contains the Logger class.
Network packet serializer for RNP protocol.
Binary serializer for RNP protocol packets.
void serializeLobbyCreateResponse(const PacketLobbyCreateResponse &packet)
Serialize LOBBY_CREATE_RESPONSE packet.
void serializeLobbyListResponse(const PacketLobbyListResponse &packet)
Serialize LOBBY_LIST_RESPONSE packet.
void writeByte(std::uint8_t value)
Write a single byte.
void writeString(const std::string &str, std::size_t maxLength)
Write a string with length prefix.
void serializeWorldState(const PacketWorldState &packet)
Serialize WORLD_STATE packet.
void writeUInt16(std::uint16_t value)
Write a 16-bit integer (network byte order)
void serializeLobbyCreate(const PacketLobbyCreate &packet)
Serialize LOBBY_CREATE packet.
void serializeGameStart(const PacketGameStart &packet)
Serialize GAME_START packet.
void writeUInt32(std::uint32_t value)
Write a 32-bit integer (network byte order)
void writeFloat(float value)
Write a float (network byte order)
void serializeLobbyJoinResponse(const PacketLobbyJoinResponse &packet)
Serialize LOBBY_JOIN_RESPONSE packet.
const std::vector< std::uint8_t > & getData() const
Get the serialized data.
void serializeEntityEvents(const std::vector< EventRecord > &events)
Serialize multiple EventRecords for ENTITY_EVENT packet.
void serializeLobbyJoin(const PacketLobbyJoin &packet)
Serialize LOBBY_JOIN packet.
void serializeLobbyUpdate(const PacketLobbyUpdate &packet)
Serialize LOBBY_UPDATE packet.
Thread-safe event bus for decoupled component communication.
Definition EventBus.hpp:31
EventBus(EventBus &&)=delete
void filterEventType(EventType type)
Add event type to filter (filtered events will be dropped)
Definition EventBus.hpp:462
EventBus()
Constructor.
Definition EventBus.hpp:91
bool publish(const Event &event)
Publish an event to the bus.
Definition EventBus.hpp:118
std::mutex m_componentMutex
Definition EventBus.hpp:75
std::vector< Event > waitForEvents(std::chrono::milliseconds timeout=std::chrono::milliseconds(100), std::uint32_t maxEvents=100)
Consume events with timeout (blocking)
Definition EventBus.hpp:309
bool isRunning() const
Check if event bus is running.
Definition EventBus.hpp:550
std::mutex m_statsMutex
Definition EventBus.hpp:65
std::vector< Event > consumeForTarget(std::uint32_t targetId, std::uint32_t maxEvents=100)
Consume events targeted to specific component.
Definition EventBus.hpp:357
static EventBus & getInstance()
Get singleton instance of EventBus.
Definition EventBus.hpp:82
void unsubscribe(std::uint32_t componentId, EventType type)
Unsubscribe component from specific event type.
Definition EventBus.hpp:399
void clear()
Clear all events from queue.
Definition EventBus.hpp:526
void unfilterEventType(EventType type)
Remove event type from filter.
Definition EventBus.hpp:476
void updateStats(const Event &event, bool isPublish, bool isExpired=false)
Update statistics for an event.
Definition EventBus.hpp:559
EventBus(const EventBus &)=delete
EventBus & operator=(const EventBus &)=delete
std::atomic< std::uint64_t > m_maxQueueSize
Definition EventBus.hpp:67
~EventBus()
Destructor.
Definition EventBus.hpp:100
void setMaxQueueSize(std::uint64_t maxSize)
Set maximum queue size.
Definition EventBus.hpp:490
std::mutex m_filterMutex
Definition EventBus.hpp:71
void registerComponent(std::uint32_t componentId, const std::string &name)
Register component name for better debugging.
Definition EventBus.hpp:423
std::vector< Event > consumeType(EventType type, std::uint32_t maxEvents=100)
Consume events of specific type.
Definition EventBus.hpp:329
std::unordered_map< std::uint32_t, std::string > m_componentNames
Definition EventBus.hpp:74
EventStats m_stats
Definition EventBus.hpp:64
std::unordered_set< EventType > m_filteredTypes
Definition EventBus.hpp:70
std::unordered_map< EventType, std::unordered_set< std::uint32_t > > m_subscribers
Definition EventBus.hpp:62
void unregisterComponent(std::uint32_t componentId)
Unregister component.
Definition EventBus.hpp:436
std::mutex m_subscribersMutex
Definition EventBus.hpp:63
std::uint64_t getQueueSize() const
Get current queue size.
Definition EventBus.hpp:496
std::atomic< bool > m_running
Definition EventBus.hpp:66
bool publish(EventType type, const T &data, std::uint32_t sourceId=0, std::uint32_t targetId=0, EventPriority priority=EventPriority::NORMAL)
Publish event with serialized data.
Definition EventBus.hpp:174
std::vector< Event > consume(std::uint32_t maxEvents=100)
Consume events from the bus (non-blocking)
Definition EventBus.hpp:271
EventBus & operator=(EventBus &&)=delete
void stop()
Stop the event bus.
Definition EventBus.hpp:539
void subscribe(std::uint32_t componentId, EventType type)
Subscribe component to specific event types.
Definition EventBus.hpp:384
void clearStats()
Clear all statistics.
Definition EventBus.hpp:517
PriorityEventQueue m_priorityQueue
Definition EventBus.hpp:61
EventStats getStats() const
Get event bus statistics.
Definition EventBus.hpp:506
EventQueue m_eventQueue
Definition EventBus.hpp:60
Event statistics structure.
Definition Event.hpp:166
std::uint64_t totalEventsConsumed
Total events consumed.
Definition Event.hpp:169
std::unordered_map< std::uint32_t, std::uint64_t > targetCount
Count per target component.
Definition Event.hpp:177
std::uint64_t totalEventsPublished
Total events published.
Definition Event.hpp:168
std::uint64_t currentQueueSize
Current queue size.
Definition Event.hpp:171
std::unordered_map< EventPriority, std::uint64_t > priorityCount
Count per priority.
Definition Event.hpp:175
std::unordered_map< EventType, std::uint64_t > eventTypeCount
Count per event type.
Definition Event.hpp:174
std::uint64_t totalEventsExpired
Total events that expired.
Definition Event.hpp:170
void reset()
Reset all statistics.
Definition Event.hpp:182
std::unordered_map< std::uint32_t, std::uint64_t > sourceCount
Count per source component.
Definition Event.hpp:176
Event structure for inter-component communication.
Definition Event.hpp:89
std::uint32_t sourceId
ID of the component that sent the event (0 = system)
Definition Event.hpp:92
EventType type
Type of the event.
Definition Event.hpp:91
std::uint32_t targetId
ID of the target component (0 = broadcast)
Definition Event.hpp:93
static void log(const std::string &message, const LogLevel &logLevel)
Definition Logger.hpp:51
EventType
Event types for inter-component communication.
Definition Event.hpp:26
EventPriority
Event priority levels.
Definition Event.hpp:71
Internal event queue structure.
Definition EventBus.hpp:37
std::queue< Event > events
Definition EventBus.hpp:38
std::atomic< std::uint64_t > totalEvents
Definition EventBus.hpp:41
std::condition_variable cv
Definition EventBus.hpp:40
Priority queue for events.
Definition EventBus.hpp:48
std::priority_queue< Event, std::vector< Event >, std::function< bool(const Event &, const Event &)> > events
Definition EventBus.hpp:50