Sen API
Sen Libraries
Loading...
Searching...
No Matches
event_buffer.h
Go to the documentation of this file.
1// === event_buffer.h ==================================================================================================
2// Sen Infrastructure
3// Released under the Apache License v2.0 (SPDX-License-Identifier Apache-2.0).
4// See the LICENSE.txt file for more information.
5// © Airbus SAS, Airbus Helicopters, and Airbus Defence and Space SAU/GmbH/SAS.
6// =====================================================================================================================
7
8#ifndef SEN_CORE_OBJ_DETAIL_EVENT_BUFFER_H
9#define SEN_CORE_OBJ_DETAIL_EVENT_BUFFER_H
10
11// sen
15#include "sen/core/base/span.h"
19#include "sen/core/meta/type.h"
20#include "sen/core/meta/var.h"
24#include "sen/core/obj/object.h"
25
26// std
27#include <cstddef>
28#include <cstdint>
29#include <functional>
30#include <list>
31#include <memory>
32#include <tuple>
33#include <type_traits>
34#include <utility>
35#include <vector>
36
37namespace sen::impl
38{
39
40//--------------------------------------------------------------------------------------------------------------
41// SerializableEvent
42//--------------------------------------------------------------------------------------------------------------
43
44using SerializationFunc = std::function<void(OutputStream&)>;
45
47struct SerializableEvent
48{
49 MemberHash eventId = 0U;
50 TimeStamp creationTime;
51 SerializationFunc serializeFunc = nullptr;
52 ObjectId producerId;
54 uint32_t serializedSize = 0U;
55};
56
57//--------------------------------------------------------------------------------------------------------------
58// SerializableEventQueue
59//--------------------------------------------------------------------------------------------------------------
60
62class SerializableEventQueue
63{
64public:
65 SEN_NOCOPY_NOMOVE(SerializableEventQueue)
66
67public:
68 SerializableEventQueue(std::size_t maxSize, bool dropOldest): maxSize_(maxSize), dropOldest_(dropOldest) {}
69
70 ~SerializableEventQueue() = default;
71
72public:
73 void setOnDropped(std_util::move_only_function<void()>&& func) { onDropped_ = std::move(func); }
74 void push(SerializableEvent&& event);
75 void clear() { queue_.clear(); }
76 [[nodiscard]] const std::list<SerializableEvent>& getContents() const noexcept { return queue_; }
77
78private:
79 std::list<SerializableEvent> queue_;
80 std::size_t maxSize_;
81 std::size_t overflowCount_ = 0U;
82 bool dropOldest_;
83 std_util::move_only_function<void()> onDropped_ = []() {};
84};
85
86//--------------------------------------------------------------------------------------------------------------
87// EventBuffer
88//--------------------------------------------------------------------------------------------------------------
89
93template <typename... T>
94class EventBuffer final
95{
96 SEN_MOVE_ONLY(EventBuffer)
97
98public:
99 using Callback = EventCallback<T...>;
100
101public: // special members
102 EventBuffer() = default;
103 ~EventBuffer() = default;
104
105public:
107 [[nodiscard]] ConnectionGuard addConnection(Object* source, Callback callback, ConnId id);
108
110 [[nodiscard]] bool removeConnection(ConnId id);
111
114 void produce(Emit emissionMode,
115 MemberHash eventId,
116 TimeStamp creationTime,
117 ObjectId producerId,
118 TransportMode transportMode,
119 bool addToTransportQueue,
120 SerializableEventQueue* transportQueue,
121 WorkQueue* queue,
122 Object* producer,
123 MaybeRef<T>... args) const;
124
126 void dispatchFromStream(MemberHash eventId,
127 TimeStamp creationTime,
128 ObjectId producerId,
129 TransportMode transportMode,
130 const Span<const uint8_t>& buffer,
131 Object* producer) const;
132
134 void dispatch(MemberHash eventId,
135 TimeStamp creationTime,
136 ObjectId producerId,
137 TransportMode transportMode,
138 bool addToTransportQueue,
139 SerializableEventQueue* transportQueue,
140 Object* producer,
141 MaybeRef<T>... args) const;
142
144 void immediateDispatch(MemberHash eventId,
145 TimeStamp creationTime,
146 TransportMode transportMode,
147 Object* producer,
148 WorkQueue* queue,
149 MaybeRef<T>... args) const;
150
151private:
152 std::vector<ConnId> ids_;
153 std::list<std::shared_ptr<Callback>> callbacks_;
154};
155
156//----------------------------------------------------------------------------------------------------------------------
157// Inline implementation
158//----------------------------------------------------------------------------------------------------------------------
159
160//--------------------------------------------------------------------------------------------------------------
161// Helpers
162//--------------------------------------------------------------------------------------------------------------
163
164template <typename T>
165inline void addVariantToList(const T& val, VarList& list)
166{
167 list.emplace_back();
168 VariantTraits<T>::valueToVariant(val, list.back());
169}
170
171[[nodiscard]] inline bool cannotBeDropped(TransportMode mode) noexcept { return mode == TransportMode::confirmed; }
172
173//--------------------------------------------------------------------------------------------------------------
174// SerializableEventQueue
175//--------------------------------------------------------------------------------------------------------------
176
177inline void SerializableEventQueue::push(SerializableEvent&& event)
178{
179 if (maxSize_ == 0)
180 {
181 queue_.push_back(std::move(event));
182 return;
183 }
184
185 if (queue_.size() < maxSize_)
186 {
187 queue_.push_back(std::move(event));
188 return;
189 }
190
191 if (dropOldest_)
192 {
193 queue_.pop_front();
194 queue_.push_back(std::move(event));
195 }
196
197 onDropped_();
198}
199
200//--------------------------------------------------------------------------------------------------------------
201// EventBuffer
202//--------------------------------------------------------------------------------------------------------------
203
204template <typename... T>
205inline ConnectionGuard EventBuffer<T...>::addConnection(Object* source, Callback callback, ConnId id)
206{
207 ids_.push_back(id);
208 callbacks_.push_back(std::make_shared<decltype(callback)>(std::move(callback)));
209 return {source, id.get(), 0U, true};
210}
211
212template <typename... T>
213inline bool EventBuffer<T...>::removeConnection(ConnId id)
214{
215 for (typename decltype(ids_)::size_type i = 0U; i < ids_.size(); ++i)
216 {
217 if (ids_[i] == id)
218 {
219 ids_.erase(ids_.begin() + i);
220
221 auto itr = callbacks_.begin();
222 for (std::size_t pos = 0; pos < i; ++pos)
223 {
224 ++itr;
225 }
226
227 // the callback might survive the erasure from this list when
228 // enqueued in some runner. Therefore, we need to explicitly cancel it,
229 // so that it doesn't get invoked from now on.
230 if (auto callbackLock = (*itr)->lock(); callbackLock.isValid())
231 {
232 callbackLock.invalidate();
233 }
234
235 callbacks_.erase(itr);
236 return true;
237 }
238 }
239 return false;
240}
241
242template <typename... T>
243inline void EventBuffer<T...>::produce(Emit emissionMode,
244 MemberHash eventId,
245 TimeStamp creationTime,
246 ObjectId producerId,
247 TransportMode transportMode,
248 bool addToTransportQueue,
249 SerializableEventQueue* transportQueue,
250 WorkQueue* queue,
251 Object* producer,
252 MaybeRef<T>... args) const
253{
254 if (emissionMode == Emit::now)
255 {
256 immediateDispatch(eventId, creationTime, transportMode, producer, queue, args...);
257 }
258 else
259 {
260 queue->push(
261 [this, eventId, creationTime, producerId, transportMode, addToTransportQueue, transportQueue, producer, args...]()
262 {
263 dispatch(
264 eventId, creationTime, producerId, transportMode, addToTransportQueue, transportQueue, producer, args...);
265 },
266 impl::cannotBeDropped(transportMode));
267 }
268}
269
270template <typename... T>
271inline void EventBuffer<T...>::dispatch(MemberHash eventId,
272 TimeStamp creationTime,
273 ObjectId producerId,
274 TransportMode transportMode,
275 bool addToTransportQueue,
276 SerializableEventQueue* transportQueue,
277 Object* producer,
278 MaybeRef<T>... args) const
279{
280 EventInfo info {creationTime};
281
282 for (const auto& callback: callbacks_)
283 {
284 if (auto callbackLock = callback->lock(); callbackLock.isValid())
285 {
286#if SEN_GCC_VERSION_CHECK_SMALLER(12, 4, 0)
287 // TODO (SEN-717): clean up with gcc12.4 on debian
288# pragma GCC diagnostic push
289# pragma GCC diagnostic ignored "-Wmaybe-uninitialized"
290 auto work = [cb = callback, info, args...]() { cb->invoke(info, args...); };
291# pragma GCC diagnostic pop
292#else
293 // Note: if modified, patch line below
294 auto work = [cb = callback, info, args...]() { cb->invoke(info, args...); };
295#endif
296 callbackLock.pushAnswer(std::move(work), impl::cannotBeDropped(transportMode));
297 }
298 }
299
300 if (addToTransportQueue)
301 {
302 uint32_t serializedSize = 0U;
303 if constexpr (sizeof...(T) != 0U)
304 {
305 serializedSize = (... + SerializationTraits<T>::serializedSize(args));
306 }
307
308 transportQueue->push({eventId,
309 creationTime,
310 [args...](OutputStream& out)
311 {
312 std::ignore = out;
313 (SerializationTraits<T>::write(out, args), ...);
314 },
315 producerId,
316 transportMode,
317 serializedSize});
318 }
319
320 producer->senImplEventEmitted(
321 eventId,
322 [args...]()
323 {
324 VarList result;
325 result.reserve(sizeof...(T));
326 (addVariantToList<T>(args, result), ...);
327 return result;
328 },
329 info);
330}
331
332template <typename... T>
333inline void EventBuffer<T...>::immediateDispatch(MemberHash eventId,
334 TimeStamp creationTime,
335 TransportMode transportMode,
336 Object* producer,
337 WorkQueue* queue,
338 MaybeRef<T>... args) const
339{
340 EventInfo info {creationTime};
341
342 for (auto& callback: callbacks_)
343 {
344 if (auto callbackLock = callback->lock(); callbackLock.isValid())
345 {
346 if (callbackLock.isSameQueue(queue))
347 {
348 callback->invoke(info, args...);
349 }
350 else
351 {
352 auto work = [cb = callback, info, args...]() { cb->invoke(info, args...); };
353 callbackLock.pushAnswer(std::move(work), impl::cannotBeDropped(transportMode));
354 }
355 }
356 }
357
358 producer->senImplEventEmitted(
359 eventId,
360 [args...]()
361 {
362 VarList result;
363 result.reserve(sizeof...(T));
364 (addVariantToList<T>(args, result), ...);
365 return result;
366 },
367 info);
368}
369
370template <typename... T>
371inline void EventBuffer<T...>::dispatchFromStream(MemberHash eventId,
372 TimeStamp creationTime,
373 ObjectId producerId,
374 TransportMode transportMode,
375 const Span<const uint8_t>& buffer,
376 Object* producer) const
377{
378 std::tuple<T...> argValues = {};
379
380 InputStream in(buffer);
381
382 // deserialize into the tuple
383 std::apply([&in](auto&&... arg)
384 { ((SerializationTraits<std::remove_reference_t<decltype(arg)>>::read(in, arg)), ...); },
385 argValues);
386
387 // call emit with the tuple arguments
388 std::apply(
389 &EventBuffer::dispatch,
390 std::tuple_cat(std::make_tuple(this, eventId, creationTime, producerId, transportMode, false, nullptr, producer),
391 std::move(argValues)));
392}
393
394} // namespace sen::impl
395
396#endif // SEN_CORE_OBJ_DETAIL_EVENT_BUFFER_H
Here we define a set of template meta-programming helpers to let the compiler take some decisions bas...
InputStreamTemplate< LittleEndian > InputStream
Definition input_stream.h:84
Emit
How to emit an event.
Definition object.h:51
Callback< EventInfo, Args... > EventCallback
An event callback.
Definition callback.h:208
@ now
Directly when it happens.
Definition object.h:52
std::conditional_t< std::is_arithmetic_v< T >||shouldBePassedByValueV< T >, T, AddConstRef< T > > MaybeRef
returns 'const T&' or 'T' depending on the type
Definition class_helpers.h:46
std::vector< Var > VarList
A list of vars to represent sequences.
Definition var.h:104
TransportMode
How to transport information.
Definition type.h:56
@ multicast
Directed to all receivers, unreliable, unordered, no congestion control.
Definition type.h:58
@ confirmed
Directed to each receiver, reliable, ordered, with congestion control, relatively heavyweight.
Definition type.h:59
detail::MoveOnlyFunctionImpl< FwdArgs... > move_only_function
Definition move_only_function.h:256
OutputStreamTemplate< LittleEndian > OutputStream
Definition output_stream.h:64