Sen API
Sen Libraries
Loading...
Searching...
No Matches
remote_object.h
Go to the documentation of this file.
1// === remote_object.h =================================================================================================
2// Sen Infrastructure
3// Released under the Apache License v2.0 (SPDX-License-Identifier Apache-2.0).
4// See the LICENSE.txt file for more information.
5// © Airbus SAS, Airbus Helicopters, and Airbus Defence and Space SAU/GmbH/SAS.
6// =====================================================================================================================
7
8#ifndef SEN_CORE_OBJ_DETAIL_REMOTE_OBJECT_H
9#define SEN_CORE_OBJ_DETAIL_REMOTE_OBJECT_H
10
11// sen
17#include "sen/core/base/span.h"
22#include "sen/core/io/util.h"
25#include "sen/core/meta/event.h"
27#include "sen/core/meta/type.h"
29#include "sen/core/meta/var.h"
36
37// std
38#include <atomic>
39#include <cstddef>
40#include <cstdint>
41#include <exception>
42#include <functional>
43#include <memory>
44#include <mutex>
45#include <optional>
46#include <stdexcept>
47#include <string>
48#include <tuple>
49#include <type_traits>
50#include <unordered_map>
51#include <utility>
52#include <vector>
53
54namespace sen
55{
56
57// Forward declarations
58namespace kernel::impl
59{
60
61class RemoteParticipant;
62class ProxyManager;
63
64} // namespace kernel::impl
65
66namespace impl
67{
68
/// Identifier of an outgoing remote call. It is the success value produced by SendCallFunc, it is
/// carried back in MethodResponseData::callId, and it is the key under which response handlers are
/// stored while a call is pending.
70using CallId = uint32_t;
71
/// Callable used to ship a method call. It receives the transport mode, the owner id, the id of the
/// target object, the hash of the member being called and the block holding the serialized arguments.
/// The result is either the CallId assigned to the call or an error string.
73using SendCallFunc = // NOLINTNEXTLINE(misc-include-cleaner) false positive about ObjectId
74 std::function<Result<CallId, std::string>(TransportMode, ObjectOwnerId, ObjectId, MemberHash, MemBlockPtr&& args)>;
75
/// Outcome of a remote method call as carried by MethodResponseData::result.
/// RemoteObject::makeMethodResult() maps each value to the result handed to the caller.
77enum class RemoteCallResult : uint8_t
78{
// the return value buffer is decoded and returned
79 success,
// reported to the caller as std::invalid_argument
80 objectNotFound,
// reported to the caller as std::runtime_error
81 runtimeError,
// reported to the caller as std::logic_error
82 logicError,
// reported to the caller as a plain std::exception (the error text is not used)
83 unknownException,
84};
85
/// Response to a remote method call, as consumed by RemoteObject::makeMethodResult().
87struct MethodResponseData
88{
// identifies the call this response answers
89 CallId callId;
// serialized return value; only dereferenced when result == RemoteCallResult::success
90 std::shared_ptr<std::vector<uint8_t>> returnValBuffer;
// outcome of the call
91 RemoteCallResult result;
// message used for the exception built for logicError, objectNotFound and runtimeError
92 std::string error;
93};
94
95// Forward declarations
96class RemoteObject;
97
/// Construction data for a RemoteObject (taken by rvalue reference in its constructor and kept as
/// the info_ member).
99struct RemoteObjectInfo
100{
// presumably the value returned by RemoteObject::getName() - TODO confirm
102 std::string name;
// passed as the receiver id when sending calls
103 ObjectId id;
// NOTE(review): not used in the inline code of this header; ownership/lifetime to be confirmed
104 WorkQueue* workQueue = nullptr;
// function used to ship calls; reset to nullptr by RemoteObject::invalidateTransport()
105 SendCallFunc sendCallFunc;
// presumably the value returned by RemoteObject::getLocalName() - TODO confirm
106 std::string localName;
// presumably invoked when the RemoteObject is destroyed, unless cleared through
// RemoteObject::clearDestructionCallback() - TODO confirm
107 std::function<void(RemoteObject*)> destructionCallback = nullptr;
// passed as the owner id when sending calls
108 ObjectOwnerId ownerId;
// participant associated with the object (held weakly)
109 std::weak_ptr<kernel::impl::RemoteParticipant> remoteParticipant;
// class description as seen by the writer side; when present it is consulted to adapt
// properties, method calls and events whose types are not in sync with the local ones
110 MaybeConstTypeHandle<ClassType> writerSchema = std::nullopt;
111};
112
/// Base class of the proxies for objects that are reached by sending calls (see sendCall()).
///
/// Outgoing method calls are serialized by makeRemoteCall() / adaptAndMakeRemoteCall(); for every
/// call with a valid callback a response handler is stored in pendingResponses_ under the CallId of
/// the call. Type-specific work (property decoding, event dispatching, argument serialization) is
/// delegated to derived classes through the pure virtual functions declared below.
114class SEN_EXPORT RemoteObject: public ProxyObject
115{
116 SEN_NOCOPY_NOMOVE(RemoteObject)
117
118public: // special members
// Takes over the given construction data.
119 explicit RemoteObject(RemoteObjectInfo&& info);
120 ~RemoteObject() override;
121
122public:
// NOTE(review): defined elsewhere; presumably copies the property state of `other` - confirm.
123 virtual void copyStateFrom(const RemoteObject& other);
124
125public: // implements Object
126 [[nodiscard]] ObjectId getId() const noexcept final;
127 [[nodiscard]] const std::string& getName() const noexcept final;
128 [[nodiscard]] ConstTypeHandle<ClassType> getClass() const noexcept override;
129 [[nodiscard]] const ProxyObject* asProxyObject() const noexcept final;
130 [[nodiscard]] ProxyObject* asProxyObject() noexcept final;
131 [[nodiscard]] RemoteObject* asRemoteObject() noexcept final;
132 [[nodiscard]] const RemoteObject* asRemoteObject() const noexcept final;
133 [[nodiscard]] const std::string& getLocalName() const noexcept final;
134 [[nodiscard]] TimeStamp getLastCommitTime() const noexcept final;
135 [[nodiscard]] Var getPropertyUntyped(const Property* prop) const final;
136 [[nodiscard]] ConnectionGuard onEventUntyped(const Event* ev, EventCallback<VarList>&& callback) final;
137 // NOLINTNEXTLINE
138 void invokeUntyped(const Method* method, const VarList& args, MethodCallback<Var>&& onDone = {}) final;
139 [[nodiscard]] ConnectionGuard onPropertyChangedUntyped(const Property* prop, EventCallback<VarList>&& callback) final;
140
141public: // implements ProxyObject
142 [[nodiscard]] bool isRemote() const noexcept final;
143
144public:
// Sets the initial state from a serialized buffer and the time associated with it.
145 void initializeState(Span<const uint8_t> state, const TimeStamp& time);
146 std::weak_ptr<const kernel::impl::RemoteParticipant> getParticipant() const;
// The writer-side class description, if one was provided at construction.
147 [[nodiscard]] MaybeConstTypeHandle<ClassType> getWriterSchema() const noexcept;
148
149protected:
// These kernel classes get access to the protected and private members below.
150 friend class kernel::impl::RemoteParticipant;
151 friend class kernel::impl::ProxyManager;
152
// NOTE(review): defined elsewhere; presumably looks up the handler stored in pendingResponses_
// for response.callId and reports whether one was found - confirm.
153 [[nodiscard]] bool responseReceived(MethodResponseData& response);
154 void propertyUpdatesReceived(const Span<const uint8_t>& properties, TimeStamp commitTime);

// --- hooks that derived classes must implement ---
155 virtual void eventReceived(MemberHash eventId, TimeStamp creationTime, const Span<const uint8_t>& args) = 0;
156 [[nodiscard]] virtual bool readPropertyFromStream(MemberHash id, InputStream& in, bool initialState) = 0;
157 [[nodiscard]] virtual Var senImplGetPropertyImpl(MemberHash propertyId) const = 0;
158 virtual void serializeMethodArgumentsFromVariants(MemberHash methodId,
159 const VarList& args,
160 OutputStream& out) const = 0;
161 [[nodiscard]] virtual Var deserializeMethodReturnValueAsVariant(MemberHash methodId, InputStream& in) const = 0;
162 void clearDestructionCallback() noexcept;
163 void senImplEventEmitted(MemberHash id, std::function<VarList()>&& argsGetter, const EventInfo& info) final;
164 void senImplRemoveUntypedConnection(ConnId id, MemberHash memberHash) final; // NOLINT
165
// Serializes the arguments with SerializationTraits, sends the call and registers a handler for
// the response when the callback is valid.
166 template <typename R, typename... Args>
167 void makeRemoteCall(MemberHash id, TransportMode transportMode, MethodCallback<R>&& callback, Args... args) const;
168
// Same purpose as makeRemoteCall(), but converts arguments and return value between the local
// (reader) method description and the one found in the writer schema.
169 template <typename R, typename... Args>
170 void adaptAndMakeRemoteCall(MemberHash id,
171 const Method* readerMethod,
172 TransportMode transportMode,
173 MethodCallback<R>&& callback,
174 Args... args) const;
175
// Sends a serialized call; yields the CallId of the call or an error string.
// NOTE(review): defined elsewhere; presumably forwards to info_.sendCallFunc - confirm.
176 Result<CallId, std::string> sendCall(TransportMode mode,
177 ObjectOwnerId ownerId,
178 ObjectId receiverId,
179 MemberHash id,
180 MemBlockPtr&& args) const;
181
182 ConnId senImplMakeConnectionId() noexcept;
183
// Reads a value directly when isSync is true, otherwise reads it using the writer-side type and
// adapts it to readerType.
184 template <typename T>
185 void readOrAdapt(InputStream& in, T& val, MemberHash id, const Type& readerType, bool isSync);
186
// True when no writer schema is present; otherwise defers to checkMemberTypesInSyncInDetail().
187 [[nodiscard]] bool memberTypesInSync(MemberHash id) const noexcept;
188
189 [[nodiscard]] bool checkMemberTypesInSyncInDetail(MemberHash id) const noexcept;
190
// Dispatches an event read from a buffer, adapting the arguments first when inSync is false.
191 template <typename... T>
192 void dispatchEventFromStream(const EventBuffer<T...>& eventBuffer,
193 bool inSync,
194 const Span<const Arg>& readerArgs,
195 MemberHash eventId,
196 TimeStamp creationTime,
197 ObjectId producerId,
198 TransportMode transportMode,
199 const Span<const uint8_t>& buffer,
200 Object* producer) const;
201
// Resets info_.sendCallFunc and then calls cancelPendingCalls().
202 void invalidateTransport();
203
204private:
// Decodes a return value of type R from a buffer (nothing is read when R is void).
205 template <typename R>
206 [[nodiscard]] static inline MethodResult<R> readMethodCallReturnValue(const std::vector<uint8_t>& buffer);
207
// Turns a response into a MethodResult: errors become exceptions, success is decoded with func.
208 template <typename R, typename F>
209 [[nodiscard]] static inline MethodResult<R> makeMethodResult(const MethodResponseData& response, F&& func);
210
// Fills the i-th and following elements of `args` from the variants decoded with the writer types.
211 template <size_t i = 0, typename... T>
212 void adaptEventArgs(std::tuple<T...>& args,
213 VarList& argsAsVariants,
214 const Span<const Arg>& readerArgs,
215 const Event* writerEvent) const;
216
217 static void logEventArgsWarning(const std::string& eventName,
218 size_t numOfWriterEventArgs,
219 size_t numOfReaderEventArgs);
220
221 virtual void cancelPendingCalls();
222
223private:
224 using MutexLock = std::scoped_lock<std::recursive_mutex>;
// A registered untyped event callback together with its identifier.
225 struct EventCallbackData
226 {
227 uint32_t id;
228 EventCallback<VarList> callback;
229 };
230
231 using EventCallbackList = std::vector<EventCallbackData>;
232
// The callbacks registered for one member and the associated transport mode.
233 struct EventCallbackInfo
234 {
235 EventCallbackList list;
236 TransportMode transportMode;
237 };
238
239 using EventCallbackMap = std::unordered_map<MemberHash, EventCallbackInfo>;
240
241private:
// Handlers waiting for a response, keyed by the id of the call they belong to.
242 using PendingResponseContainerType =
243 std::unordered_map<CallId, sen::std_util::move_only_function<void(MethodResponseData&&)>>;
244
// The following three members are mutable because they are used from const member functions
// (makeRemoteCall() and adaptAndMakeRemoteCall() are const and register pending responses).
245 mutable RemoteObjectInfo info_;
246 mutable PendingResponseContainerType pendingResponses_;
// taken (as MutexLock) whenever the inline code of this header touches pendingResponses_
247 mutable std::recursive_mutex pendingResponsesMutex_;
// NOTE(review): presumably the source of the ids returned by senImplMakeConnectionId() - confirm
248 std::atomic_uint32_t nextConnection_ = 1;
249 std::atomic<TimeStamp> lastCommitTime_;
250 std::unique_ptr<EventCallbackMap> eventCallbacks_;
251 std::recursive_mutex eventCallbacksMutex_;
252};
253
254//----------------------------------------------------------------------------------------------------------------------
255// Inline implementation
256//----------------------------------------------------------------------------------------------------------------------
257
258template <typename R, typename... Args>
259void RemoteObject::makeRemoteCall(MemberHash id,
260 TransportMode transportMode,
261 MethodCallback<R>&& callback,
262 Args... args) const
263{
264 // create a buffer containing the arguments
265 auto argsBuffer = std::make_shared<ResizableHeapBlock>();
266
267 if constexpr (sizeof...(Args) != 0U)
268 {
269 // resize buffer
270 argsBuffer->resize((... + SerializationTraits<Args>::serializedSize(args)));
271
272 BufferWriter writer(*argsBuffer);
273 OutputStream out(writer);
274
275 // serialize args
276 (SerializationTraits<Args>::write(out, args), ...);
277 }
278
279 // send the call and get a ticket
280 if (auto callId = sendCall(transportMode, info_.ownerId, info_.id, id, std::move(argsBuffer)); callId.isOk())
281 {
282 // if no callback, dismiss the response
283 if (auto cbLock = callback.lock(); cbLock.isValid())
284 {
285 // when receiving the response, add the callback to the work queue
286 auto responseHandler =
287 [cb = std::move(callback), transportMode, us = weak_from_this()](const auto& response) mutable
288 {
289 if (auto callbackLock = cb.lock(); callbackLock.isValid())
290 {
291 if (us.expired())
292 {
293 auto work = [cb = std::move(cb)]() mutable
294 { cb.invoke({}, ::sen::Err(std::make_exception_ptr(std::runtime_error("object deleted")))); };
295 callbackLock.pushAnswer(std::move(work), impl::cannotBeDropped(transportMode));
296 }
297 else
298 {
299 auto result =
300 makeMethodResult<R>(response, [](const auto& buffer) { return readMethodCallReturnValue<R>(buffer); });
301
302 auto work = [res = std::move(result), cb = std::move(cb)]() mutable { cb.invoke({}, res); };
303 callbackLock.pushAnswer(std::move(work), impl::cannotBeDropped(transportMode));
304 }
305 }
306 };
307
308 MutexLock pendingResponsesLock(pendingResponsesMutex_);
309 pendingResponses_.insert({callId.getValue(), std::move(responseHandler)});
310 }
311 }
312 else
313 {
314 // if no callback, dismiss the response
315 if (auto cbLock = callback.lock(); cbLock.isValid())
316 {
317 auto work = [cb = std::move(callback), err = callId.getError()]() mutable
318 { cb.invoke({}, ::sen::Err(std::make_exception_ptr(std::runtime_error(err)))); };
319 cbLock.pushAnswer(std::move(work), impl::cannotBeDropped(transportMode));
320 }
321 }
322}
323
324template <typename R, typename... Args>
325inline void RemoteObject::adaptAndMakeRemoteCall(MemberHash id,
326 const Method* readerMethod,
327 TransportMode transportMode,
328 MethodCallback<R>&& callback,
329 Args... args) const
330{
331 SEN_ASSERT(info_.writerSchema.has_value() && "WriterSchema is expected");
332 const auto* writerMethod = info_.writerSchema.value()->searchMethodById(id);
333 if (writerMethod == nullptr)
334 {
335 if (auto callbackLock = callback.lock(); callbackLock.isValid())
336 {
337 std::string err = "Cannot call remote method ";
338 err.append(readerMethod->getName());
339 err.append(" because it is not defined in the remote object.");
340
341 auto result = Err(std::make_exception_ptr(std::runtime_error(err)));
342
343 auto work = [res = std::move(err), callback = std::move(callback)]() mutable
344 { callback.invoke({}, Err(std::make_exception_ptr(std::runtime_error(res)))); };
345
346 callbackLock.pushAnswer(std::move(work), cannotBeDropped(transportMode));
347 }
348 return;
349 }
350
351 const auto& readerArgs = readerMethod->getArgs();
352 const auto& writerArgs = writerMethod->getArgs();
353 if (writerArgs.size() > readerArgs.size())
354 {
355 if (auto callbackLock = callback.lock(); callbackLock.isValid())
356 {
357 std::string err = "Cannot call remote method ";
358 err.append(readerMethod->getName());
359 err.append(". Remote method requires ");
360 err.append(std::to_string(writerArgs.size()));
361 err.append(" arguments. Local method specifies ");
362 err.append(std::to_string(readerArgs.size()));
363 err.append(" arguments.");
364
365 auto result = Err(std::make_exception_ptr(std::runtime_error(err)));
366 auto work = [res = std::move(err), callback = std::move(callback)]() mutable
367 { callback.invoke({}, Err(std::make_exception_ptr(std::runtime_error(res)))); };
368
369 callbackLock.pushAnswer(std::move(work), cannotBeDropped(transportMode));
370 }
371
372 return;
373 }
374
375 // create a buffer containing the arguments
376 auto argsBuffer = std::make_shared<ResizableHeapBlock>();
377
378 if (!readerArgs.empty())
379 {
380 ResizableBufferWriter writer(*argsBuffer);
381 OutputStream out(writer);
382 VarList argsAsVariants {toVariant(args)...};
383
384 for (const auto& arg: writerArgs)
385 {
386 const auto readerArgIndex = readerMethod->getArgIndexFromNameHash(arg.getNameHash());
387 auto& varArg = argsAsVariants.at(readerArgIndex);
388 writeToStream(varArg, out, *arg.type, readerMethod->getArgs()[readerArgIndex].type);
389 }
390 }
391
392 // send the call and get a ticket
393 auto callId = sendCall(transportMode, info_.ownerId, info_.id, id, std::move(argsBuffer));
394
395 // if no callback, dismiss the response
396 if (callback.lock().isValid())
397 {
398 // when receiving the response, add the callback to the work queue
399 auto responseHandler = [cb = std::move(callback),
400 writerReturnType = writerMethod->getReturnType(),
401 id,
402 us = shared_from_this(), // NOLINT
403 this,
404 transportMode](const auto& response) mutable
405 {
406 if (auto cbLock = cb.lock(); cbLock.isValid())
407 {
408 auto result = makeMethodResult<R>(response,
409 [writerReturnType, id, this](const auto& buffer)
410 {
411 if constexpr (std::is_void_v<R>)
412 {
413 std::ignore = buffer;
414 return ::sen::Ok();
415 }
416 else
417 {
418 InputStream in(buffer);
419 Var var {};
420 readFromStream(var, in, *writerReturnType);
421 std::ignore =
422 adaptVariant(*MetaTypeTrait<R>::meta(), var, writerReturnType);
423 return Ok(toValue<R>(var));
424 }
425 });
426
427 auto work = [res = std::move(result), cb = std::move(cb)]() mutable { cb.invoke({}, res); };
428 cbLock.pushAnswer(std::move(work), cannotBeDropped(transportMode));
429 }
430 };
431
432 MutexLock pendingResponsesLock(pendingResponsesMutex_);
433 pendingResponses_.try_emplace(callId.getValue(), std::move(responseHandler));
434 }
435}
436
437template <typename T>
438inline void RemoteObject::readOrAdapt(InputStream& in, T& val, MemberHash id, const Type& readerType, bool isSync)
439{
440 if (SEN_LIKELY(isSync))
441 {
442 SerializationTraits<T>::read(in, val);
443 return;
444 }
445
446 SEN_ASSERT(info_.writerSchema.has_value() && "WriterSchema is expected");
447 const auto* writerProp = info_.writerSchema.value()->searchPropertyById(id);
448 if (writerProp == nullptr)
449 {
450 std::string err = "Error while adapting the runtime type of a property from a remote object. The property ID ";
451 err.append(std::to_string(id.get()));
452 err.append(" cannot be found among the properties of the writer schema.");
454 }
455
456 Var var {};
457 readFromStream(var, in, *writerProp->getType());
458 std::ignore = adaptVariant(readerType, var, writerProp->getType());
459 VariantTraits<T>::variantToValue(var, val);
460}
461
462inline bool RemoteObject::memberTypesInSync(MemberHash id) const noexcept
463{
464 return SEN_LIKELY(!info_.writerSchema) || checkMemberTypesInSyncInDetail(id);
465}
466
467template <typename... T>
468inline void RemoteObject::dispatchEventFromStream(const EventBuffer<T...>& eventBuffer,
469 bool inSync,
470 const Span<const Arg>& readerArgs,
471 MemberHash eventId,
472 TimeStamp creationTime,
473 ObjectId producerId,
474 TransportMode transportMode,
475 const Span<const uint8_t>& buffer,
476 Object* producer) const
477{
478 if (SEN_LIKELY(inSync))
479 {
480 eventBuffer.dispatchFromStream(eventId, creationTime, producerId, transportMode, buffer, producer);
481 return;
482 }
483
484 SEN_ASSERT(info_.writerSchema.has_value() && "WriterSchema is expected");
485 const auto* writerEvent = info_.writerSchema.value()->searchEventById(eventId);
486 const auto& writerArgs = writerEvent->getArgs();
487
488 InputStream in(buffer);
489 VarList argsAsVariants {};
490 const auto& writerArgsSize = writerArgs.size();
491 const auto& readerArgsSize = readerArgs.size();
492 argsAsVariants.reserve(writerArgsSize);
493
494 // if the remote implementation has fewer arguments than the local implementation, the event cannot be dispatched
495 if (writerArgsSize < readerArgsSize)
496 {
497 logEventArgsWarning(writerEvent->getName().data(), writerArgsSize, readerArgsSize);
498 return;
499 }
500
501 for (const auto& arg: writerArgs)
502 {
503 argsAsVariants.emplace_back();
504 readFromStream(argsAsVariants.back(), in, *arg.type);
505 }
506
507 std::tuple<T...> argsNative {};
508 adaptEventArgs(argsNative, argsAsVariants, readerArgs, writerEvent);
509
510 // call emit with the tuple arguments
511 std::apply([&eventBuffer](auto&&... arg)
512 { eventBuffer.dispatch(std::forward<std::remove_reference_t<decltype(arg)>>(arg)...); },
513 std::tuple_cat(std::make_tuple(eventId, creationTime, producerId, transportMode, false, nullptr, producer),
514 std::move(argsNative)));
515}
516
517template <typename R>
518MethodResult<R> RemoteObject::readMethodCallReturnValue(const std::vector<uint8_t>& buffer)
519{
520 if constexpr (std::is_void_v<R>)
521 {
522 return ::sen::Ok();
523 }
524 else
525 {
526 InputStream in(buffer);
527
528 R returnVal {};
529 SerializationTraits<R>::read(in, returnVal);
530 return ::sen::Ok(returnVal);
531 }
532}
533
534template <typename R, typename F>
535MethodResult<R> RemoteObject::makeMethodResult(const MethodResponseData& response, F&& func)
536{
537 switch (response.result)
538 {
539 case RemoteCallResult::logicError:
540 return ::sen::Err(std::make_exception_ptr(std::logic_error(response.error)));
541
542 case RemoteCallResult::objectNotFound:
543 return ::sen::Err(std::make_exception_ptr(std::invalid_argument(response.error)));
544
545 case RemoteCallResult::runtimeError:
546 return ::sen::Err(std::make_exception_ptr(std::runtime_error(response.error)));
547
548 case RemoteCallResult::unknownException:
549 return ::sen::Err(std::make_exception_ptr(std::exception {}));
550
551 case RemoteCallResult::success:
552 return func(*response.returnValBuffer);
553
554 default:
556 }
557
558 SEN_ASSERT(false);
559}
560
561template <size_t i, typename... T>
562inline void RemoteObject::adaptEventArgs(std::tuple<T...>& argsNative,
563 VarList& argsAsVariants,
564 const Span<const Arg>& readerArgs,
565 const Event* writerEvent) const
566{
567 if constexpr (i == sizeof...(T))
568 {
569 return;
570 }
571 else
572 {
573 const auto writerArgIndex = writerEvent->getArgIndexFromNameHash(readerArgs[i].getNameHash());
574 auto& varArg = argsAsVariants.at(writerArgIndex);
575 std::ignore = adaptVariant(*readerArgs[i].type, varArg, writerEvent->getArgs()[writerArgIndex].type);
576 VariantTraits<std::remove_reference_t<decltype(std::get<i>(argsNative))>>::variantToValue(varArg,
577 std::get<i>(argsNative));
578
579 // next element of the tuple
580 adaptEventArgs<i + 1>(argsNative, argsAsVariants, readerArgs, writerEvent);
581 }
582}
583
/// Drops the function used to send calls and then invokes cancelPendingCalls() (virtual, defined
/// elsewhere).
/// NOTE(review): info_.sendCallFunc is reset without taking any lock visible in this header -
/// confirm that callers serialize this with concurrent sendCall() invocations.
584inline void RemoteObject::invalidateTransport()
585{
// reset first, so that the transport is already gone when the pending calls get cancelled
586 info_.sendCallFunc = nullptr;
587 cancelPendingCalls();
588}
589
590} // namespace impl
591} // namespace sen
592
593#endif // SEN_CORE_OBJ_DETAIL_REMOTE_OBJECT_H
The following macros implement a replacement of assert that is connected to the overall fault handlin...
impl::Err< void > Err() noexcept
If E is void, use the void specialization of Err.
Definition result.h:439
impl::Ok< void > Ok() noexcept
If T is void, use the void specialization of Ok.
Definition result.h:434
#define SEN_ASSERT(expr)
Checks an intermediate result produced by a procedure (not an input or output). NOLINTNEXTLINE.
Definition assert.h:39
void throwRuntimeError(const std::string &err)
Throws std::exception that attempts to collect the stack trace. We also wrap it to avoid including st...
InputStreamTemplate< LittleEndian > InputStream
Definition input_stream.h:84
Var toVariant(const T &val)
Definition core/include/sen/core/io/util.h:160
T toValue(const Var &var)
Definition core/include/sen/core/io/util.h:152
#define SEN_UNREACHABLE()
Definition compiler_macros.h:420
Result< R, std::exception_ptr > MethodResult
The result of a method (which can be an exception in case of error).
Definition callback.h:204
Callback< MethodCallInfo, MethodResult< R > > MethodCallback
A method callback.
Definition callback.h:212
std::vector< Var > VarList
A list of vars to represent sequences.
Definition var.h:104
std::optional< ConstTypeHandle< T > > MaybeConstTypeHandle
Definition type.h:322
TypeHandle< const T > ConstTypeHandle
Definition type.h:319
TransportMode
How to transport information.
Definition type.h:56
detail::MoveOnlyFunctionImpl< FwdArgs... > move_only_function
Definition move_only_function.h:256
Definition assert.h:17
OutputStreamTemplate< LittleEndian > OutputStream
Definition output_stream.h:64