Skip to content

Commit

Permalink
Merge branch 'master' into development/statecontrol-interface
Browse files Browse the repository at this point in the history
  • Loading branch information
VeithMetro authored Feb 10, 2025
2 parents ba1ce5c + 1cca70b commit 924aaa9
Showing 1 changed file with 52 additions and 20 deletions.
72 changes: 52 additions & 20 deletions Source/plugins/JSONRPC.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,29 +92,34 @@ namespace PluginHost {
class Destination {
public:
Destination() = delete;
Destination(uint32_t channelId, const string& designator)
Destination(uint32_t channelId, const string& designator, const bool oneShot = false)
: _callback(nullptr)
, _channelId(channelId)
, _designator(designator) {
, _designator(designator)
, _oneShot(oneShot) {
}
Destination(IDispatcher::ICallback* callback, const string& designator)
: _callback(callback)
, _channelId(~0)
, _designator(designator) {
, _designator(designator)
, _oneShot(false) {
if (_callback != nullptr) {
_callback->AddRef();
}
}
Destination(Destination&& move) noexcept
: _callback(move._callback)
, _channelId(move._channelId)
, _designator(move._designator) {
, _designator(std::move(move._designator))
, _oneShot(move._oneShot) {
move._callback = nullptr;
move._channelId = ~0;
}
Destination(const Destination& copy)
: _callback(copy._callback)
, _channelId(copy._channelId)
, _designator(copy._designator) {
, _designator(copy._designator)
, _oneShot(copy._oneShot) {
if (_callback != nullptr) {
_callback->AddRef();
}
Expand All @@ -132,8 +137,9 @@ namespace PluginHost {
}
_callback = move._callback;
_channelId = move._channelId;
_designator = move._designator;
_designator = std::move(move._designator);
move._callback = nullptr;
move._channelId = ~0;
return (*this);
}
Destination& operator=(const Destination& copy)
Expand All @@ -144,6 +150,7 @@ namespace PluginHost {
_callback = copy._callback;
_channelId = copy._channelId;
_designator = copy._designator;
_oneShot = copy._oneShot;
if (_callback != nullptr) {
_callback->AddRef();
}
Expand All @@ -160,11 +167,15 @@ namespace PluginHost {
inline const string& Designator() const {
return (_designator);
}
inline bool IsOneShot() const {
return (_oneShot);
}

private:
IDispatcher::ICallback* _callback;
uint32_t _channelId;
string _designator;
bool _oneShot;
};
using Destinations = std::vector<Destination>;

Expand All @@ -176,7 +187,7 @@ namespace PluginHost {
: _designators() {
}
Observer(Observer&& move) noexcept
: _designators(move._designators) {
: _designators(std::move(move._designators)) {
}
Observer(const Observer& copy)
: _designators(copy._designators) {
Expand All @@ -187,7 +198,7 @@ namespace PluginHost {
bool IsEmpty() const {
return ( _designators.empty() );
}
uint32_t Subscribe(const uint32_t id, const string& designator) {
uint32_t Subscribe(const uint32_t id, const string& designator, const bool oneShot) {
uint32_t result = Core::ERROR_NONE;

Destinations::iterator index(_designators.begin());
Expand All @@ -196,7 +207,7 @@ namespace PluginHost {
}

if (index == _designators.end()) {
_designators.emplace_back(id, designator);
_designators.emplace_back(id, designator, oneShot);
}
else {
result = Core::ERROR_DUPLICATE_KEY;
Expand Down Expand Up @@ -278,7 +289,11 @@ namespace PluginHost {
}
}
void Event(JSONRPC& parent, const string event, const string& parameter, const SendIfMethod& sendifmethod) {
for (Destination& entry : _designators) {
Destinations::iterator index(_designators.begin());

while (index != _designators.end()) {
Destination& entry = (*index);

if (!sendifmethod || sendifmethod(entry.Designator())) {
if (entry.Callback() == nullptr) {
parent.Notify(entry.ChannelId(), entry.Designator() + '.' + event, parameter);
Expand All @@ -287,6 +302,13 @@ namespace PluginHost {
entry.Callback()->Event(event, entry.Designator(), parameter);
}
}

if (entry.IsOneShot() == true) {
index = _designators.erase(index);
}
else {
++index;
}
}
}

Expand Down Expand Up @@ -837,7 +859,8 @@ namespace PluginHost {
return (index == _handlers.end() ? nullptr : &(*index));
}

virtual uint32_t Subscribe(const uint32_t channelId, const string& eventId, const string& designator)
public:
virtual uint32_t Subscribe(const uint32_t channelId, const string& eventId, const string& designator, const bool oneShot = false)
{
uint32_t result;

Expand All @@ -851,7 +874,11 @@ namespace PluginHost {
std::forward_as_tuple()).first;
}

result = index->second.Subscribe(channelId, designator);
result = index->second.Subscribe(channelId, designator, oneShot);

if ((result != Core::ERROR_NONE) && (index->second.IsEmpty() == true)) {
_observers.erase(index);
}

_adminLock.Unlock();

Expand Down Expand Up @@ -907,10 +934,15 @@ namespace PluginHost {

_adminLock.Lock();

ObserverMap::const_iterator index = _observers.find(event);
ObserverMap::iterator index = _observers.find(event);

if (index != _observers.end()) {
const_cast<Observer&>(index->second).Event(const_cast<JSONRPC&>(*this), event, parameters, sendifmethod);
index->second.Event(const_cast<JSONRPC&>(*this), event, parameters, sendifmethod);

if (index->second.IsEmpty() == true) {
// A one-shot observer might've removed itself, so remove the event from being observed
_observers.erase(index);
}
}

// See if this is perhaps a registered alias for an event...
Expand All @@ -920,10 +952,10 @@ namespace PluginHost {
if (iter != _eventAliases.end()) {

for (const string& alias : iter->second) {
ObserverMap::const_iterator index = _observers.find(alias);
ObserverMap::iterator index = _observers.find(alias);

if (index != _observers.end()) {
const_cast<Observer&>(index->second).Event(const_cast<JSONRPC&>(*this), alias, parameters, sendifmethod);
index->second.Event(const_cast<JSONRPC&>(*this), alias, parameters, sendifmethod);
}
}
}
Expand Down Expand Up @@ -959,7 +991,7 @@ namespace PluginHost {
string _callsign;
TokenCheckFunction _validate;
VersionList _versions;
ObserverMap _observers;
mutable ObserverMap _observers;
EventAliasesMap _eventAliases;
Core::SinkType<Notification> _notification;
};
Expand Down Expand Up @@ -1021,15 +1053,15 @@ namespace PluginHost {
}

public:
uint32_t Subscribe(const uint32_t channel, const string& event, const string& clientId) override
uint32_t Subscribe(const uint32_t channel, const string& event, const string& clientId, const bool oneShot = false) override
{
Core::hresult result = Core::ERROR_PRIVILIGED_REQUEST;

_adminLock.Lock();

if ((_subscribeAssessor == nullptr) || (_subscribeAssessor(channel, event, clientId) == true)) {

result = JSONRPC::Subscribe(channel, event, clientId);
result = JSONRPC::Subscribe(channel, event, clientId, oneShot);

if (result == Core::ERROR_NONE) {
NotifyObservers(Core::JSONRPC::Message::Method(event), clientId, Status::registered);
Expand All @@ -1055,7 +1087,7 @@ namespace PluginHost {
return (result);
}

protected:
private:
void NotifyObservers(const string eventName, const string& client, const Status status) const
{
StatusCallbackMap::const_iterator it = _observers.find(eventName);
Expand Down

0 comments on commit 924aaa9

Please sign in to comment.