Sen API
Sen Libraries
Loading...
Searching...
No Matches
native_object.h
Go to the documentation of this file.
1// === native_object.h =================================================================================================
2// Sen Infrastructure
3// Released under the Apache License v2.0 (SPDX-License-Identifier Apache-2.0).
4// See the LICENSE.txt file for more information.
5// © Airbus SAS, Airbus Helicopters, and Airbus Defence and Space SAU/GmbH/SAS.
6// =====================================================================================================================
7
8#ifndef SEN_CORE_OBJ_NATIVE_OBJECT_H
9#define SEN_CORE_OBJ_NATIVE_OBJECT_H
10
11// sen
16#include "sen/core/base/span.h"
20#include "sen/core/meta/event.h"
23#include "sen/core/meta/type.h"
24#include "sen/core/meta/var.h"
30#include "sen/core/obj/object.h"
31
32// std
33#include <atomic>
34#include <chrono>
35#include <cstdint>
36#include <exception>
37#include <functional>
38#include <future>
39#include <memory>
40#include <mutex>
41#include <shared_mutex>
42#include <stdexcept>
43#include <string>
44#include <string_view>
45#include <unordered_map>
46#include <utility>
47#include <vector>
48
49namespace sen
50{
51
54
59
62class SEN_EXPORT NativeObject: public Object
63{
64 SEN_NOCOPY_NOMOVE(NativeObject)
65
66public:
67 ~NativeObject() override;
68
69public:
73
77 virtual void update(kernel::RunApi& runApi);
78
82
85 [[nodiscard]] virtual bool needsPreDrainOrPreCommit() const noexcept { return false; }
86
91 virtual void preDrain() {}
92
97 virtual void preCommit() {}
98
100 void setNextPropertyUntyped(const Property* property, const Var& value);
101
103 [[nodiscard]] Var getNextPropertyUntyped(const Property* property) const;
104
105public: // implements Object
106 [[nodiscard]] ObjectId getId() const noexcept final;
107 [[nodiscard]] const std::string& getName() const noexcept final;
108 [[nodiscard]] Var getPropertyUntyped(const Property* prop) const final;
109 [[nodiscard]] NativeObject* asNativeObject() noexcept final;
110 [[nodiscard]] const NativeObject* asNativeObject() const noexcept final;
111 [[nodiscard]] const std::string& getLocalName() const noexcept final;
112 [[nodiscard]] TimeStamp getLastCommitTime() const noexcept override;
113 [[nodiscard]] ConnectionGuard onEventUntyped(const Event* ev, EventCallback<VarList>&& callback) final;
114 void invokeUntyped(const Method* method, const VarList& args, MethodCallback<Var>&& onDone) final;
115 [[nodiscard]] ConnectionGuard onPropertyChangedUntyped(const Property* prop, EventCallback<VarList>&& callback) final;
116
117protected:
118 explicit NativeObject(const std::string& name);
119 void commit(TimeStamp time);
120
121protected: // provided by the generated code
122 [[nodiscard]] virtual Var senImplGetPropertyImpl(MemberHash propertyId) const = 0;
123 virtual void senImplSetNextPropertyUntyped(MemberHash propertyId, const Var& value) = 0;
124 [[nodiscard]] virtual Var senImplGetNextPropertyUntyped(MemberHash propertyId) const = 0;
126 {
127 auto readLock = createReaderLock();
128 return senImplComputeMaxReliableSerializedPropertySizeImpl();
129 }
130 virtual void senImplCommitImpl(TimeStamp time) = 0;
132 impl::BufferProvider uni,
133 impl::BufferProvider multi) = 0;
134 virtual void senImplStreamCall(MemberHash methodId, InputStream& in, StreamCallForwarder&& fwd) = 0;
135 virtual void senImplVariantCall(MemberHash methodId, const VarList& args, VariantCallForwarder&& fwd) = 0;
136
137 [[nodiscard]] virtual impl::FieldValueGetter senImplGetFieldValueGetter(MemberHash propertyId,
138 Span<uint16_t> fields) const = 0;
139
140private:
141 [[nodiscard]] virtual uint32_t senImplComputeMaxReliableSerializedPropertySizeImpl() const = 0;
142
143protected: // called by the generated code
144 [[nodiscard]] ConnId senImplMakeConnectionId() noexcept;
145
146 void senImplRemoveUntypedConnection(ConnId id, MemberHash memberHash) override;
147
149 template <typename R, typename... Args, typename F, class C>
150 inline void senImplAsyncCall(C* instance, MethodCallback<R>&& callback, F&& f, bool forcePush, Args... args);
151
153 template <typename R, typename... Args, typename F, class C>
154 inline void senImplAsyncCall(const C* instance, MethodCallback<R>&& callback, F&& f, bool forcePush, Args... args)
155 const;
156
158 void senImplAsyncCall(const Method* method, const VarList& args, MethodCallback<Var>&& callback);
159
161 template <typename R, typename... Args, typename F, class C>
162 inline void senImplAsyncDeferredCall(C* instance, MethodCallback<R>&& callback, F&& f, bool forcePush, Args... args);
163
165 template <typename R>
166 static inline void tryToGetResult(std::shared_ptr<std::future<R>> future,
167 MethodCallback<R>&& callback,
168 bool forcePush);
169
171 template <typename... T>
172 inline void senImplProduceEvent(impl::EventBuffer<T...>& eventBuffer,
173 Emit emissionMode,
174 MemberHash eventId,
175 TransportMode transportMode,
176 bool addToTransportQueue,
177 MaybeRef<T>... args);
178
180 void senImplEventEmitted(MemberHash id, std::function<VarList()>&& argsGetter, const EventInfo& info) final;
181
183 void addWorkToQueue(sen::std_util::move_only_function<void()>&& call, bool forcePush) const;
184
185 [[nodiscard]] impl::SerializableEventQueue* getOutputEventQueue() noexcept;
186
187private:
188 void setLocalPrefix(std::string_view localPrefix) noexcept;
189
192 void setQueues(impl::WorkQueue* workQueue, impl::SerializableEventQueue* eventQueue) noexcept;
193
196 void setRegistrationTime(TimeStamp time) noexcept { registrationTime_ = time; }
197
199 [[nodiscard]] TimeStamp getRegistrationTime() const noexcept { return registrationTime_; }
200
201private:
202 template <typename... T>
203 friend class impl::EventBuffer;
204
205 struct EventCallbackData
206 {
207 uint32_t id;
208 std::shared_ptr<EventCallback<VarList>> callback;
209 };
210
211 using EventCallbackList = std::vector<EventCallbackData>;
212
213 struct EventCallbackInfo
214 {
215 EventCallbackList list;
216 TransportMode transportMode;
217 };
218
219 using EventCallbackMap = std::unordered_map<MemberHash, EventCallbackInfo>;
220
221protected:
222 [[nodiscard]] std::lock_guard<std::shared_mutex> createWriterLock() const
223 {
224 return std::lock_guard(dataAccessMutex_);
225 }
226
227 [[nodiscard]] std::shared_lock<std::shared_mutex> createReaderLock() const
228 {
229 return std::shared_lock(dataAccessMutex_);
230 }
231
232private: // access to the kernel and generated proxies
233 friend class impl::NativeObjectProxy;
234 friend class impl::FilteredProvider;
235 friend class kernel::impl::RemoteParticipant;
236 friend class kernel::impl::LocalParticipant;
237 friend class kernel::impl::Runner;
238 friend class kernel::impl::ObjectUpdate;
239 friend class kernel::PipelineComponent;
240 friend impl::WorkQueue* impl::getWorkQueue(NativeObject* object) noexcept;
241
242private:
243 mutable std::atomic<impl::WorkQueue*> workQueue_;
244 mutable std::atomic<impl::SerializableEventQueue*> outputEventQueue_;
245 mutable impl::WorkQueue postCommitEvents_;
246 TimeStamp lastCommitTime_;
247 mutable std::shared_mutex dataAccessMutex_;
248 ObjectId id_;
249 std::string name_;
250 std::string localName_;
251 std::atomic_uint32_t nextConnection_ = 1U;
252 std::unique_ptr<EventCallbackMap> eventCallbacks_;
253 std::recursive_mutex eventCallbacksMutex_;
254 TimeStamp registrationTime_;
255};
256
258
259//----------------------------------------------------------------------------------------------------------------------
260// Inline implementation
261//----------------------------------------------------------------------------------------------------------------------
262
263namespace impl
264{
265inline WorkQueue* getWorkQueue(NativeObject* object) noexcept { return object->workQueue_.load(); }
266
267//--------------------------------------------------------------------------------------------------------------
268// Helpers
269//--------------------------------------------------------------------------------------------------------------
270
271template <typename R, typename... Args, typename F, class C>
272[[nodiscard]] inline MethodResult<R> executeCallWithArgs(C instance, F&& f, const Args&... args) noexcept
273{
274 try
275 {
276 if constexpr (std::is_void_v<R>)
277 {
278 (*instance.*f)(args...);
279 return ::sen::Ok();
280 }
281 else
282 {
283 return ::sen::Ok((*instance.*f)(args...));
284 }
285 }
286 catch (...)
287 {
288 return ::sen::Err(std::current_exception());
289 }
290}
291
292template <typename R, typename... Args, typename F, class C>
293[[nodiscard]] inline std::exception_ptr executeDeferredCallWithArgs(C instance,
294 F&& f,
295 std::promise<R> promise,
296 const Args&... args) noexcept
297{
298 try
299 {
300 (*instance.*f)(args..., std::move(promise));
301 return nullptr;
302 }
303 catch (...)
304 {
305 return std::current_exception();
306 }
307}
308
309} // namespace impl
310
311template <typename R, typename... Args, typename F, class C>
312void NativeObject::senImplAsyncCall(C* instance, MethodCallback<R>&& callback, F&& f, bool forcePush, Args... args)
313{
314 // add work to our queue. Our runner will execute it during the drain stage. If the caller provided a callback,
315 // then we need to provide the result and ensure that the caller executes the callback in its own execution context.
317 [instance,
318 forcePush,
319 args...,
320 func = std::forward<F>(f),
321 responseCallback = std::move(callback),
322 us = weak_from_this()]() mutable
323 {
324 // try to execute the call on us, but first check if we have been deleted in the meantime
325 if (us.expired())
326 {
327 if (auto callbackLock = responseCallback.lock(); callbackLock.isValid())
328 {
329 auto err = MethodResult<R> {Err(std::make_exception_ptr(std::runtime_error("object no longer exists")))};
330 auto work = [rcb = std::move(responseCallback), err = std::move(err)]() mutable { rcb.invoke({}, err); };
331 callbackLock.pushAnswer(std::move(work), forcePush);
332 }
333 }
334 else
335 {
336 auto result = impl::executeCallWithArgs<R, Args...>(instance, std::move(func), args...); // NOLINT
337 if (auto callbackLock = responseCallback.lock(); callbackLock.isValid())
338 {
339 auto work = [ret = std::move(result), rcb = std::move(responseCallback)]() mutable { rcb.invoke({}, ret); };
340 callbackLock.pushAnswer(std::move(work), forcePush);
341 }
342 }
343 },
344 forcePush);
345}
346
347inline void NativeObject::invokeUntyped(const Method* method, const VarList& args, MethodCallback<Var>&& onDone)
348{
349 senImplAsyncCall(method, args, std::move(onDone));
350}
351
352// add work to our queue. Our runner will execute it during the drain stage. If the caller provided a callback,
353// then we need to provide the result and ensure that the caller executes the callback in its own execution context.
354template <typename R, typename... Args, typename F, class C>
355void NativeObject::senImplAsyncCall(const C* instance,
356 MethodCallback<R>&& callback,
357 F&& f,
358 bool forcePush,
359 Args... args) const
360{
362 [instance,
363 forcePush,
364 args...,
365 func = std::forward<F>(f),
366 responseCallback = std::move(callback),
367 us = weak_from_this()]() mutable
368 {
369 if (us.expired())
370 {
371 if (auto callbackLock = responseCallback.lock(); callbackLock.isValid())
372 {
373 auto err = MethodResult<R> {Err(std::make_exception_ptr(std::runtime_error("object no longer exists")))};
374 auto work = [rcb = std::move(responseCallback), err = std::move(err)]() mutable { rcb.invoke({}, err); };
375 callbackLock.pushAnswer(std::move(work), forcePush);
376 }
377 }
378 else
379 {
380 auto result = impl::executeCallWithArgs<R, Args...>(instance, std::move(func), args...);
381 if (auto callbackLock = responseCallback.lock(); callbackLock.isValid())
382 {
383 auto work = [ret = std::move(result), rcb = std::move(responseCallback)]() mutable { rcb.invoke({}, ret); };
384 callbackLock.pushAnswer(std::move(work), forcePush);
385 }
386 }
387 },
388 forcePush);
389}
390
391template <typename R, typename... Args, typename F, class C>
393 MethodCallback<R>&& callback,
394 F&& f,
395 bool forcePush,
396 Args... args)
397{
399 [instance,
400 forcePush,
401 args...,
402 func = std::forward<F>(f),
403 responseCallback = std::move(callback),
404 us = weak_from_this()]() mutable
405 {
406 if (us.expired())
407 {
408 if (auto callbackLock = responseCallback.lock(); callbackLock.isValid())
409 {
410 auto err = MethodResult<R> {Err(std::make_exception_ptr(std::runtime_error("object no longer exists")))};
411 auto work = [err = std::move(err), rcb = std::move(responseCallback)]() mutable { rcb.invoke({}, err); };
412 callbackLock.pushAnswer(std::move(work), forcePush);
413 }
414 }
415 else
416 {
417 std::promise<R> promise;
418 auto future = std::make_shared<std::future<R>>(promise.get_future());
419 auto exceptionPtr = impl::executeDeferredCallWithArgs(instance, func, std::move(promise), args...);
420
421 if (auto callbackLock = responseCallback.lock(); callbackLock.isValid())
422 {
423 if (exceptionPtr)
424 {
425 auto err = MethodResult<R> {::sen::Err(exceptionPtr)};
426 auto work = [err = std::move(err), rcb = std::move(responseCallback)]() mutable { rcb.invoke({}, err); };
427 callbackLock.pushAnswer(std::move(work), forcePush);
428 }
429 else
430 {
431 callbackLock.pushAnswer([fut = std::move(future), cb = std::move(responseCallback), forcePush]() mutable
432 { tryToGetResult<R>(std::move(fut), std::move(cb), forcePush); },
433 forcePush);
434 }
435 }
436 }
437 },
438 forcePush);
439}
440
441template <typename R>
442void NativeObject::tryToGetResult(std::shared_ptr<std::future<R>> future, MethodCallback<R>&& callback, bool forcePush)
443{
444 if (!future->valid())
445 {
446 // if the future is not valid, keep trying to get the result
447 if (auto callbackLock = callback.lock(); callbackLock.isValid())
448 {
449 callbackLock.pushAnswer([fut = std::move(future), cb = std::move(callback), forcePush]() mutable
450 { tryToGetResult<R>(std::move(fut), std::move(cb), forcePush); },
451 forcePush);
452 }
453 }
454 else
455 {
456 // poll the future
457 auto status = future->wait_for(std::chrono::microseconds(0));
458
459 // check the status
460 try
461 {
462 switch (status)
463 {
464 case std::future_status::deferred:
465 {
466 if constexpr (std::is_same_v<R, void>)
467 {
468 future->get();
469 callback.invoke({}, Ok());
470 }
471 else
472 {
473 callback.invoke({}, Ok(future->get()));
474 }
475 }
476 break;
477 case std::future_status::ready:
478 {
479 if constexpr (std::is_same_v<R, void>)
480 {
481 future->get();
482 callback.invoke({}, Ok());
483 }
484 else
485 {
486 auto value = future->get();
487 callback.invoke({}, Ok(value));
488 }
489 }
490 break;
491 case std::future_status::timeout:
492 {
493 // keep trying
494 if (auto callbackLock = callback.lock(); callbackLock.isValid())
495 {
496 callbackLock.pushAnswer([fut = std::move(future), cb = std::move(callback), forcePush]() mutable
497 { tryToGetResult<R>(std::move(fut), std::move(cb), forcePush); },
498 forcePush);
499 }
500 }
501 break;
502 default:
503 break;
504 }
505 }
506 catch (...)
507 {
508 callback.invoke({}, Err(std::current_exception()));
509 }
510 }
511}
512
513template <typename... T>
514void NativeObject::senImplProduceEvent(impl::EventBuffer<T...>& eventBuffer,
515 Emit emissionMode,
516 MemberHash eventId,
517 TransportMode transportMode,
518 bool addToTransportQueue,
519 MaybeRef<T>... args)
520{
521 eventBuffer.produce(emissionMode,
522 eventId,
523 lastCommitTime_,
524 id_,
525 transportMode,
526 addToTransportQueue,
527 outputEventQueue_.load(),
528 workQueue_.load(),
529 this,
530 args...);
531}
532
533} // namespace sen
534
535#endif // SEN_CORE_OBJ_NATIVE_OBJECT_H
Here we define a set of template meta-programming helpers to let the compiler take some decisions bas...
Represents an event.
Definition core/include/sen/core/meta/event.h:34
Represents a method.
Definition method.h:96
An object instantiated in this process. This is the base class for all user-implemented objects.
Definition native_object.h:63
ConnectionGuard onEventUntyped(const Event *ev, EventCallback< VarList > &&callback) final
Reflection-based interface for reacting to events.
virtual void preCommit()
Implement this function if you need to perform some action before the commit is called in the compone...
Definition native_object.h:97
void senImplAsyncDeferredCall(C *instance, MethodCallback< R > &&callback, F &&f, bool forcePush, Args... args)
Queues a call to a method for future execution with deferred semantics (non-const version).
Definition native_object.h:392
ObjectId getId() const noexcept final
Global unique object identification.
ConnId senImplMakeConnectionId() noexcept
virtual void preDrain()
Implement this function if you need to perform some action before the draining inputs the component w...
Definition native_object.h:91
virtual void update(kernel::RunApi &runApi)
Implement this function to perform changes to your object state or interactions with other objects....
virtual void senImplStreamCall(MemberHash methodId, InputStream &in, StreamCallForwarder &&fwd)=0
NativeObject(const std::string &name)
std::shared_lock< std::shared_mutex > createReaderLock() const
Definition native_object.h:227
void invokeUntyped(const Method *method, const VarList &args, MethodCallback< Var > &&onDone) final
Reflection- and variant- based interface for (asynchronously) invoking a method on this object....
Definition native_object.h:347
~NativeObject() override
virtual void senImplWriteChangedPropertiesToStream(OutputStream &confirmed, impl::BufferProvider uni, impl::BufferProvider multi)=0
uint32_t senImplComputeMaxReliableSerializedPropertySize() const
Definition native_object.h:125
NativeObject * asNativeObject() noexcept final
Helper that checks if the object is local (without dynamic casts).
virtual Var senImplGetPropertyImpl(MemberHash propertyId) const =0
ConnectionGuard onPropertyChangedUntyped(const Property *prop, EventCallback< VarList > &&callback) final
Reflection-based interface for detecting property changes.
virtual impl::FieldValueGetter senImplGetFieldValueGetter(MemberHash propertyId, Span< uint16_t > fields) const =0
void addWorkToQueue(sen::std_util::move_only_function< void()> &&call, bool forcePush) const
Queues a function (no specific task) into the work queue.
Var getNextPropertyUntyped(const Property *property) const
Untyped version of the getNext family of methods that get generated by subclasses.
std::lock_guard< std::shared_mutex > createWriterLock() const
Definition native_object.h:222
const std::string & getName() const noexcept final
The name given to the object upon construction.
virtual void senImplSetNextPropertyUntyped(MemberHash propertyId, const Var &value)=0
const std::string & getLocalName() const noexcept final
An alias used as an alternate way of identifying this object locally.
virtual void senImplCommitImpl(TimeStamp time)=0
virtual void unregistered(kernel::RegistrationApi &api)
Implement this function to react to the fact that the object has been unregistered from the execution...
impl::SerializableEventQueue * getOutputEventQueue() noexcept
Var getPropertyUntyped(const Property *prop) const final
Variant-based property getter.
void senImplProduceEvent(impl::EventBuffer< T... > &eventBuffer, Emit emissionMode, MemberHash eventId, TransportMode transportMode, bool addToTransportQueue, MaybeRef< T >... args)
Helper to call eventBuffer.produce() with our data.
Definition native_object.h:514
void senImplAsyncCall(C *instance, MethodCallback< R > &&callback, F &&f, bool forcePush, Args... args)
Queues a call to a method for future execution (non-const version).
Definition native_object.h:312
friend class impl::EventBuffer
Definition native_object.h:203
void setNextPropertyUntyped(const Property *property, const Var &value)
Untyped version of the setNext family of methods that get generated by subclasses.
virtual Var senImplGetNextPropertyUntyped(MemberHash propertyId) const =0
static void tryToGetResult(std::shared_ptr< std::future< R > > future, MethodCallback< R > &&callback, bool forcePush)
Tries to get a result from the future, queuing back the check until timed out.
Definition native_object.h:442
virtual void senImplVariantCall(MemberHash methodId, const VarList &args, VariantCallForwarder &&fwd)=0
void senImplRemoveUntypedConnection(ConnId id, MemberHash memberHash) override
virtual void registered(kernel::RegistrationApi &api)
Implement this function to react to the fact that the object has been created and registered into the...
virtual bool needsPreDrainOrPreCommit() const noexcept
True if preUpdate and postUpdate needs to be called for this object. Defaults to false.
Definition native_object.h:85
void senImplEventEmitted(MemberHash id, std::function< VarList()> &&argsGetter, const EventInfo &info) final
Called by EventBuffer when emitting an event.
void commit(TimeStamp time)
TimeStamp getLastCommitTime() const noexcept override
The point in time when the last commit was called.
Object()=default
friend class ConnectionGuard
Definition object.h:141
Represents a property.
Definition property.h:79
Contiguous view of elements of type T. Inspired by http://www.open-std.org/jtc1/sc22/wg21/docs/papers...
Definition span.h:34
A point in time.
Definition timestamp.h:26
API for objects when registered.
Definition component_api.h:170
What can be done while a component is running.
Definition component_api.h:224
impl::Err< void > Err() noexcept
If E is void, use the void specialization of Err.
Definition result.h:439
impl::Ok< void > Ok() noexcept
If T is void, use the void specialization of Ok.
Definition result.h:434
InputStreamTemplate< LittleEndian > InputStream
Definition input_stream.h:84
sen::std_util::move_only_function< void(OutputStream &)> StreamCall
Definition native_object.h:55
Emit
How to emit an event.
Definition object.h:51
Result< R, std::exception_ptr > MethodResult
The result of a method (which can be an exception in case of error).
Definition callback.h:204
Callback< MethodCallInfo, MethodResult< R > > MethodCallback
A method callback.
Definition callback.h:212
sen::std_util::move_only_function< void(Var &)> VariantCall
Definition native_object.h:56
sen::std_util::move_only_function< void(StreamCall &&)> StreamCallForwarder
Definition native_object.h:57
sen::std_util::move_only_function< void(VariantCall &&)> VariantCallForwarder
Definition native_object.h:58
Callback< EventInfo, Args... > EventCallback
An event callback.
Definition callback.h:208
Information about an event emission.
Definition callback.h:50
std::conditional_t< std::is_arithmetic_v< T >||shouldBePassedByValueV< T >, T, AddConstRef< T > > MaybeRef
returns 'const T&' or 'T' depending on the type
Definition class_helpers.h:46
std::vector< Var > VarList
A list of vars to represent sequences.
Definition var.h:104
@ time
Definition unit.h:34
TransportMode
How to transport information.
Definition type.h:56
Definition bits.h:26
detail::MoveOnlyFunctionImpl< FwdArgs... > move_only_function
Definition move_only_function.h:256
Definition assert.h:17
OutputStreamTemplate< LittleEndian > OutputStream
Definition output_stream.h:64
STL namespace.
The hash of a member.
Definition type.h:39
Can hold any supported value type. Wraps std::variant to allow recursion and implements some helpers.
Definition var.h:119