14#include <condition_variable>
20#include <unordered_map>
21#include <unordered_set>
40 std::condition_variable
cv;
49 std::priority_queue<Event, std::vector<Event>, std::function<bool(
const Event &,
const Event &)>>
55 {
return static_cast<int>(a.priority) <
static_cast<int>(b.priority); })
62 std::unordered_map<EventType, std::unordered_set<std::uint32_t>>
m_subscribers;
140 std::to_string(
static_cast<std::uint32_t
>(event.
type)),
156 std::to_string(
static_cast<std::uint32_t
>(event.
type)) +
" from source " +
173 template <
typename T>
174 bool publish(
EventType type,
const T &data, std::uint32_t sourceId = 0, std::uint32_t targetId = 0,
177 Event event(type, sourceId, targetId, priority);
183 if constexpr (std::is_same_v<T, rnp::PacketWorldState>)
187 else if constexpr (std::is_same_v<T, std::vector<rnp::EventRecord>>)
191 else if constexpr (std::is_same_v<T, rnp::PacketLobbyCreate>)
195 else if constexpr (std::is_same_v<T, rnp::PacketLobbyCreateResponse>)
199 else if constexpr (std::is_same_v<T, rnp::PacketLobbyListResponse>)
203 else if constexpr (std::is_same_v<T, rnp::PacketLobbyJoin>)
207 else if constexpr (std::is_same_v<T, rnp::PacketLobbyJoinResponse>)
211 else if constexpr (std::is_same_v<T, rnp::PacketLobbyUpdate>)
215 else if constexpr (std::is_same_v<T, rnp::PacketGameStart>)
219 else if constexpr (std::is_same_v<T, std::string>)
223 else if constexpr (std::is_arithmetic_v<T>)
225 if constexpr (
sizeof(T) == 1)
227 serializer.
writeByte(
static_cast<std::uint8_t
>(data));
229 else if constexpr (
sizeof(T) == 2)
231 serializer.
writeUInt16(
static_cast<std::uint16_t
>(data));
233 else if constexpr (
sizeof(T) == 4)
235 if constexpr (std::is_floating_point_v<T>)
237 serializer.
writeFloat(
static_cast<float>(data));
241 serializer.
writeUInt32(
static_cast<std::uint32_t
>(data));
248 static_assert(std::is_same_v<T, std::vector<std::uint8_t>>,
249 "Unsupported data type for EventBus serialization");
254 event.data = serializer.
getData();
257 catch (
const std::exception &e)
260 std::to_string(
static_cast<std::uint32_t
>(type)) +
" - " + e.what(),
271 std::vector<Event>
consume(std::uint32_t maxEvents = 100)
273 std::vector<Event> events;
277 std::uint32_t count = 0;
284 if (event.hasExpired())
290 events.push_back(event);
295 for (
const auto &event : events)
309 std::vector<Event>
waitForEvents(std::chrono::milliseconds timeout = std::chrono::milliseconds(100),
310 std::uint32_t maxEvents = 100)
315 if (
m_eventQueue.
cv.wait_for(lock, timeout, [
this] { return !m_eventQueue.events.empty(); }))
331 std::vector<Event> allEvents =
consume(maxEvents);
332 std::vector<Event> filteredEvents;
334 for (
const auto &event : allEvents)
336 if (event.type == type)
338 filteredEvents.push_back(event);
348 return filteredEvents;
359 std::vector<Event> allEvents =
consume(maxEvents);
360 std::vector<Event> filteredEvents;
362 for (
const auto &event : allEvents)
364 if (event.targetId == targetId || event.isBroadcast())
366 filteredEvents.push_back(event);
376 return filteredEvents;
389 utl::Logger::log(
"EventBus: Component " + std::to_string(componentId) +
" subscribed to event type " +
390 std::to_string(
static_cast<std::uint32_t
>(type)),
405 it->second.erase(componentId);
406 if (it->second.empty())
413 " unsubscribed from event type " +
414 std::to_string(
static_cast<std::uint32_t
>(type)),
428 utl::Logger::log(
"EventBus: Registered component '" + name +
"' with ID " + std::to_string(componentId),
444 "' (ID: " + std::to_string(componentId) +
")",
454 pair.second.erase(componentId);
468 std::to_string(
static_cast<std::uint32_t
>(type)),
482 std::to_string(
static_cast<std::uint32_t
>(type)),
Event structures and types for event-driven communication.
This file contains the Logger class.
Network packet serializer for RNP protocol.
Binary serializer for RNP protocol packets.
void serializeLobbyCreateResponse(const PacketLobbyCreateResponse &packet)
Serialize LOBBY_CREATE_RESPONSE packet.
void serializeLobbyListResponse(const PacketLobbyListResponse &packet)
Serialize LOBBY_LIST_RESPONSE packet.
void writeByte(std::uint8_t value)
Write a single byte.
void writeString(const std::string &str, std::size_t maxLength)
Write a string with length prefix.
void serializeWorldState(const PacketWorldState &packet)
Serialize WORLD_STATE packet.
void writeUInt16(std::uint16_t value)
Write a 16-bit integer (network byte order)
void serializeLobbyCreate(const PacketLobbyCreate &packet)
Serialize LOBBY_CREATE packet.
void serializeGameStart(const PacketGameStart &packet)
Serialize GAME_START packet.
void writeUInt32(std::uint32_t value)
Write a 32-bit integer (network byte order)
void writeFloat(float value)
Write a float (network byte order)
void serializeLobbyJoinResponse(const PacketLobbyJoinResponse &packet)
Serialize LOBBY_JOIN_RESPONSE packet.
const std::vector< std::uint8_t > & getData() const
Get the serialized data.
void serializeEntityEvents(const std::vector< EventRecord > &events)
Serialize multiple EventRecords for ENTITY_EVENT packet.
void serializeLobbyJoin(const PacketLobbyJoin &packet)
Serialize LOBBY_JOIN packet.
void serializeLobbyUpdate(const PacketLobbyUpdate &packet)
Serialize LOBBY_UPDATE packet.
Thread-safe event bus for decoupled component communication.
EventBus(EventBus &&)=delete
void filterEventType(EventType type)
Add event type to filter (filtered events will be dropped)
bool publish(const Event &event)
Publish an event to the bus.
std::mutex m_componentMutex
std::vector< Event > waitForEvents(std::chrono::milliseconds timeout=std::chrono::milliseconds(100), std::uint32_t maxEvents=100)
Consume events with timeout (blocking)
bool isRunning() const
Check if event bus is running.
std::vector< Event > consumeForTarget(std::uint32_t targetId, std::uint32_t maxEvents=100)
Consume events targeted to specific component.
static EventBus & getInstance()
Get singleton instance of EventBus.
void unsubscribe(std::uint32_t componentId, EventType type)
Unsubscribe component from specific event type.
void clear()
Clear all events from queue.
void unfilterEventType(EventType type)
Remove event type from filter.
void updateStats(const Event &event, bool isPublish, bool isExpired=false)
Update statistics for an event.
EventBus(const EventBus &)=delete
EventBus & operator=(const EventBus &)=delete
std::atomic< std::uint64_t > m_maxQueueSize
void setMaxQueueSize(std::uint64_t maxSize)
Set maximum queue size.
void registerComponent(std::uint32_t componentId, const std::string &name)
Register component name for better debugging.
std::vector< Event > consumeType(EventType type, std::uint32_t maxEvents=100)
Consume events of specific type.
std::unordered_map< std::uint32_t, std::string > m_componentNames
std::unordered_set< EventType > m_filteredTypes
std::unordered_map< EventType, std::unordered_set< std::uint32_t > > m_subscribers
void unregisterComponent(std::uint32_t componentId)
Unregister component.
std::mutex m_subscribersMutex
std::uint64_t getQueueSize() const
Get current queue size.
std::atomic< bool > m_running
bool publish(EventType type, const T &data, std::uint32_t sourceId=0, std::uint32_t targetId=0, EventPriority priority=EventPriority::NORMAL)
Publish event with serialized data.
std::vector< Event > consume(std::uint32_t maxEvents=100)
Consume events from the bus (non-blocking)
EventBus & operator=(EventBus &&)=delete
void stop()
Stop the event bus.
void subscribe(std::uint32_t componentId, EventType type)
Subscribe component to specific event types.
void clearStats()
Clear all statistics.
PriorityEventQueue m_priorityQueue
EventStats getStats() const
Get event bus statistics.
Event statistics structure.
std::uint64_t totalEventsConsumed
Total events consumed.
std::unordered_map< std::uint32_t, std::uint64_t > targetCount
Count per target component.
std::uint64_t totalEventsPublished
Total events published.
std::uint64_t currentQueueSize
Current queue size.
std::unordered_map< EventPriority, std::uint64_t > priorityCount
Count per priority.
std::unordered_map< EventType, std::uint64_t > eventTypeCount
Count per event type.
std::uint64_t totalEventsExpired
Total events that expired.
void reset()
Reset all statistics.
std::unordered_map< std::uint32_t, std::uint64_t > sourceCount
Count per source component.
Event structure for inter-component communication.
std::uint32_t sourceId
ID of the component that sent the event (0 = system)
EventType type
Type of the event.
std::uint32_t targetId
ID of the target component (0 = broadcast)
static void log(const std::string &message, const LogLevel &logLevel)
EventType
Event types for inter-component communication.
EventPriority
Event priority levels.
Internal event queue structure.
std::queue< Event > events
std::atomic< std::uint64_t > totalEvents
std::condition_variable cv
Priority queue for events.
std::priority_queue< Event, std::vector< Event >, std::function< bool(const Event &, const Event &)> > events