8#ifndef SEN_CORE_OBJ_DETAIL_REMOTE_OBJECT_H
9#define SEN_CORE_OBJ_DETAIL_REMOTE_OBJECT_H
50#include <unordered_map>
61class RemoteParticipant;
70using CallId = uint32_t;
74 std::function<Result<CallId, std::string>(
TransportMode, ObjectOwnerId, ObjectId, MemberHash, MemBlockPtr&& args)>;
77enum class RemoteCallResult : uint8_t
87struct MethodResponseData
90 std::shared_ptr<std::vector<uint8_t>> returnValBuffer;
91 RemoteCallResult result;
99struct RemoteObjectInfo
104 WorkQueue* workQueue =
nullptr;
105 SendCallFunc sendCallFunc;
106 std::string localName;
107 std::function<void(RemoteObject*)> destructionCallback =
nullptr;
108 ObjectOwnerId ownerId;
109 std::weak_ptr<kernel::impl::RemoteParticipant> remoteParticipant;
114class SEN_EXPORT RemoteObject:
public ProxyObject
116 SEN_NOCOPY_NOMOVE(RemoteObject)
119 explicit RemoteObject(RemoteObjectInfo&& info);
120 ~RemoteObject()
override;
123 virtual void copyStateFrom(
const RemoteObject& other);
126 [[nodiscard]] ObjectId getId() const noexcept final;
127 [[nodiscard]] const std::
string& getName() const noexcept final;
128 [[nodiscard]] ConstTypeHandle<ClassType> getClass() const noexcept override;
129 [[nodiscard]] const ProxyObject* asProxyObject() const noexcept final;
130 [[nodiscard]] ProxyObject* asProxyObject() noexcept final;
131 [[nodiscard]] RemoteObject* asRemoteObject() noexcept final;
132 [[nodiscard]] const RemoteObject* asRemoteObject() const noexcept final;
133 [[nodiscard]] const std::
string& getLocalName() const noexcept final;
134 [[nodiscard]] TimeStamp getLastCommitTime() const noexcept final;
135 [[nodiscard]] Var getPropertyUntyped(const Property* prop) const final;
136 [[nodiscard]] ConnectionGuard onEventUntyped(const Event* ev, EventCallback<VarList>&& callback) final;
138 void invokeUntyped(const Method* method, const VarList& args, MethodCallback<Var>&& onDone = {})
final;
139 [[nodiscard]] ConnectionGuard onPropertyChangedUntyped(
const Property* prop, EventCallback<VarList>&& callback)
final;
142 [[nodiscard]]
bool isRemote() const noexcept final;
145 void initializeState(Span<const uint8_t> state, const TimeStamp& time);
146 std::weak_ptr<const kernel::impl::RemoteParticipant> getParticipant() const;
147 [[nodiscard]] MaybeConstTypeHandle<ClassType> getWriterSchema() const noexcept;
150 friend class kernel::impl::RemoteParticipant;
151 friend class kernel::impl::ProxyManager;
153 [[nodiscard]]
bool responseReceived(MethodResponseData& response);
154 void propertyUpdatesReceived(const Span<const uint8_t>& properties, TimeStamp commitTime);
155 virtual
void eventReceived(MemberHash eventId, TimeStamp creationTime, const Span<const uint8_t>& args) = 0;
156 [[nodiscard]] virtual
bool readPropertyFromStream(MemberHash
id, InputStream& in,
bool initialState) = 0;
157 [[nodiscard]] virtual Var senImplGetPropertyImpl(MemberHash propertyId) const = 0;
158 virtual
void serializeMethodArgumentsFromVariants(MemberHash methodId,
160 OutputStream& out) const = 0;
161 [[nodiscard]] virtual Var deserializeMethodReturnValueAsVariant(MemberHash methodId, InputStream& in) const = 0;
162 void clearDestructionCallback() noexcept;
163 void senImplEventEmitted(MemberHash
id, std::function<VarList()>&& argsGetter, const EventInfo& info) final;
164 void senImplRemoveUntypedConnection(ConnId
id, MemberHash memberHash) final;
166 template <typename R, typename... Args>
167 void makeRemoteCall(MemberHash
id, TransportMode transportMode, MethodCallback<R>&& callback, Args... args) const;
169 template <typename R, typename... Args>
170 void adaptAndMakeRemoteCall(MemberHash
id,
171 const Method* readerMethod,
172 TransportMode transportMode,
173 MethodCallback<R>&& callback,
176 Result<CallId, std::
string> sendCall(TransportMode mode,
177 ObjectOwnerId ownerId,
180 MemBlockPtr&& args) const;
182 ConnId senImplMakeConnectionId() noexcept;
184 template <typename T>
185 void readOrAdapt(InputStream& in, T& val, MemberHash
id, const Type& readerType,
bool isSync);
187 [[nodiscard]]
bool memberTypesInSync(MemberHash
id) const noexcept;
189 [[nodiscard]]
bool checkMemberTypesInSyncInDetail(MemberHash
id) const noexcept;
191 template <typename... T>
192 void dispatchEventFromStream(const EventBuffer<T...>& eventBuffer,
194 const Span<const Arg>& readerArgs,
196 TimeStamp creationTime,
198 TransportMode transportMode,
199 const Span<const uint8_t>& buffer,
200 Object* producer) const;
202 void invalidateTransport();
205 template <typename R>
206 [[nodiscard]] static inline MethodResult<R> readMethodCallReturnValue(const std::vector<uint8_t>& buffer);
208 template <typename R, typename F>
209 [[nodiscard]] static inline MethodResult<R> makeMethodResult(const MethodResponseData& response, F&& func);
211 template <
size_t i = 0, typename... T>
212 void adaptEventArgs(std::tuple<T...>& args,
213 VarList& argsAsVariants,
214 const Span<const Arg>& readerArgs,
215 const Event* writerEvent) const;
217 static
void logEventArgsWarning(const std::
string& eventName,
218 size_t numOfWriterEventArgs,
219 size_t numOfReaderEventArgs);
221 virtual
void cancelPendingCalls();
224 using MutexLock = std::scoped_lock<std::recursive_mutex>;
225 struct EventCallbackData
228 EventCallback<VarList> callback;
231 using EventCallbackList = std::vector<EventCallbackData>;
233 struct EventCallbackInfo
235 EventCallbackList list;
239 using EventCallbackMap = std::unordered_map<MemberHash, EventCallbackInfo>;
242 using PendingResponseContainerType =
245 mutable RemoteObjectInfo info_;
246 mutable PendingResponseContainerType pendingResponses_;
247 mutable std::recursive_mutex pendingResponsesMutex_;
248 std::atomic_uint32_t nextConnection_ = 1;
249 std::atomic<TimeStamp> lastCommitTime_;
250 std::unique_ptr<EventCallbackMap> eventCallbacks_;
251 std::recursive_mutex eventCallbacksMutex_;
258template <
typename R,
typename... Args>
259void RemoteObject::makeRemoteCall(MemberHash
id,
265 auto argsBuffer = std::make_shared<ResizableHeapBlock>();
267 if constexpr (
sizeof...(Args) != 0U)
270 argsBuffer->resize((... + SerializationTraits<Args>::serializedSize(args)));
272 BufferWriter writer(*argsBuffer);
276 (SerializationTraits<Args>::write(out, args), ...);
280 if (
auto callId = sendCall(transportMode, info_.ownerId, info_.id,
id, std::move(argsBuffer)); callId.isOk())
283 if (
auto cbLock = callback.lock(); cbLock.isValid())
286 auto responseHandler =
287 [cb = std::move(callback), transportMode, us = weak_from_this()](
const auto& response)
mutable
289 if (
auto callbackLock = cb.lock(); callbackLock.isValid())
293 auto work = [cb = std::move(cb)]()
mutable
294 { cb.invoke({},
::sen::Err(std::make_exception_ptr(std::runtime_error(
"object deleted")))); };
295 callbackLock.pushAnswer(std::move(work), impl::cannotBeDropped(transportMode));
300 makeMethodResult<R>(response, [](
const auto& buffer) {
return readMethodCallReturnValue<R>(buffer); });
302 auto work = [res = std::move(result), cb = std::move(cb)]()
mutable { cb.invoke({}, res); };
303 callbackLock.pushAnswer(std::move(work), impl::cannotBeDropped(transportMode));
308 MutexLock pendingResponsesLock(pendingResponsesMutex_);
309 pendingResponses_.insert({callId.getValue(), std::move(responseHandler)});
315 if (
auto cbLock = callback.lock(); cbLock.isValid())
317 auto work = [cb = std::move(callback), err = callId.getError()]()
mutable
318 { cb.invoke({},
::sen::Err(std::make_exception_ptr(std::runtime_error(err)))); };
319 cbLock.pushAnswer(std::move(work), impl::cannotBeDropped(transportMode));
324template <
typename R,
typename... Args>
325inline void RemoteObject::adaptAndMakeRemoteCall(MemberHash
id,
326 const Method* readerMethod,
331 SEN_ASSERT(info_.writerSchema.has_value() &&
"WriterSchema is expected");
332 const auto* writerMethod = info_.writerSchema.value()->searchMethodById(
id);
333 if (writerMethod ==
nullptr)
335 if (
auto callbackLock = callback.lock(); callbackLock.isValid())
337 std::string err =
"Cannot call remote method ";
338 err.append(readerMethod->getName());
339 err.append(
" because it is not defined in the remote object.");
341 auto result =
Err(std::make_exception_ptr(std::runtime_error(err)));
343 auto work = [res = std::move(err), callback = std::move(callback)]()
mutable
344 { callback.invoke({},
Err(std::make_exception_ptr(std::runtime_error(res)))); };
346 callbackLock.pushAnswer(std::move(work), cannotBeDropped(transportMode));
351 const auto& readerArgs = readerMethod->getArgs();
352 const auto& writerArgs = writerMethod->getArgs();
353 if (writerArgs.size() > readerArgs.size())
355 if (
auto callbackLock = callback.lock(); callbackLock.isValid())
357 std::string err =
"Cannot call remote method ";
358 err.append(readerMethod->getName());
359 err.append(
". Remote method requires ");
360 err.append(std::to_string(writerArgs.size()));
361 err.append(
" arguments. Local method specifies ");
362 err.append(std::to_string(readerArgs.size()));
363 err.append(
" arguments.");
365 auto result =
Err(std::make_exception_ptr(std::runtime_error(err)));
366 auto work = [res = std::move(err), callback = std::move(callback)]()
mutable
367 { callback.invoke({},
Err(std::make_exception_ptr(std::runtime_error(res)))); };
369 callbackLock.pushAnswer(std::move(work), cannotBeDropped(transportMode));
376 auto argsBuffer = std::make_shared<ResizableHeapBlock>();
378 if (!readerArgs.empty())
380 ResizableBufferWriter writer(*argsBuffer);
384 for (
const auto& arg: writerArgs)
386 const auto readerArgIndex = readerMethod->getArgIndexFromNameHash(arg.getNameHash());
387 auto& varArg = argsAsVariants.at(readerArgIndex);
388 writeToStream(varArg, out, *arg.type, readerMethod->getArgs()[readerArgIndex].type);
393 auto callId = sendCall(transportMode, info_.ownerId, info_.id,
id, std::move(argsBuffer));
396 if (callback.lock().isValid())
399 auto responseHandler = [cb = std::move(callback),
400 writerReturnType = writerMethod->getReturnType(),
402 us = shared_from_this(),
404 transportMode](
const auto& response)
mutable
406 if (
auto cbLock = cb.lock(); cbLock.isValid())
408 auto result = makeMethodResult<R>(response,
409 [writerReturnType,
id,
this](
const auto& buffer)
411 if constexpr (std::is_void_v<R>)
413 std::ignore = buffer;
420 readFromStream(var, in, *writerReturnType);
422 adaptVariant(*MetaTypeTrait<R>::meta(), var, writerReturnType);
427 auto work = [res = std::move(result), cb = std::move(cb)]()
mutable { cb.invoke({}, res); };
428 cbLock.pushAnswer(std::move(work), cannotBeDropped(transportMode));
432 MutexLock pendingResponsesLock(pendingResponsesMutex_);
433 pendingResponses_.try_emplace(callId.getValue(), std::move(responseHandler));
438inline void RemoteObject::readOrAdapt(
InputStream& in, T& val, MemberHash
id,
const Type& readerType,
bool isSync)
440 if (SEN_LIKELY(isSync))
442 SerializationTraits<T>::read(in, val);
446 SEN_ASSERT(info_.writerSchema.has_value() &&
"WriterSchema is expected");
447 const auto* writerProp = info_.writerSchema.value()->searchPropertyById(
id);
448 if (writerProp ==
nullptr)
450 std::string err =
"Error while adapting the runtime type of a property from a remote object. The property ID ";
451 err.append(std::to_string(
id.get()));
452 err.append(
" cannot be found among the properties of the writer schema.");
457 readFromStream(var, in, *writerProp->getType());
458 std::ignore = adaptVariant(readerType, var, writerProp->getType());
459 VariantTraits<T>::variantToValue(var, val);
462inline bool RemoteObject::memberTypesInSync(MemberHash
id)
const noexcept
464 return SEN_LIKELY(!info_.writerSchema) || checkMemberTypesInSyncInDetail(
id);
467template <
typename... T>
468inline void RemoteObject::dispatchEventFromStream(
const EventBuffer<T...>& eventBuffer,
470 const Span<const Arg>& readerArgs,
472 TimeStamp creationTime,
475 const Span<const uint8_t>& buffer,
476 Object* producer)
const
478 if (SEN_LIKELY(inSync))
480 eventBuffer.dispatchFromStream(eventId, creationTime, producerId, transportMode, buffer, producer);
484 SEN_ASSERT(info_.writerSchema.has_value() &&
"WriterSchema is expected");
485 const auto* writerEvent = info_.writerSchema.value()->searchEventById(eventId);
486 const auto& writerArgs = writerEvent->getArgs();
490 const auto& writerArgsSize = writerArgs.size();
491 const auto& readerArgsSize = readerArgs.size();
492 argsAsVariants.reserve(writerArgsSize);
495 if (writerArgsSize < readerArgsSize)
497 logEventArgsWarning(writerEvent->getName().data(), writerArgsSize, readerArgsSize);
501 for (
const auto& arg: writerArgs)
503 argsAsVariants.emplace_back();
504 readFromStream(argsAsVariants.back(), in, *arg.type);
507 std::tuple<T...> argsNative {};
508 adaptEventArgs(argsNative, argsAsVariants, readerArgs, writerEvent);
511 std::apply([&eventBuffer](
auto&&... arg)
512 { eventBuffer.dispatch(std::forward<std::remove_reference_t<
decltype(arg)>>(arg)...); },
513 std::tuple_cat(std::make_tuple(eventId, creationTime, producerId, transportMode,
false,
nullptr, producer),
514 std::move(argsNative)));
518MethodResult<R> RemoteObject::readMethodCallReturnValue(
const std::vector<uint8_t>& buffer)
520 if constexpr (std::is_void_v<R>)
529 SerializationTraits<R>::read(in, returnVal);
530 return ::sen::Ok(returnVal);
534template <
typename R,
typename F>
535MethodResult<R> RemoteObject::makeMethodResult(
const MethodResponseData& response, F&& func)
537 switch (response.result)
539 case RemoteCallResult::logicError:
540 return ::sen::Err(std::make_exception_ptr(std::logic_error(response.error)));
542 case RemoteCallResult::objectNotFound:
543 return ::sen::Err(std::make_exception_ptr(std::invalid_argument(response.error)));
545 case RemoteCallResult::runtimeError:
546 return ::sen::Err(std::make_exception_ptr(std::runtime_error(response.error)));
548 case RemoteCallResult::unknownException:
549 return ::sen::Err(std::make_exception_ptr(std::exception {}));
551 case RemoteCallResult::success:
552 return func(*response.returnValBuffer);
561template <
size_t i,
typename... T>
562inline void RemoteObject::adaptEventArgs(std::tuple<T...>& argsNative,
564 const Span<const Arg>& readerArgs,
565 const Event* writerEvent)
const
567 if constexpr (i ==
sizeof...(T))
573 const auto writerArgIndex = writerEvent->getArgIndexFromNameHash(readerArgs[i].getNameHash());
574 auto& varArg = argsAsVariants.at(writerArgIndex);
575 std::ignore = adaptVariant(*readerArgs[i].type, varArg, writerEvent->getArgs()[writerArgIndex].type);
576 VariantTraits<std::remove_reference_t<decltype(std::get<i>(argsNative))>>::variantToValue(varArg,
577 std::get<i>(argsNative));
580 adaptEventArgs<i + 1>(argsNative, argsAsVariants, readerArgs, writerEvent);
584inline void RemoteObject::invalidateTransport()
586 info_.sendCallFunc =
nullptr;
587 cancelPendingCalls();
The following macros implement a replacement of assert that is connected to the overall fault handlin...
impl::Err< void > Err() noexcept
If E is void, use the void specialization of Err.
Definition result.h:439
impl::Ok< void > Ok() noexcept
If T is void, use the void specialization of Ok.
Definition result.h:434
#define SEN_ASSERT(expr)
Checks an intermediate result produced by a procedure (not an input or output). NOLINTNEXTLINE.
Definition assert.h:39
void throwRuntimeError(const std::string &err)
Throws std::exception that attempts to collect the stack trace. We also wrap it to avoid including st...
InputStreamTemplate< LittleEndian > InputStream
Definition input_stream.h:84
Var toVariant(const T &val)
Definition core/include/sen/core/io/util.h:160
T toValue(const Var &var)
Definition core/include/sen/core/io/util.h:152
#define SEN_UNREACHABLE()
Definition compiler_macros.h:420
Result< R, std::exception_ptr > MethodResult
The result of a method (which can be an exception in case of error).
Definition callback.h:204
Callback< MethodCallInfo, MethodResult< R > > MethodCallback
A method callback.
Definition callback.h:212
std::vector< Var > VarList
A list of vars to represent sequences.
Definition var.h:104
std::optional< ConstTypeHandle< T > > MaybeConstTypeHandle
Definition type.h:322
TypeHandle< const T > ConstTypeHandle
Definition type.h:319
TransportMode
How to transport information.
Definition type.h:56
detail::MoveOnlyFunctionImpl< FwdArgs... > move_only_function
Definition move_only_function.h:256
OutputStreamTemplate< LittleEndian > OutputStream
Definition output_stream.h:64