Skip to content

Commit

Permalink
Merge pull request #1819 from Karthick-Somasundaresan/smartlink-crash…
Browse files Browse the repository at this point in the history
…-fix

Smartlink crash fix
  • Loading branch information
pwielders authored Jan 21, 2025
2 parents 305ee24 + f838812 commit 47f5bb4
Showing 1 changed file with 139 additions and 30 deletions.
169 changes: 139 additions & 30 deletions Source/websocket/JSONRPCLink.h
Original file line number Diff line number Diff line change
Expand Up @@ -460,14 +460,69 @@ namespace Thunder {
using PendingMap = std::unordered_map<uint32_t, Entry>;
using InvokeFunction = Core::JSONRPC::InvokeFunction;

class Handler {
public:
using HandlerMap = std::unordered_map<string, InvokeFunction>;

public:
Handler():_adminLock(),_invokeMap(){}
Handler(const Handler&) = delete;
Handler(Handler&&) = delete;
Handler& operator=(const Handler&) = delete;
Handler& operator=(Handler&&) = delete;
~Handler() = default;
void Register(const string& methodName, const InvokeFunction& lambda)
{
_adminLock.Lock();
auto retval = _invokeMap.emplace(std::piecewise_construct,
std::make_tuple(methodName),
std::make_tuple(lambda));

if ( retval.second == false ) {
retval.first->second = lambda;
}
_adminLock.Unlock();
}
void Unregister(const string& methodName)
{
_adminLock.Lock();
HandlerMap::iterator index = _invokeMap.find(methodName);

ASSERT((index != _invokeMap.end()) && _T("Do not unregister methods that are not registered!!!"));

if (index != _invokeMap.end()) {
_invokeMap.erase(index);
}
_adminLock.Unlock();
}

uint32_t Invoke(const Core::JSONRPC::Context& context, const string& method, const string& parameters, string& response)
{
uint32_t result = Core::ERROR_UNKNOWN_METHOD;

response.clear();

_adminLock.Lock();
HandlerMap::iterator index = _invokeMap.find(Core::JSONRPC::Message::Method(method));
if (index != _invokeMap.end()) {
result = index->second(context, method, parameters, response);
}
_adminLock.Unlock();
return (result);
}
private:
mutable Core::CriticalSection _adminLock;
HandlerMap _invokeMap;
};

protected:
static constexpr uint32_t DefaultWaitTime = 10000;

LinkType(const string& callsign, const string connectingCallsign, const TCHAR* localCallsign, const string& query)
: _adminLock()
, _connectId(RemoteNodeId())
, _channel(CommunicationChannel::Instance(_connectId, string("/jsonrpc/") + connectingCallsign, query))
, _handler({ DetermineVersion(callsign) })
, _handler()
, _callsign(callsign.empty() ? string() : Core::JSONRPC::Message::Callsign(callsign + '.'))
, _localSpace()
, _pendingQueue()
Expand Down Expand Up @@ -524,10 +579,7 @@ namespace Thunder {
{
return (_callsign);
}
Core::JSONRPC::Handler::EventIterator Events() const
{
return (_handler.Events());
}

template <typename INBOUND, typename METHOD>
void Assign(const string& eventName, const METHOD& method)
{
Expand Down Expand Up @@ -1035,6 +1087,7 @@ namespace Thunder {
std::forward_as_tuple(waitTime, response));
ASSERT(newElement.second == true);


if (newElement.second == true) {
uint64_t expiry = newElement.first->second.Expiry();
_adminLock.Unlock();
Expand All @@ -1053,7 +1106,6 @@ namespace Thunder {

_adminLock.Unlock();
}

return (result);
}
uint32_t Inbound(const Core::ProxyType<Core::JSONRPC::Message>& inbound)
Expand All @@ -1062,6 +1114,7 @@ namespace Thunder {

ASSERT(inbound.IsValid() == true);


if ((inbound->Id.IsSet() == true) && (inbound->Result.IsSet() || inbound->Error.IsSet())) {
// Looks like this is a response..
ASSERT(inbound->Parameters.IsSet() == false);
Expand Down Expand Up @@ -1153,7 +1206,7 @@ namespace Thunder {
Core::CriticalSection _adminLock;
Core::NodeId _connectId;
Core::ProxyType< CommunicationChannel > _channel;
Core::JSONRPC::Handler _handler;
Handler _handler;
string _callsign;
string _localSpace;
PendingMap _pendingQueue;
Expand All @@ -1174,6 +1227,21 @@ namespace Thunder {
class Connection : public LinkType<INTERFACE> {
private:
using Base = LinkType<INTERFACE>;
class EventSubscriber: public Core::Thread {
public:
EventSubscriber () = delete;
EventSubscriber(const EventSubscriber&) = delete;
EventSubscriber& operator=(const EventSubscriber&) = delete;
EventSubscriber(Connection& parent):Thread(Thunder::Core::Thread::DefaultStackSize(), _T("SmartLinkTypeEventSubscriber")), _parent(parent){}
~EventSubscriber() = default;
uint32_t Worker() override
{
_parent.SubscribeEvents();
return Core::infinite;
}
private:
Connection& _parent;
};
public:
static constexpr uint32_t DefaultWaitTime = Base::DefaultWaitTime;
private:
Expand Down Expand Up @@ -1253,6 +1321,9 @@ namespace Thunder {
: Base(callsign, string(), localCallsign, query)
, _monitor(string(), false)
, _parent(parent)
, _adminLock()
, _subscriptions()
, _eventSubscriber(*this)
, _state(UNKNOWN)
{
_monitor.template Assign<Statechange>(_T("statechange"), &Connection::state_change, this);
Expand All @@ -1261,30 +1332,77 @@ namespace Thunder {
~Connection() override
{
_monitor.Revoke(_T("statechange"));
_eventSubscriber.Stop();
_eventSubscriber.Wait(Core::Thread::STOPPED, Core::infinite);
}

public:
bool IsActivated()
{
return (_state == ACTIVATED);
}
void SubscribeEvents() {
_adminLock.Lock();
for (const string& iter: _subscriptions) {
SendSubscribeRequest(iter);
}
_adminLock.Unlock();
_eventSubscriber.Block();
_state = state::ACTIVATED;
_parent.StateChange();
}

template <typename INBOUND, typename METHOD>
uint32_t Subscribe(const uint32_t waitTime, const string& eventName, const METHOD& method)
{
auto result = Base::template Subscribe<INBOUND, METHOD>(waitTime, eventName, method);
if (result == Core::ERROR_NONE) {
_adminLock.Lock();
_subscriptions.insert(string(eventName));
_adminLock.Unlock();
}
return result;
}
template <typename INBOUND, typename METHOD, typename REALOBJECT>
uint32_t Subscribe(const uint32_t waitTime, const string& eventName, const METHOD& method, REALOBJECT* objectPtr)
{
auto result = Base::template Subscribe<INBOUND, METHOD, REALOBJECT>(waitTime, eventName, method, objectPtr);
if (result == Core::ERROR_NONE) {
_adminLock.Lock();
_subscriptions.insert(string(eventName));
_adminLock.Unlock();
}
return result;
}
void Unsubscribe(const uint32_t waitTime, const string& eventName)
{
_adminLock.Lock();
ASSERT(_subscriptions.find(eventName) != _subscriptions.end());
auto iter = _subscriptions.erase(eventName);
_adminLock.Unlock();
return Base::Unsubscribe(waitTime, eventName);
}


private:
uint32_t SendSubscribeRequest(const string& eventName) {
uint32_t retVal = Core::ERROR_UNAVAILABLE;
Core::JSONRPC::Message response;
const string parameters("{ \"event\": \"" + eventName + "\", \"id\": \"" + Base::Namespace() + "\"}");
auto result = Base::template Invoke<string>(DefaultWaitTime, "register", parameters, response);
if (result == Core::ERROR_NONE && response.Error.IsSet() != true) {
retVal = Core::ERROR_NONE;
}
return retVal;

}
void SetState(const JSONRPC::JSONPluginState value)
{
if (value == JSONRPC::JSONPluginState::ACTIVATED) {
if ((_state != ACTIVATED) && (_state != LOADING)) {
_state = state::LOADING;
auto index(Base::Events());
while (index.Next() == true) {
_events.push_back(index.Event());
}
next_event(Core::JSON::String(), nullptr);
}
else if (_state == LOADING) {
_state = state::ACTIVATED;
_parent.StateChange();

_eventSubscriber.Run();

}
}
else if (value == JSONRPC::JSONPluginState::DEACTIVATED) {
Expand Down Expand Up @@ -1315,18 +1433,7 @@ namespace Thunder {
_monitor.template Dispatch<void>(DefaultWaitTime, method, &Connection::monitor_response, this);
}
}
void next_event(const Core::JSON::String& /* parameters */, const Core::JSONRPC::Error* /* result */)
{
// See if there are events pending for registration...
if (_events.empty() == false) {
const string parameters("{ \"event\": \"" + _events.front() + "\", \"id\": \"" + Base::Namespace() + "\"}");
_events.pop_front();
LinkType<INTERFACE>::Dispatch(DefaultWaitTime, _T("register"), parameters, &Connection::next_event, this);
}
else {
SetState(JSONRPC::JSONPluginState::ACTIVATED);
}
}


void Opened() override
{
Expand All @@ -1339,7 +1446,9 @@ namespace Thunder {
private:
LinkType<INTERFACE> _monitor;
SmartLinkType<INTERFACE>& _parent;
std::list<string> _events;
Core::CriticalSection _adminLock;
std::unordered_set<string> _subscriptions;
EventSubscriber _eventSubscriber;
state _state;
};

Expand Down

0 comments on commit 47f5bb4

Please sign in to comment.