Sen API
Sen Libraries
Loading...
Searching...
No Matches
event_buffer.h
Go to the documentation of this file.
1// === event_buffer.h ==================================================================================================
2// Sen Infrastructure
3// Released under the Apache License v2.0 (SPDX-License-Identifier Apache-2.0).
4// See the LICENSE.txt file for more information.
5// © Airbus SAS, Airbus Helicopters, and Airbus Defence and Space SAU/GmbH/SAS.
6// =====================================================================================================================
7
8#ifndef SEN_CORE_OBJ_DETAIL_EVENT_BUFFER_H
9#define SEN_CORE_OBJ_DETAIL_EVENT_BUFFER_H
10
11// sen
15#include "sen/core/base/span.h"
19#include "sen/core/meta/type.h"
20#include "sen/core/meta/var.h"
24#include "sen/core/obj/object.h"
25
26// std
27#include <cstddef>
28#include <cstdint>
29#include <functional>
30#include <list>
31#include <memory>
32#include <tuple>
33#include <type_traits>
34#include <utility>
35#include <vector>
36
37namespace sen::impl
38{
39
40//--------------------------------------------------------------------------------------------------------------
41// SerializableEvent
42//--------------------------------------------------------------------------------------------------------------
43
44using SerializationFunc = std::function<void(OutputStream&)>;
45
47struct SerializableEvent
48{
49 MemberHash eventId = 0U;
50 TimeStamp creationTime;
51 SerializationFunc serializeFunc = nullptr;
52 ObjectId producerId;
54 uint32_t serializedSize = 0U;
55};
56
57//--------------------------------------------------------------------------------------------------------------
58// SerializableEventQueue
59//--------------------------------------------------------------------------------------------------------------
60
62class SerializableEventQueue
63{
64public:
65 SEN_NOCOPY_NOMOVE(SerializableEventQueue)
66
67public:
68 SerializableEventQueue(std::size_t maxSize, bool dropOldest): maxSize_(maxSize), dropOldest_(dropOldest) {}
69
70 ~SerializableEventQueue() = default;
71
72public:
73 void setOnDropped(std_util::move_only_function<void()>&& func) { onDropped_ = std::move(func); }
74 void push(SerializableEvent&& event);
75 void clear() { queue_.clear(); }
76 [[nodiscard]] const std::list<SerializableEvent>& getContents() const noexcept { return queue_; }
77
78private:
79 std::list<SerializableEvent> queue_;
80 std::size_t maxSize_;
81 std::size_t overflowCount_ = 0U;
82 bool dropOldest_;
83 std_util::move_only_function<void()> onDropped_ = []() {};
84};
85
86//--------------------------------------------------------------------------------------------------------------
87// EventBuffer
88//--------------------------------------------------------------------------------------------------------------
89
93template <typename... T>
94class EventBuffer final
95{
96 SEN_MOVE_ONLY(EventBuffer)
97
98public:
99 using Callback = EventCallback<T...>;
100
101public: // special members
102 EventBuffer() = default;
103 ~EventBuffer() = default;
104
105public:
107 [[nodiscard]] ConnectionGuard addConnection(Object* source, Callback callback, ConnId id);
108
110 [[nodiscard]] bool removeConnection(ConnId id);
111
114 void produce(Emit emissionMode,
115 MemberHash eventId,
116 TimeStamp creationTime,
117 ObjectId producerId,
118 TransportMode transportMode,
119 bool addToTransportQueue,
120 SerializableEventQueue* transportQueue,
121 WorkQueue* queue,
122 Object* producer,
123 MaybeRef<T>... args) const;
124
126 void dispatchFromStream(MemberHash eventId,
127 TimeStamp creationTime,
128 ObjectId producerId,
129 TransportMode transportMode,
130 const Span<const uint8_t>& buffer,
131 Object* producer) const;
132
134 void dispatch(MemberHash eventId,
135 TimeStamp creationTime,
136 ObjectId producerId,
137 TransportMode transportMode,
138 bool addToTransportQueue,
139 SerializableEventQueue* transportQueue,
140 Object* producer,
141 MaybeRef<T>... args) const;
142
144 void immediateDispatch(MemberHash eventId,
145 TimeStamp creationTime,
146 TransportMode transportMode,
147 Object* producer,
148 WorkQueue* queue,
149 MaybeRef<T>... args) const;
150
151private:
152 struct CallbackEntry
153 {
154 using CallbackStorageType = std::shared_ptr<Callback>;
155
156 CallbackEntry(ConnId id, CallbackStorageType callback): id_(id), callback_(std::move(callback)) {}
157
158 [[nodiscard]] ConnId getConnectionId() const noexcept { return id_; }
159 const CallbackStorageType& getCallback() const noexcept { return callback_; }
160
161 private:
162 ConnId id_;
163 CallbackStorageType callback_;
164 };
165 std::vector<CallbackEntry> eventCallbacks_;
166};
167
168//----------------------------------------------------------------------------------------------------------------------
169// Inline implementation
170//----------------------------------------------------------------------------------------------------------------------
171
172//--------------------------------------------------------------------------------------------------------------
173// Helpers
174//--------------------------------------------------------------------------------------------------------------
175
176template <typename T>
177inline void addVariantToList(const T& val, VarList& list)
178{
179 list.emplace_back();
180 VariantTraits<T>::valueToVariant(val, list.back());
181}
182
183[[nodiscard]] inline bool cannotBeDropped(TransportMode mode) noexcept { return mode == TransportMode::confirmed; }
184
185//--------------------------------------------------------------------------------------------------------------
186// SerializableEventQueue
187//--------------------------------------------------------------------------------------------------------------
188
189inline void SerializableEventQueue::push(SerializableEvent&& event)
190{
191 if (maxSize_ == 0)
192 {
193 queue_.push_back(std::move(event));
194 return;
195 }
196
197 if (queue_.size() < maxSize_)
198 {
199 queue_.push_back(std::move(event));
200 return;
201 }
202
203 if (dropOldest_)
204 {
205 queue_.pop_front();
206 queue_.push_back(std::move(event));
207 }
208
209 onDropped_();
210}
211
212//--------------------------------------------------------------------------------------------------------------
213// EventBuffer
214//--------------------------------------------------------------------------------------------------------------
215
216template <typename... T>
217inline ConnectionGuard EventBuffer<T...>::addConnection(Object* source, Callback callback, ConnId id)
218{
219 eventCallbacks_.emplace_back(id, std::make_shared<decltype(callback)>(std::move(callback)));
220 return {source, id.get(), 0U, true};
221}
222
223template <typename... T>
224inline bool EventBuffer<T...>::removeConnection(ConnId id)
225{
226 for (auto itr = eventCallbacks_.begin(); itr != eventCallbacks_.end(); ++itr)
227 {
228 if (itr->getConnectionId() == id)
229 {
230
231 // the callback might survive the erasure from this list when
232 // enqueued in some runner. Therefore, we need to explicitly cancel it,
233 // so that it doesn't get invoked from now on.
234 if (auto callbackLock = itr->getCallback()->lock(); callbackLock.isValid())
235 {
236 callbackLock.invalidate();
237 }
238
239 eventCallbacks_.erase(itr);
240 return true;
241 }
242 }
243
244 return false;
245}
246
247template <typename... T>
248inline void EventBuffer<T...>::produce(Emit emissionMode,
249 MemberHash eventId,
250 TimeStamp creationTime,
251 ObjectId producerId,
252 TransportMode transportMode,
253 bool addToTransportQueue,
254 SerializableEventQueue* transportQueue,
255 WorkQueue* queue,
256 Object* producer,
257 MaybeRef<T>... args) const
258{
259 if (emissionMode == Emit::now)
260 {
261 immediateDispatch(eventId, creationTime, transportMode, producer, queue, args...);
262 }
263 else
264 {
265 queue->push(
266 [this, eventId, creationTime, producerId, transportMode, addToTransportQueue, transportQueue, producer, args...]()
267 {
268 dispatch(
269 eventId, creationTime, producerId, transportMode, addToTransportQueue, transportQueue, producer, args...);
270 },
271 impl::cannotBeDropped(transportMode));
272 }
273}
274
275template <typename... T>
276inline void EventBuffer<T...>::dispatch(MemberHash eventId,
277 TimeStamp creationTime,
278 ObjectId producerId,
279 TransportMode transportMode,
280 bool addToTransportQueue,
281 SerializableEventQueue* transportQueue,
282 Object* producer,
283 MaybeRef<T>... args) const
284{
285 EventInfo info {creationTime};
286
287 // for (const auto& callback: callbacks_)
288 for (const auto& callbackEntry: eventCallbacks_)
289 {
290 if (auto callbackLock = callbackEntry.getCallback()->lock(); callbackLock.isValid())
291 {
292#if SEN_GCC_VERSION_CHECK_SMALLER(12, 4, 0)
293 // TODO (SEN-717): clean up with gcc12.4 on debian
294# pragma GCC diagnostic push
295# pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
296 auto work = [cb = callbackEntry.getCallback(), info, args...]() { cb->invoke(info, args...); };
297# pragma GCC diagnostic pop
298#else
299 // Note: if modified, patch line below
300 auto work = [cb = callbackEntry.getCallback(), info, args...]() { cb->invoke(info, args...); };
301#endif
302 callbackLock.pushAnswer(std::move(work), impl::cannotBeDropped(transportMode));
303 }
304 }
305
306 if (addToTransportQueue)
307 {
308 uint32_t serializedSize = 0U;
309 if constexpr (sizeof...(T) != 0U)
310 {
311 serializedSize = (... + SerializationTraits<T>::serializedSize(args));
312 }
313
314 transportQueue->push({eventId,
315 creationTime,
316 [args...](OutputStream& out)
317 {
318 std::ignore = out;
319 (SerializationTraits<T>::write(out, args), ...);
320 },
321 producerId,
322 transportMode,
323 serializedSize});
324 }
325
326 producer->senImplEventEmitted(
327 eventId,
328 [args...]()
329 {
330 VarList result;
331 result.reserve(sizeof...(T));
332 (addVariantToList<T>(args, result), ...);
333 return result;
334 },
335 info);
336}
337
338template <typename... T>
339inline void EventBuffer<T...>::immediateDispatch(MemberHash eventId,
340 TimeStamp creationTime,
341 TransportMode transportMode,
342 Object* producer,
343 WorkQueue* queue,
344 MaybeRef<T>... args) const
345{
346 EventInfo info {creationTime};
347
348 for (auto& callbackEntry: eventCallbacks_)
349 {
350 if (auto callbackLock = callbackEntry.getCallback()->lock(); callbackLock.isValid())
351 {
352 if (callbackLock.isSameQueue(queue))
353 {
354 callbackEntry.getCallback()->invoke(info, args...);
355 }
356 else
357 {
358 auto work = [cb = callbackEntry.getCallback(), info, args...]() { cb->invoke(info, args...); };
359 callbackLock.pushAnswer(std::move(work), impl::cannotBeDropped(transportMode));
360 }
361 }
362 }
363
364 producer->senImplEventEmitted(
365 eventId,
366 [args...]()
367 {
368 VarList result;
369 result.reserve(sizeof...(T));
370 (addVariantToList<T>(args, result), ...);
371 return result;
372 },
373 info);
374}
375
376template <typename... T>
377inline void EventBuffer<T...>::dispatchFromStream(MemberHash eventId,
378 TimeStamp creationTime,
379 ObjectId producerId,
380 TransportMode transportMode,
381 const Span<const uint8_t>& buffer,
382 Object* producer) const
383{
384 std::tuple<T...> argValues = {};
385
386 InputStream in(buffer);
387
388 // deserialize into the tuple
389 std::apply([&in](auto&&... arg)
390 { ((SerializationTraits<std::remove_reference_t<decltype(arg)>>::read(in, arg)), ...); },
391 argValues);
392
393 // call emit with the tuple arguments
394 std::apply(
395 &EventBuffer::dispatch,
396 std::tuple_cat(std::make_tuple(this, eventId, creationTime, producerId, transportMode, false, nullptr, producer),
397 std::move(argValues)));
398}
399
400} // namespace sen::impl
401
402#endif // SEN_CORE_OBJ_DETAIL_EVENT_BUFFER_H
Here we define a set of template meta-programming helpers to let the compiler take some decisions bas...
InputStreamTemplate< LittleEndian > InputStream
Definition input_stream.h:84
Emit
How to emit an event.
Definition object.h:51
Callback< EventInfo, Args... > EventCallback
An event callback.
Definition callback.h:208
@ now
Directly when it happens.
Definition object.h:52
std::conditional_t< std::is_arithmetic_v< T >||shouldBePassedByValueV< T >, T, AddConstRef< T > > MaybeRef
returns 'const T&' or 'T' depending on the type
Definition class_helpers.h:46
std::vector< Var > VarList
A list of vars to represent sequences.
Definition var.h:104
TransportMode
How to transport information.
Definition type.h:56
@ multicast
Directed to all receivers, unreliable, unordered, no congestion control.
Definition type.h:58
@ confirmed
Directed to each receiver, reliable, ordered, with congestion control, relatively heavyweight.
Definition type.h:59
detail::MoveOnlyFunctionImpl< FwdArgs... > move_only_function
Definition move_only_function.h:256
OutputStreamTemplate< LittleEndian > OutputStream
Definition output_stream.h:64