8#ifndef SEN_CORE_OBJ_DETAIL_EVENT_BUFFER_H
9#define SEN_CORE_OBJ_DETAIL_EVENT_BUFFER_H
// Type-erased callable that receives an OutputStream by reference and returns nothing.
// NOTE(review): presumably writes an event's arguments into the stream - confirm against callers.
44using SerializationFunc = std::function<void(
OutputStream&)>;
// Record describing one event together with a callable that can serialize it on demand.
47struct SerializableEvent
// Identifier of the event; defaults to 0.
49 MemberHash eventId = 0U;
// Time stamp associated with the creation of the event.
50 TimeStamp creationTime;
// Callable invoked with an OutputStream; empty (nullptr) until one is assigned.
51 SerializationFunc serializeFunc =
nullptr;
// Defaults to 0. NOTE(review): presumably the number of bytes serializeFunc writes - confirm.
54 uint32_t serializedSize = 0U;
// Queue of SerializableEvent objects stored in a std::list and configured with a
// maximum size and a drop-oldest flag.
// NOTE(review): how maxSize/dropOldest are applied is decided in push(), whose full
// body is not visible here - confirm the overflow policy there.
62class SerializableEventQueue
// NOTE(review): macro name suggests copy and move operations are disabled - confirm its definition.
65 SEN_NOCOPY_NOMOVE(SerializableEventQueue)
// Stores the maximum size and the drop-oldest flag; the queue starts empty.
68 SerializableEventQueue(std::size_t maxSize,
bool dropOldest): maxSize_(maxSize), dropOldest_(dropOldest) {}
70 ~SerializableEventQueue() =
default;
// Takes the event by rvalue reference; defined out of class.
74 void push(SerializableEvent&& event);
// Removes every stored event.
75 void clear() { queue_.clear(); }
// Read-only access to the stored events; the returned reference aliases the internal list.
76 [[nodiscard]]
const std::list<SerializableEvent>& getContents() const noexcept {
return queue_; }
// Underlying storage for the queued events.
79 std::list<SerializableEvent> queue_;
// Starts at 0. NOTE(review): presumably counts events that did not fit - confirm in push().
81 std::size_t overflowCount_ = 0U;
// Holds the callbacks connected to an event whose argument types are T..., and
// delivers event occurrences to them.
// Connection ids are kept in ids_ and callbacks in callbacks_.
// NOTE(review): removeConnection() assumes entry i of ids_ corresponds to entry i of callbacks_.
93template <
typename... T>
94class EventBuffer final
// NOTE(review): macro name suggests the type is movable but not copyable - confirm its definition.
96 SEN_MOVE_ONLY(EventBuffer)
102 EventBuffer() =
default;
103 ~EventBuffer() =
default;
// Registers a callback and returns a ConnectionGuard built from the source object and the id.
107 [[nodiscard]] ConnectionGuard addConnection(Object* source, Callback callback, ConnId
id);
// Removes the connection associated with the given id.
// NOTE(review): the meaning of the bool result is not visible here - presumably "found and removed".
110 [[nodiscard]]
bool removeConnection(ConnId
id);
// Entry point for emitting an event with the given emission mode.
// NOTE(review): several parameters are not visible in this listing.
114 void produce(
Emit emissionMode,
116 TimeStamp creationTime,
119 bool addToTransportQueue,
120 SerializableEventQueue* transportQueue,
// Reads the event arguments from a byte buffer and then dispatches them to the callbacks.
126 void dispatchFromStream(MemberHash eventId,
127 TimeStamp creationTime,
130 const Span<const uint8_t>& buffer,
131 Object* producer)
const;
// Queues the event on every valid callback and optionally pushes it to the transport queue.
134 void dispatch(MemberHash eventId,
135 TimeStamp creationTime,
138 bool addToTransportQueue,
139 SerializableEventQueue* transportQueue,
// Like dispatch, but callbacks whose lock reports the same queue are invoked directly.
144 void immediateDispatch(MemberHash eventId,
145 TimeStamp creationTime,
// Ids of the registered connections.
152 std::vector<ConnId> ids_;
// Registered callbacks, held through shared_ptr so queued work can keep them alive.
153 std::list<std::shared_ptr<Callback>> callbacks_;
// Converts val to its variant representation and stores it in the last element of list.
// NOTE(review): presumably a new element is appended to list before the conversion
// (that statement is not visible here) - list.back() requires a non-empty list.
165inline void addVariantToList(
const T& val,
VarList& list)
// Writes the converted value into the element currently at the back of the list.
168 VariantTraits<T>::valueToVariant(val, list.back());
// Appends an event to the queue, moving it into the underlying list.
// NOTE(review): three separate push_back sites are visible, so the function has
// several branches; only the "queue_.size() < maxSize_" condition is visible here.
// Presumably the others cover an unbounded queue and the drop-oldest overflow case - confirm.
177inline void SerializableEventQueue::push(SerializableEvent&& event)
181 queue_.push_back(std::move(event));
// There is still room below the configured maximum: simply append.
185 if (queue_.size() < maxSize_)
187 queue_.push_back(std::move(event));
194 queue_.push_back(std::move(event));
// Registers a callback and returns a guard for the new connection.
// The callback is moved into a shared_ptr and appended to callbacks_.
// NOTE(review): presumably id is also appended to ids_ (statement not visible here),
// since removeConnection() relies on both containers having matching positions.
204template <
typename... T>
205inline ConnectionGuard EventBuffer<T...>::addConnection(Object* source, Callback callback, ConnId
id)
// Store the callback behind a shared_ptr so that deferred work can share ownership of it.
208 callbacks_.push_back(std::make_shared<
decltype(callback)>(std::move(callback)));
// Guard built from the source object and the raw id value.
// NOTE(review): the meaning of the trailing 0U and true arguments is defined by ConnectionGuard - confirm.
209 return {source,
id.get(), 0U,
true};
// Removes a connection: erases its entry from ids_ and the callback at the same
// position in callbacks_, invalidating the callback first if its lock is valid.
// NOTE(review): the id comparison and the return statements are not visible here.
212template <
typename... T>
213inline bool EventBuffer<T...>::removeConnection(ConnId
id)
// Index-based scan so that the position can be reused to locate the callback in the list.
215 for (
typename decltype(ids_)::size_type i = 0U; i < ids_.size(); ++i)
219 ids_.erase(ids_.begin() + i);
// std::list has no random access: walk i steps from the beginning.
221 auto itr = callbacks_.begin();
222 for (std::size_t pos = 0; pos < i; ++pos)
// Invalidate through the lock before erasing, but only when the lock is valid.
230 if (
auto callbackLock = (*itr)->lock(); callbackLock.isValid())
232 callbackLock.invalidate();
235 callbacks_.erase(itr);
// Emits an event either through immediateDispatch or through a deferred callable.
// NOTE(review): the branch on emissionMode and the call that receives the deferred
// callable are not visible here - presumably Emit::now selects the immediate path.
242template <
typename... T>
243inline void EventBuffer<T...>::produce(
Emit emissionMode,
245 TimeStamp creationTime,
248 bool addToTransportQueue,
249 SerializableEventQueue* transportQueue,
// Immediate path: callbacks are handled right away.
256 immediateDispatch(eventId, creationTime, transportMode, producer, queue, args...);
// Deferred path: everything, including the event arguments, is captured by copy;
// 'this' is captured as a pointer, so this object must still be alive when the callable runs.
261 [
this, eventId, creationTime, producerId, transportMode, addToTransportQueue, transportQueue, producer, args...]()
264 eventId, creationTime, producerId, transportMode, addToTransportQueue, transportQueue, producer, args...);
// Tells the receiving call whether this work may be dropped, based on the transport mode.
266 impl::cannotBeDropped(transportMode));
// Delivers an event to every registered callback by queueing work on it, optionally
// pushes a serializable version to the transport queue, and notifies the producer.
// NOTE(review): transportQueue is dereferenced when addToTransportQueue is true and no
// null check is visible here; dispatchFromStream passes false together with nullptr.
270template <
typename... T>
271inline void EventBuffer<T...>::dispatch(MemberHash eventId,
272 TimeStamp creationTime,
275 bool addToTransportQueue,
276 SerializableEventQueue* transportQueue,
// Metadata handed to each callback together with the event arguments.
280 EventInfo info {creationTime};
282 for (
const auto& callback: callbacks_)
// Only callbacks whose lock is valid receive the event.
284 if (
auto callbackLock = callback->lock(); callbackLock.isValid())
// Older GCC: silence -Wmaybe-uninitialized around the lambda.
// NOTE(review): presumably a false positive on those compiler versions - confirm.
286#if SEN_GCC_VERSION_CHECK_SMALLER(12, 4, 0)
288# pragma GCC diagnostic push
289# pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
// The work item copies the shared_ptr, the info and the arguments, so it is self-contained.
290 auto work = [cb = callback, info, args...]() { cb->invoke(info, args...); };
291# pragma GCC diagnostic pop
// Same lambda for the other compilers.
294 auto work = [cb = callback, info, args...]() { cb->invoke(info, args...); };
296 callbackLock.pushAnswer(std::move(work), impl::cannotBeDropped(transportMode));
300 if (addToTransportQueue)
// Total size is the sum of the per-argument sizes; stays 0 for events without arguments.
302 uint32_t serializedSize = 0U;
303 if constexpr (
sizeof...(T) != 0U)
305 serializedSize = (... + SerializationTraits<T>::serializedSize(args));
308 transportQueue->push({eventId,
// Arguments are written to the output stream one after another, in declaration order.
313 (SerializationTraits<T>::write(out, args), ...);
// Report the emission to the producer, with the arguments converted into a variant list.
320 producer->senImplEventEmitted(
325 result.reserve(sizeof...(T));
326 (addVariantToList<T>(args, result), ...);
// Delivers an event to every registered callback: callbacks whose lock reports the
// same queue as 'queue' are invoked directly, and work is queued for the others.
// Afterwards the producer is notified with the arguments converted into a variant list.
332template <
typename... T>
333inline void EventBuffer<T...>::immediateDispatch(MemberHash eventId,
334 TimeStamp creationTime,
// Metadata handed to each callback together with the event arguments.
340 EventInfo info {creationTime};
342 for (
auto& callback: callbacks_)
// Only callbacks whose lock is valid receive the event.
344 if (
auto callbackLock = callback->lock(); callbackLock.isValid())
// Same queue: call the callback synchronously.
346 if (callbackLock.isSameQueue(queue))
348 callback->invoke(info, args...);
// Otherwise defer: the work item owns copies of the callback pointer, info and arguments.
352 auto work = [cb = callback, info, args...]() { cb->invoke(info, args...); };
353 callbackLock.pushAnswer(std::move(work), impl::cannotBeDropped(transportMode));
358 producer->senImplEventEmitted(
363 result.reserve(sizeof...(T));
364 (addVariantToList<T>(args, result), ...);
// Reads the event arguments from an input stream into a tuple and forwards them to
// dispatch() with addToTransportQueue = false and a null transport queue.
// NOTE(review): the construction of 'in' from 'buffer' and the call receiving
// &EventBuffer::dispatch (presumably std::apply) are not visible here.
370template <
typename... T>
371inline void EventBuffer<T...>::dispatchFromStream(MemberHash eventId,
372 TimeStamp creationTime,
375 const Span<const uint8_t>& buffer,
376 Object* producer)
const
// Value-initialized storage for the arguments; requires each T to be default-constructible.
378 std::tuple<T...> argValues = {};
// Read each tuple element from the stream, in order.
383 std::apply([&in](
auto&&... arg)
384 { ((SerializationTraits<std::remove_reference_t<
decltype(arg)>>::read(in, arg)), ...); },
// Call the dispatch member function with the fixed arguments followed by the values just read.
389 &EventBuffer::dispatch,
390 std::tuple_cat(std::make_tuple(
this, eventId, creationTime, producerId, transportMode,
false,
nullptr, producer),
391 std::move(argValues)));
Here we define a set of template meta-programming helpers to let the compiler take some decisions bas...
InputStreamTemplate< LittleEndian > InputStream
Definition input_stream.h:84
Emit
How to emit an event.
Definition object.h:51
Callback< EventInfo, Args... > EventCallback
An event callback.
Definition callback.h:208
@ now
Directly when it happens.
Definition object.h:52
std::conditional_t< std::is_arithmetic_v< T >||shouldBePassedByValueV< T >, T, AddConstRef< T > > MaybeRef
returns 'const T&' or 'T' depending on the type
Definition class_helpers.h:46
std::vector< Var > VarList
A list of vars to represent sequences.
Definition var.h:104
TransportMode
How to transport information.
Definition type.h:56
@ multicast
Directed to all receivers, unreliable, unordered, no congestion control.
Definition type.h:58
@ confirmed
Directed to each receiver, reliable, ordered, with congestion control, relatively heavyweight.
Definition type.h:59
detail::MoveOnlyFunctionImpl< FwdArgs... > move_only_function
Definition move_only_function.h:256
OutputStreamTemplate< LittleEndian > OutputStream
Definition output_stream.h:64