8#ifndef SEN_CORE_OBJ_DETAIL_EVENT_BUFFER_H
9#define SEN_CORE_OBJ_DETAIL_EVENT_BUFFER_H
// Callable that receives an OutputStream by reference and returns nothing.
// It is the type of SerializableEvent::serializeFunc.
44using SerializationFunc = std::function<void(
OutputStream&)>;
// Record describing one event stored in a SerializableEventQueue.
// NOTE(review): the braces of this struct are not visible in this listing.
47struct SerializableEvent
// Identifier of the event; 0 unless set.
49 MemberHash eventId = 0U;
// Creation time of the event (default-constructed unless set).
50 TimeStamp creationTime;
// Callable that is handed an OutputStream&. It defaults to an empty
// std::function, so it has to be assigned before it is invoked.
51 SerializationFunc serializeFunc =
nullptr;
// Size field, 0 unless set. Presumably the number of bytes serializeFunc
// writes -- TODO confirm against the code that fills it in.
54 uint32_t serializedSize = 0U;
// Container of SerializableEvent objects backed by a std::list.
// NOTE(review): braces, access specifiers and the declarations of maxSize_
// and dropOldest_ are not visible in this listing.
62class SerializableEventQueue
// Project macro; presumably forbids copying and moving -- confirm in its definition.
65 SEN_NOCOPY_NOMOVE(SerializableEventQueue)
// Stores both arguments in the corresponding members; nothing else happens here.
68 SerializableEventQueue(std::size_t maxSize,
bool dropOldest): maxSize_(maxSize), dropOldest_(dropOldest) {}
70 ~SerializableEventQueue() =
default;
// Takes ownership of `event`; defined inline further down in this file.
74 void push(SerializableEvent&& event);
// Removes every stored event. Only queue_ is touched: overflowCount_ keeps its value.
75 void clear() { queue_.clear(); }
// Read-only view of the stored events; the reference points at the internal list.
76 [[nodiscard]]
const std::list<SerializableEvent>& getContents() const noexcept {
return queue_; }
// Stored events; push() appends with push_back.
79 std::list<SerializableEvent> queue_;
// Counter starting at 0. Presumably counts events that did not fit -- the
// code that updates it is not visible here.
81 std::size_t overflowCount_ = 0U;
// Holds the callbacks connected to one event whose arguments have types T...,
// and delivers emitted events to them.
// NOTE(review): this listing omits braces, access specifiers, some type aliases
// and several parameters of the member functions declared below.
93template <
typename... T>
94class EventBuffer final
// Project macro; presumably enables moving and forbids copying -- confirm in its definition.
96 SEN_MOVE_ONLY(EventBuffer)
102 EventBuffer() =
default;
103 ~EventBuffer() =
default;
// Registers `callback` under `id` and returns a guard built from `source` and `id`.
107 [[nodiscard]] ConnectionGuard addConnection(Object* source, Callback callback, ConnId
id);
// Looks up the entry registered under `id`, invalidates its callback and erases it.
// NOTE(review): the meaning of the returned bool is not visible in this listing.
110 [[nodiscard]]
bool removeConnection(ConnId
id);
// Entry point for emitting an event: either dispatches right away through
// immediateDispatch() or wraps the dispatch in a lambda (see the definition).
114 void produce(
Emit emissionMode,
116 TimeStamp creationTime,
119 bool addToTransportQueue,
120 SerializableEventQueue* transportQueue,
// Reads the T... arguments from `buffer` and forwards them to dispatch().
126 void dispatchFromStream(MemberHash eventId,
127 TimeStamp creationTime,
130 const Span<const uint8_t>& buffer,
131 Object* producer)
const;
// Queues one unit of work per valid callback and, if requested, pushes the
// event into `transportQueue`.
134 void dispatch(MemberHash eventId,
135 TimeStamp creationTime,
138 bool addToTransportQueue,
139 SerializableEventQueue* transportQueue,
// Like dispatch(), but invokes a callback directly when it belongs to the same queue.
144 void immediateDispatch(MemberHash eventId,
145 TimeStamp creationTime,
// Callbacks are held through shared_ptr: the work lambdas created in dispatch()
// and immediateDispatch() capture a copy of this pointer.
154 using CallbackStorageType = std::shared_ptr<Callback>;
// Pairs a connection id with the callback registered for it.
156 CallbackEntry(ConnId
id, CallbackStorageType callback): id_(id), callback_(std::move(callback)) {}
158 [[nodiscard]] ConnId getConnectionId() const noexcept {
return id_; }
// Returns a reference to the stored shared_ptr; it does not add an owner by itself.
159 const CallbackStorageType& getCallback() const noexcept {
return callback_; }
163 CallbackStorageType callback_;
// All registered callbacks, in the order they were added (emplace_back in addConnection).
165 std::vector<CallbackEntry> eventCallbacks_;
// Converts `val` to its variant form via VariantTraits<T>::valueToVariant and
// stores the result in the last element of `list`.
// NOTE(review): list.back() needs a non-empty list; presumably a line that is
// not visible in this listing appends a fresh element first -- confirm.
177inline void addVariantToList(
const T& val,
VarList& list)
180 VariantTraits<T>::valueToVariant(val, list.back());
// Appends `event` to queue_. Three append sites are visible; most of the
// conditions that select between them are missing from this listing.
189inline void SerializableEventQueue::push(SerializableEvent&& event)
// NOTE(review): guard not visible; presumably the path for a queue without a size limit -- confirm.
193 queue_.push_back(std::move(event));
// There is still room: store the event.
197 if (queue_.size() < maxSize_)
199 queue_.push_back(std::move(event));
// NOTE(review): guard not visible; presumably reached after making room when
// dropOldest_ is set -- confirm, including where overflowCount_ is updated.
206 queue_.push_back(std::move(event));
// Registers `callback` under `id` and returns a ConnectionGuard for it.
// No check for an already registered `id` is visible here.
216template <
typename... T>
217inline ConnectionGuard EventBuffer<T...>::addConnection(Object* source, Callback callback, ConnId
id)
// The callback is moved into a shared_ptr so that queued work can hold a copy of the pointer.
219 eventCallbacks_.emplace_back(
id, std::make_shared<
decltype(callback)>(std::move(callback)));
// NOTE(review): the meaning of the literals 0U and true is defined by
// ConnectionGuard, which is not visible in this file.
220 return {source,
id.get(), 0U,
true};
// Searches eventCallbacks_ linearly for the entry registered under `id`,
// invalidates its callback when the lock on it is valid, and erases the entry.
// NOTE(review): the return statements are not visible in this listing.
223template <
typename... T>
224inline bool EventBuffer<T...>::removeConnection(ConnId
id)
226 for (
auto itr = eventCallbacks_.begin(); itr != eventCallbacks_.end(); ++itr)
228 if (itr->getConnectionId() ==
id)
// Invalidate through the lock before erasing; copies of the shared_ptr held by
// already queued work keep the callback object alive after the erase.
234 if (
auto callbackLock = itr->getCallback()->lock(); callbackLock.isValid())
236 callbackLock.invalidate();
// vector::erase invalidates `itr`, so the loop must not continue with it.
// NOTE(review): presumably the function returns right after this line -- confirm.
239 eventCallbacks_.erase(itr);
// Emits an event. Two paths are visible: a direct call to immediateDispatch()
// and a lambda that carries the same data for a later call.
// NOTE(review): the condition choosing between them (presumably based on
// emissionMode) and several parameters are not visible in this listing.
247template <
typename... T>
248inline void EventBuffer<T...>::produce(
Emit emissionMode,
250 TimeStamp creationTime,
253 bool addToTransportQueue,
254 SerializableEventQueue* transportQueue,
// Direct path: callbacks on `queue` are invoked inside this call (see immediateDispatch).
261 immediateDispatch(eventId, creationTime, transportMode, producer, queue, args...);
// Deferred path: every argument is captured by value, but `this` is captured as a
// raw pointer. NOTE(review): the buffer has to outlive the lambda; the class is
// declared with SEN_MOVE_ONLY, so a move before execution looks unsafe -- confirm.
266 [
this, eventId, creationTime, producerId, transportMode, addToTransportQueue, transportQueue, producer, args...]()
// Argument list of the call made inside the lambda (the callee name is not
// visible; the list matches the arguments dispatchFromStream passes to dispatch()).
269 eventId, creationTime, producerId, transportMode, addToTransportQueue, transportQueue, producer, args...);
// Flag derived from the transport mode, passed along with the lambda.
271 impl::cannotBeDropped(transportMode));
// Delivers one event: queues a unit of work for every callback whose lock is
// valid, optionally pushes the event into the transport queue, and reports the
// emission to the producer.
// NOTE(review): several parameters and guards are not visible in this listing.
275template <
typename... T>
276inline void EventBuffer<T...>::dispatch(MemberHash eventId,
277 TimeStamp creationTime,
280 bool addToTransportQueue,
281 SerializableEventQueue* transportQueue,
// Metadata handed to every callback together with the arguments.
285 EventInfo info {creationTime};
288 for (
const auto& callbackEntry: eventCallbacks_)
// Callbacks whose lock is not valid are skipped.
290 if (
auto callbackLock = callbackEntry.getCallback()->lock(); callbackLock.isValid())
// Compilers older than GCC 12.4 get -Wmaybe-uninitialized silenced around the
// lambda; the lambda itself is identical in both preprocessor branches.
292#if SEN_GCC_VERSION_CHECK_SMALLER(12, 4, 0)
294# pragma GCC diagnostic push
295# pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
// The work item owns a copy of the shared_ptr, of `info` and of every argument,
// so it does not refer back to this call's stack frame.
296 auto work = [cb = callbackEntry.getCallback(), info, args...]() { cb->invoke(info, args...); };
297# pragma GCC diagnostic pop
300 auto work = [cb = callbackEntry.getCallback(), info, args...]() { cb->invoke(info, args...); };
302 callbackLock.pushAnswer(std::move(work), impl::cannotBeDropped(transportMode));
// NOTE(review): transportQueue is dereferenced below without a visible null
// check; dispatchFromStream passes nullptr together with addToTransportQueue == false.
306 if (addToTransportQueue)
308 uint32_t serializedSize = 0U;
// A unary fold over '+' is ill-formed for an empty pack, hence the guard.
309 if constexpr (
sizeof...(T) != 0U)
311 serializedSize = (... + SerializationTraits<T>::serializedSize(args));
314 transportQueue->push({eventId,
// Writes every argument, in pack order, to `out`.
319 (SerializationTraits<T>::write(out, args), ...);
// Reports the emission to the producer. The visible lines build a VarList with
// one entry per argument; NOTE(review): the guard around this call is not visible.
326 producer->senImplEventEmitted(
331 result.reserve(sizeof...(T));
332 (addVariantToList<T>(args, result), ...);
// Delivers one event without deferring where possible: a callback that shares
// `queue` is invoked inside this call, any other callback gets a queued work item.
// No transport-queue handling is visible in this function.
338template <
typename... T>
339inline void EventBuffer<T...>::immediateDispatch(MemberHash eventId,
340 TimeStamp creationTime,
346 EventInfo info {creationTime};
348 for (
auto& callbackEntry: eventCallbacks_)
// Callbacks whose lock is not valid are skipped.
350 if (
auto callbackLock = callbackEntry.getCallback()->lock(); callbackLock.isValid())
// Same queue: run the callback synchronously.
352 if (callbackLock.isSameQueue(queue))
354 callbackEntry.getCallback()->invoke(info, args...);
// Different queue: hand over a self-contained copy of the call, as dispatch() does.
358 auto work = [cb = callbackEntry.getCallback(), info, args...]() { cb->invoke(info, args...); };
359 callbackLock.pushAnswer(std::move(work), impl::cannotBeDropped(transportMode));
// Reports the emission to the producer with one VarList entry per argument.
// NOTE(review): the guard around this call is not visible in this listing.
364 producer->senImplEventEmitted(
369 result.reserve(sizeof...(T));
370 (addVariantToList<T>(args, result), ...);
// Rebuilds the event arguments from serialized data and passes them to dispatch().
// NOTE(review): the construction of the stream `in` (presumably from `buffer`)
// and some parameters are not visible in this listing.
376template <
typename... T>
377inline void EventBuffer<T...>::dispatchFromStream(MemberHash eventId,
378 TimeStamp creationTime,
381 const Span<const uint8_t>& buffer,
382 Object* producer)
const
// Value-initialized storage for the arguments; every T must be default-constructible.
384 std::tuple<T...> argValues = {};
// Reads each tuple element from `in`, in declaration order.
389 std::apply([&in](
auto&&... arg)
390 { ((SerializationTraits<std::remove_reference_t<
decltype(arg)>>::read(in, arg)), ...); },
// Calls dispatch() on this object with the fixed arguments followed by the
// decoded values. addToTransportQueue is false and transportQueue is nullptr,
// so this path does not push the event into a transport queue.
395 &EventBuffer::dispatch,
396 std::tuple_cat(std::make_tuple(
this, eventId, creationTime, producerId, transportMode,
false,
nullptr, producer),
397 std::move(argValues)));
Here we define a set of template meta-programming helpers to let the compiler take some decisions bas...
InputStreamTemplate< LittleEndian > InputStream
Definition input_stream.h:84
Emit
How to emit an event.
Definition object.h:51
Callback< EventInfo, Args... > EventCallback
An event callback.
Definition callback.h:208
@ now
Directly when it happens.
Definition object.h:52
std::conditional_t< std::is_arithmetic_v< T >||shouldBePassedByValueV< T >, T, AddConstRef< T > > MaybeRef
returns 'const T&' or 'T' depending on the type
Definition class_helpers.h:46
std::vector< Var > VarList
A list of vars to represent sequences.
Definition var.h:104
TransportMode
How to transport information.
Definition type.h:56
@ multicast
Directed to all receivers, unreliable, unordered, no congestion control.
Definition type.h:58
@ confirmed
Directed to each receiver, reliable, ordered, with congestion control, relatively heavyweight.
Definition type.h:59
detail::MoveOnlyFunctionImpl< FwdArgs... > move_only_function
Definition move_only_function.h:256
OutputStreamTemplate< LittleEndian > OutputStream
Definition output_stream.h:64