Sen API
Sen Libraries
Loading...
Searching...
No Matches
remote_object.h
Go to the documentation of this file.
1// === remote_object.h =================================================================================================
2// Sen Infrastructure
3// Released under the Apache License v2.0 (SPDX-License-Identifier Apache-2.0).
4// See the LICENSE.txt file for more information.
5// © Airbus SAS, Airbus Helicopters, and Airbus Defence and Space SAU/GmbH/SAS.
6// =====================================================================================================================
7
8#ifndef SEN_CORE_OBJ_DETAIL_REMOTE_OBJECT_H
9#define SEN_CORE_OBJ_DETAIL_REMOTE_OBJECT_H
10
11// sen
17#include "sen/core/base/span.h"
22#include "sen/core/io/util.h"
25#include "sen/core/meta/event.h"
27#include "sen/core/meta/type.h"
29#include "sen/core/meta/var.h"
36
37// std
38#include <atomic>
39#include <cstddef>
40#include <cstdint>
41#include <exception>
42#include <functional>
43#include <memory>
44#include <mutex>
45#include <optional>
46#include <stdexcept>
47#include <string>
48#include <tuple>
49#include <type_traits>
50#include <unordered_map>
51#include <utility>
52#include <vector>
53
54namespace sen
55{
56
57// Forward declarations
58namespace kernel::impl
59{
60
61class RemoteParticipant;
62class ProxyManager;
63
64} // namespace kernel::impl
65
66namespace impl
67{
68
70using CallId = uint32_t;
71
73using SendCallFunc = // NOLINTNEXTLINE(misc-include-cleaner) false positive about ObjectId
74 std::function<Result<CallId, std::string>(TransportMode, ObjectOwnerId, ObjectId, MemberHash, MemBlockPtr&& args)>;
75
/// Outcome of a remote method call as carried by MethodResponseData.
/// The error values are turned into exceptions by RemoteObject::makeMethodResult.
enum class RemoteCallResult : uint8_t
{
  success,           ///< the call completed; the return value buffer is decoded
  objectNotFound,    ///< reported to the caller as std::invalid_argument
  runtimeError,      ///< reported to the caller as std::runtime_error
  logicError,        ///< reported to the caller as std::logic_error
  unknownException,  ///< reported to the caller as a plain std::exception
};
85
87struct MethodResponseData
88{
89 CallId callId;
90 std::shared_ptr<std::vector<uint8_t>> returnValBuffer;
91 RemoteCallResult result;
92 std::string error;
93};
94
95// Forward declarations
96class RemoteObject;
97
99struct RemoteObjectInfo
100{
102 std::string name;
103 ObjectId id;
104 WorkQueue* workQueue = nullptr;
105 SendCallFunc sendCallFunc;
106 std::string localName;
107 std::function<void(RemoteObject*)> destructionCallback = nullptr;
108 ObjectOwnerId ownerId;
109 std::weak_ptr<kernel::impl::RemoteParticipant> remoteParticipant;
110 MaybeConstTypeHandle<ClassType> writerSchema = std::nullopt;
111};
112
114class SEN_EXPORT RemoteObject: public ProxyObject
115{
116 SEN_NOCOPY_NOMOVE(RemoteObject)
117
118public: // special members
119 explicit RemoteObject(RemoteObjectInfo&& info);
120 ~RemoteObject() override;
121
122public:
123 virtual void copyStateFrom(const RemoteObject& other);
124
125public: // implements Object
126 [[nodiscard]] ObjectId getId() const noexcept final;
127 [[nodiscard]] const std::string& getName() const noexcept final;
128 [[nodiscard]] ConstTypeHandle<ClassType> getClass() const noexcept override;
129 [[nodiscard]] const ProxyObject* asProxyObject() const noexcept final;
130 [[nodiscard]] ProxyObject* asProxyObject() noexcept final;
131 [[nodiscard]] RemoteObject* asRemoteObject() noexcept final;
132 [[nodiscard]] const RemoteObject* asRemoteObject() const noexcept final;
133 [[nodiscard]] const std::string& getLocalName() const noexcept final;
134 [[nodiscard]] TimeStamp getLastCommitTime() const noexcept final;
135 [[nodiscard]] Var getPropertyUntyped(const Property* prop) const final;
136 [[nodiscard]] ConnectionGuard onEventUntyped(const Event* ev, EventCallback<VarList>&& callback) final;
137 // NOLINTNEXTLINE
138 void invokeUntyped(const Method* method, const VarList& args, MethodCallback<Var>&& onDone = {}) final;
139 [[nodiscard]] ConnectionGuard onPropertyChangedUntyped(const Property* prop, EventCallback<VarList>&& callback) final;
140
141public: // implements ProxyObject
142 [[nodiscard]] bool isRemote() const noexcept final;
143
144public:
145 void initializeState(Span<const uint8_t> state, const TimeStamp& time);
146 std::weak_ptr<const kernel::impl::RemoteParticipant> getParticipant() const;
147 [[nodiscard]] MaybeConstTypeHandle<ClassType> getWriterSchema() const noexcept;
148
149protected:
150 friend class kernel::impl::RemoteParticipant;
151 friend class kernel::impl::ProxyManager;
152
153 [[nodiscard]] bool responseReceived(MethodResponseData& response);
154 void propertyUpdatesReceived(const Span<const uint8_t>& properties, TimeStamp commitTime);
155 virtual void eventReceived(MemberHash eventId, TimeStamp creationTime, const Span<const uint8_t>& args) = 0;
156 [[nodiscard]] virtual bool readPropertyFromStream(MemberHash id, InputStream& in, bool initialState) = 0;
157 [[nodiscard]] virtual Var senImplGetPropertyImpl(MemberHash propertyId) const = 0;
158 virtual void serializeMethodArgumentsFromVariants(MemberHash methodId,
159 const VarList& args,
160 OutputStream& out) const = 0;
161 [[nodiscard]] virtual Var deserializeMethodReturnValueAsVariant(MemberHash methodId, InputStream& in) const = 0;
162 void clearDestructionCallback() noexcept;
163 void senImplEventEmitted(MemberHash id, std::function<VarList()>&& argsGetter, const EventInfo& info) final;
164 void senImplRemoveUntypedConnection(ConnId id, MemberHash memberHash) final; // NOLINT
165
166 template <typename R, typename... Args>
167 void makeRemoteCall(MemberHash id, TransportMode transportMode, MethodCallback<R>&& callback, Args... args) const;
168
169 template <typename R, typename... Args>
170 void adaptAndMakeRemoteCall(MemberHash id,
171 const Method* readerMethod,
172 TransportMode transportMode,
173 MethodCallback<R>&& callback,
174 Args... args) const;
175
176 Result<CallId, std::string> sendCall(TransportMode mode,
177 ObjectOwnerId ownerId,
178 ObjectId receiverId,
179 MemberHash id,
180 MemBlockPtr&& args) const;
181
182 ConnId senImplMakeConnectionId() noexcept;
183
184 template <typename T>
185 void readOrAdapt(InputStream& in, T& val, MemberHash id, const Type& readerType, bool isSync);
186
187 [[nodiscard]] bool memberTypesInSync(MemberHash id) const noexcept;
188
189 [[nodiscard]] bool checkMemberTypesInSyncInDetail(MemberHash id) const noexcept;
190
191 template <typename... T>
192 void dispatchEventFromStream(const EventBuffer<T...>& eventBuffer,
193 bool inSync,
194 const Span<const Arg>& readerArgs,
195 MemberHash eventId,
196 TimeStamp creationTime,
197 ObjectId producerId,
198 TransportMode transportMode,
199 const Span<const uint8_t>& buffer,
200 Object* producer) const;
201
202 void invalidateTransport();
203
204private:
205 template <typename R>
206 [[nodiscard]] static inline MethodResult<R> readMethodCallReturnValue(const std::vector<uint8_t>& buffer);
207
208 template <typename R, typename F>
209 [[nodiscard]] static inline MethodResult<R> makeMethodResult(const MethodResponseData& response, F&& func);
210
211 template <size_t i = 0, typename... T>
212 void adaptEventArgs(std::tuple<T...>& args,
213 VarList& argsAsVariants,
214 const Span<const Arg>& readerArgs,
215 const Event* writerEvent) const;
216
217 static void logEventArgsWarning(const std::string& eventName,
218 size_t numOfWriterEventArgs,
219 size_t numOfReaderEventArgs);
220
221 virtual void cancelPendingCalls();
222
223private:
224 using MutexLock = std::scoped_lock<std::recursive_mutex>;
225 struct EventCallbackData
226 {
227 uint32_t id;
228 EventCallback<VarList> callback;
229 };
230
231 using EventCallbackList = std::vector<EventCallbackData>;
232
233 struct EventCallbackInfo
234 {
235 EventCallbackList list;
236 TransportMode transportMode;
237 };
238
239 using EventCallbackMap = std::unordered_map<MemberHash, EventCallbackInfo>;
240
241private:
242 using PendingResponseContainerType =
243 std::unordered_map<CallId, sen::std_util::move_only_function<void(MethodResponseData&&)>>;
244
245 mutable RemoteObjectInfo info_;
246 mutable PendingResponseContainerType pendingResponses_;
247 mutable std::recursive_mutex pendingResponsesMutex_;
248 std::atomic_uint32_t nextConnection_ = 1;
249 std::atomic<TimeStamp> lastCommitTime_;
250 std::unique_ptr<EventCallbackMap> eventCallbacks_;
251 std::recursive_mutex eventCallbacksMutex_;
252};
253
254//----------------------------------------------------------------------------------------------------------------------
255// Inline implementation
256//----------------------------------------------------------------------------------------------------------------------
257
258template <typename R, typename... Args>
259void RemoteObject::makeRemoteCall(MemberHash id,
260 TransportMode transportMode,
261 MethodCallback<R>&& callback,
262 Args... args) const
263{
264 // create a buffer containing the arguments
265 auto argsBuffer = std::make_shared<ResizableHeapBlock>();
266
267 if constexpr (sizeof...(Args) != 0U)
268 {
269 // resize buffer
270 argsBuffer->resize((... + SerializationTraits<Args>::serializedSize(args)));
271
272 BufferWriter writer(*argsBuffer);
273 OutputStream out(writer);
274
275 // serialize args
276 (SerializationTraits<Args>::write(out, args), ...);
277 }
278
279 // send the call and get a ticket
280 if (auto callId = sendCall(transportMode, info_.ownerId, info_.id, id, std::move(argsBuffer)); callId.isOk())
281 {
282 // if no callback, dismiss the response
283 if (auto cbLock = callback.lock(); cbLock.isValid())
284 {
285 // when receiving the response, add the callback to the work queue
286 auto responseHandler =
287 [cb = std::move(callback), transportMode, us = weak_from_this()](const auto& response) mutable
288 {
289 if (auto callbackLock = cb.lock(); callbackLock.isValid())
290 {
291 if (us.expired())
292 {
293 auto work = [cb = std::move(cb)]() mutable
294 { cb.invoke({}, ::sen::Err(std::make_exception_ptr(std::runtime_error("object deleted")))); };
295 callbackLock.pushAnswer(std::move(work), impl::cannotBeDropped(transportMode));
296 }
297 else
298 {
299 auto result =
300 makeMethodResult<R>(response, [](const auto& buffer) { return readMethodCallReturnValue<R>(buffer); });
301
302 auto work = [res = std::move(result), cb = std::move(cb)]() mutable { cb.invoke({}, res); };
303 callbackLock.pushAnswer(std::move(work), impl::cannotBeDropped(transportMode));
304 }
305 }
306 };
307
308 MutexLock pendingResponsesLock(pendingResponsesMutex_);
309 pendingResponses_.insert({callId.getValue(), std::move(responseHandler)});
310 }
311 }
312 else
313 {
314 // if no callback, dismiss the response
315 if (auto cbLock = callback.lock(); cbLock.isValid())
316 {
317 auto work = [cb = std::move(callback), err = callId.getError()]() mutable
318 { cb.invoke({}, ::sen::Err(std::make_exception_ptr(std::runtime_error(err)))); };
319 cbLock.pushAnswer(std::move(work), impl::cannotBeDropped(transportMode));
320 }
321 }
322}
323
324template <typename R, typename... Args>
325inline void RemoteObject::adaptAndMakeRemoteCall(MemberHash id,
326 const Method* readerMethod,
327 TransportMode transportMode,
328 MethodCallback<R>&& callback,
329 Args... args) const
330{
331 SEN_ASSERT(info_.writerSchema.has_value() && "WriterSchema is expected");
332 const auto* writerMethod = info_.writerSchema.value()->searchMethodById(id);
333 if (writerMethod == nullptr)
334 {
335 if (auto callbackLock = callback.lock(); callbackLock.isValid())
336 {
337 std::string err = "Cannot call remote method ";
338 err.append(readerMethod->getName());
339 err.append(" because it is not defined in the remote object.");
340
341 auto result = Err(std::make_exception_ptr(std::runtime_error(err)));
342
343 auto work = [res = std::move(err), callback = std::move(callback)]() mutable
344 { callback.invoke({}, Err(std::make_exception_ptr(std::runtime_error(res)))); };
345
346 callbackLock.pushAnswer(std::move(work), cannotBeDropped(transportMode));
347 }
348 return;
349 }
350
351 const auto& readerArgs = readerMethod->getArgs();
352 const auto& writerArgs = writerMethod->getArgs();
353 if (writerArgs.size() > readerArgs.size())
354 {
355 if (auto callbackLock = callback.lock(); callbackLock.isValid())
356 {
357 std::string err = "Cannot call remote method ";
358 err.append(readerMethod->getName());
359 err.append(". Remote method requires ");
360 err.append(std::to_string(writerArgs.size()));
361 err.append(" arguments. Local method specifies ");
362 err.append(std::to_string(readerArgs.size()));
363 err.append(" arguments.");
364
365 auto result = Err(std::make_exception_ptr(std::runtime_error(err)));
366 auto work = [res = std::move(err), callback = std::move(callback)]() mutable
367 { callback.invoke({}, Err(std::make_exception_ptr(std::runtime_error(res)))); };
368
369 callbackLock.pushAnswer(std::move(work), cannotBeDropped(transportMode));
370 }
371
372 return;
373 }
374
375 // create a buffer containing the arguments
376 auto argsBuffer = std::make_shared<ResizableHeapBlock>();
377
378 if (!readerArgs.empty())
379 {
380 ResizableBufferWriter writer(*argsBuffer);
381 OutputStream out(writer);
382 VarList argsAsVariants {toVariant(args)...};
383
384 for (const auto& arg: writerArgs)
385 {
386 const auto readerArgIndex = readerMethod->getArgIndexFromNameHash(arg.getNameHash());
387 auto& varArg = argsAsVariants.at(readerArgIndex);
388 writeToStream(varArg, out, *arg.type, readerMethod->getArgs()[readerArgIndex].type);
389 }
390 }
391
392 // send the call and get a ticket
393 auto callId = sendCall(transportMode, info_.ownerId, info_.id, id, std::move(argsBuffer));
394
395 // if no callback, dismiss the response
396 if (callback.lock().isValid())
397 {
398 // when receiving the response, add the callback to the work queue
399 auto responseHandler = [cb = std::move(callback),
400 writerReturnType = writerMethod->getReturnType(),
401 us = shared_from_this(), // NOLINT
402 transportMode](const auto& response) mutable
403 {
404 if (auto cbLock = cb.lock(); cbLock.isValid())
405 {
406 auto result = makeMethodResult<R>(response,
407 [writerReturnType](const auto& buffer)
408 {
409 if constexpr (std::is_void_v<R>)
410 {
411 std::ignore = buffer;
412 return ::sen::Ok();
413 }
414 else
415 {
416 InputStream in(buffer);
417 Var var {};
418 readFromStream(var, in, *writerReturnType);
419 std::ignore =
420 adaptVariant(*MetaTypeTrait<R>::meta(), var, writerReturnType);
421 return Ok(toValue<R>(var));
422 }
423 });
424
425 auto work = [res = std::move(result), cb = std::move(cb)]() mutable { cb.invoke({}, res); };
426 cbLock.pushAnswer(std::move(work), cannotBeDropped(transportMode));
427 }
428 };
429
430 MutexLock pendingResponsesLock(pendingResponsesMutex_);
431 pendingResponses_.try_emplace(callId.getValue(), std::move(responseHandler));
432 }
433}
434
435template <typename T>
436inline void RemoteObject::readOrAdapt(InputStream& in, T& val, MemberHash id, const Type& readerType, bool isSync)
437{
438 if (SEN_LIKELY(isSync))
439 {
440 SerializationTraits<T>::read(in, val);
441 return;
442 }
443
444 SEN_ASSERT(info_.writerSchema.has_value() && "WriterSchema is expected");
445 const auto* writerProp = info_.writerSchema.value()->searchPropertyById(id);
446 if (writerProp == nullptr)
447 {
448 std::string err = "Error while adapting the runtime type of a property from a remote object. The property ID ";
449 err.append(std::to_string(id.get()));
450 err.append(" cannot be found among the properties of the writer schema.");
452 }
453
454 Var var {};
455 readFromStream(var, in, *writerProp->getType());
456 std::ignore = adaptVariant(readerType, var, writerProp->getType());
457 VariantTraits<T>::variantToValue(var, val);
458}
459
460inline bool RemoteObject::memberTypesInSync(MemberHash id) const noexcept
461{
462 return SEN_LIKELY(!info_.writerSchema) || checkMemberTypesInSyncInDetail(id);
463}
464
465template <typename... T>
466inline void RemoteObject::dispatchEventFromStream(const EventBuffer<T...>& eventBuffer,
467 bool inSync,
468 const Span<const Arg>& readerArgs,
469 MemberHash eventId,
470 TimeStamp creationTime,
471 ObjectId producerId,
472 TransportMode transportMode,
473 const Span<const uint8_t>& buffer,
474 Object* producer) const
475{
476 if (SEN_LIKELY(inSync))
477 {
478 eventBuffer.dispatchFromStream(eventId, creationTime, producerId, transportMode, buffer, producer);
479 return;
480 }
481
482 SEN_ASSERT(info_.writerSchema.has_value() && "WriterSchema is expected");
483 const auto* writerEvent = info_.writerSchema.value()->searchEventById(eventId);
484 const auto& writerArgs = writerEvent->getArgs();
485
486 InputStream in(buffer);
487 VarList argsAsVariants {};
488 const auto& writerArgsSize = writerArgs.size();
489 const auto& readerArgsSize = readerArgs.size();
490 argsAsVariants.reserve(writerArgsSize);
491
492 // if the remote implementation has fewer arguments than the local implementation, the event cannot be dispatched
493 if (writerArgsSize < readerArgsSize)
494 {
495 logEventArgsWarning(writerEvent->getName().data(), writerArgsSize, readerArgsSize);
496 return;
497 }
498
499 for (const auto& arg: writerArgs)
500 {
501 argsAsVariants.emplace_back();
502 readFromStream(argsAsVariants.back(), in, *arg.type);
503 }
504
505 std::tuple<T...> argsNative {};
506 adaptEventArgs(argsNative, argsAsVariants, readerArgs, writerEvent);
507
508 // call emit with the tuple arguments
509 std::apply([&eventBuffer](auto&&... arg)
510 { eventBuffer.dispatch(std::forward<std::remove_reference_t<decltype(arg)>>(arg)...); },
511 std::tuple_cat(std::make_tuple(eventId, creationTime, producerId, transportMode, false, nullptr, producer),
512 std::move(argsNative)));
513}
514
515template <typename R>
516MethodResult<R> RemoteObject::readMethodCallReturnValue(const std::vector<uint8_t>& buffer)
517{
518 if constexpr (std::is_void_v<R>)
519 {
520 return ::sen::Ok();
521 }
522 else
523 {
524 InputStream in(buffer);
525
526 R returnVal {};
527 SerializationTraits<R>::read(in, returnVal);
528 return ::sen::Ok(returnVal);
529 }
530}
531
532template <typename R, typename F>
533MethodResult<R> RemoteObject::makeMethodResult(const MethodResponseData& response, F&& func)
534{
535 switch (response.result)
536 {
537 case RemoteCallResult::logicError:
538 return ::sen::Err(std::make_exception_ptr(std::logic_error(response.error)));
539
540 case RemoteCallResult::objectNotFound:
541 return ::sen::Err(std::make_exception_ptr(std::invalid_argument(response.error)));
542
543 case RemoteCallResult::runtimeError:
544 return ::sen::Err(std::make_exception_ptr(std::runtime_error(response.error)));
545
546 case RemoteCallResult::unknownException:
547 return ::sen::Err(std::make_exception_ptr(std::exception {}));
548
549 case RemoteCallResult::success:
550 return func(*response.returnValBuffer);
551
552 default:
554 }
555
556 SEN_ASSERT(false);
557}
558
559template <size_t i, typename... T>
560inline void RemoteObject::adaptEventArgs(std::tuple<T...>& argsNative,
561 VarList& argsAsVariants,
562 const Span<const Arg>& readerArgs,
563 const Event* writerEvent) const
564{
565 if constexpr (i == sizeof...(T))
566 {
567 return;
568 }
569 else
570 {
571 const auto writerArgIndex = writerEvent->getArgIndexFromNameHash(readerArgs[i].getNameHash());
572 auto& varArg = argsAsVariants.at(writerArgIndex);
573 std::ignore = adaptVariant(*readerArgs[i].type, varArg, writerEvent->getArgs()[writerArgIndex].type);
574 VariantTraits<std::remove_reference_t<decltype(std::get<i>(argsNative))>>::variantToValue(varArg,
575 std::get<i>(argsNative));
576
577 // next element of the tuple
578 adaptEventArgs<i + 1>(argsNative, argsAsVariants, readerArgs, writerEvent);
579 }
580}
581
582inline void RemoteObject::invalidateTransport()
583{
584 info_.sendCallFunc = nullptr;
585 cancelPendingCalls();
586}
587
588} // namespace impl
589} // namespace sen
590
591#endif // SEN_CORE_OBJ_DETAIL_REMOTE_OBJECT_H
The following macros implement a replacement of assert that is connected to the overall fault handlin...
impl::Err< void > Err() noexcept
If E is void, use the void specialization of Err.
Definition result.h:439
impl::Ok< void > Ok() noexcept
If T is void, use the void specialization of Ok.
Definition result.h:434
#define SEN_ASSERT(expr)
Checks an intermediate result produced by a procedure (not an input or output). NOLINTNEXTLINE.
Definition assert.h:39
void throwRuntimeError(const std::string &err)
Throws std::exception that attempts to collect the stack trace. We also wrap it to avoid including st...
InputStreamTemplate< LittleEndian > InputStream
Definition input_stream.h:84
Var toVariant(const T &val)
Definition core/include/sen/core/io/util.h:160
T toValue(const Var &var)
Definition core/include/sen/core/io/util.h:152
#define SEN_UNREACHABLE()
Definition compiler_macros.h:420
Result< R, std::exception_ptr > MethodResult
The result of a method (which can be an exception in case of error).
Definition callback.h:204
Callback< MethodCallInfo, MethodResult< R > > MethodCallback
A method callback.
Definition callback.h:212
std::vector< Var > VarList
A list of vars to represent sequences.
Definition var.h:104
std::optional< ConstTypeHandle< T > > MaybeConstTypeHandle
Definition type.h:322
TypeHandle< const T > ConstTypeHandle
Definition type.h:319
TransportMode
How to transport information.
Definition type.h:56
detail::MoveOnlyFunctionImpl< FwdArgs... > move_only_function
Definition move_only_function.h:256
Definition assert.h:17
OutputStreamTemplate< LittleEndian > OutputStream
Definition output_stream.h:64