diff --git a/Source/plugins/JSONRPC.h b/Source/plugins/JSONRPC.h index 5886308cf..a87acaaf7 100644 --- a/Source/plugins/JSONRPC.h +++ b/Source/plugins/JSONRPC.h @@ -92,15 +92,17 @@ namespace PluginHost { class Destination { public: Destination() = delete; - Destination(uint32_t channelId, const string& designator) + Destination(uint32_t channelId, const string& designator, const bool oneShot = false) : _callback(nullptr) , _channelId(channelId) - , _designator(designator) { + , _designator(designator) + , _oneShot(oneShot) { } Destination(IDispatcher::ICallback* callback, const string& designator) : _callback(callback) , _channelId(~0) - , _designator(designator) { + , _designator(designator) + , _oneShot(false) { if (_callback != nullptr) { _callback->AddRef(); } @@ -108,13 +110,16 @@ namespace PluginHost { Destination(Destination&& move) noexcept : _callback(move._callback) , _channelId(move._channelId) - , _designator(move._designator) { + , _designator(std::move(move._designator)) + , _oneShot(move._oneShot) { move._callback = nullptr; + move._channelId = ~0; } Destination(const Destination& copy) : _callback(copy._callback) , _channelId(copy._channelId) - , _designator(copy._designator) { + , _designator(copy._designator) + , _oneShot(copy._oneShot) { if (_callback != nullptr) { _callback->AddRef(); } @@ -132,8 +137,9 @@ namespace PluginHost { } _callback = move._callback; _channelId = move._channelId; - _designator = move._designator; + _designator = std::move(move._designator); move._callback = nullptr; + move._channelId = ~0; return (*this); } Destination& operator=(const Destination& copy) @@ -144,6 +150,7 @@ namespace PluginHost { _callback = copy._callback; _channelId = copy._channelId; _designator = copy._designator; + _oneShot = copy._oneShot; if (_callback != nullptr) { _callback->AddRef(); } @@ -160,11 +167,15 @@ namespace PluginHost { inline const string& Designator() const { return (_designator); } + inline bool IsOneShot() const { + return (_oneShot); + } private: IDispatcher::ICallback* _callback; uint32_t _channelId; string _designator; + bool _oneShot; }; using Destinations = std::vector; @@ -176,7 +187,7 @@ namespace PluginHost { : _designators() { } Observer(Observer&& move) noexcept - : _designators(move._designators) { + : _designators(std::move(move._designators)) { } Observer(const Observer& copy) : _designators(copy._designators) { @@ -187,7 +198,7 @@ namespace PluginHost { bool IsEmpty() const { return ( _designators.empty() ); } - uint32_t Subscribe(const uint32_t id, const string& designator) { + uint32_t Subscribe(const uint32_t id, const string& designator, const bool oneShot) { uint32_t result = Core::ERROR_NONE; Destinations::iterator index(_designators.begin()); @@ -196,7 +207,7 @@ namespace PluginHost { } if (index == _designators.end()) { - _designators.emplace_back(id, designator); + _designators.emplace_back(id, designator, oneShot); } else { result = Core::ERROR_DUPLICATE_KEY; @@ -278,7 +289,11 @@ namespace PluginHost { } } void Event(JSONRPC& parent, const string event, const string& parameter, const SendIfMethod& sendifmethod) { - for (Destination& entry : _designators) { + Destinations::iterator index(_designators.begin()); + + while (index != _designators.end()) { + Destination& entry = (*index); + if (!sendifmethod || sendifmethod(entry.Designator())) { if (entry.Callback() == nullptr) { parent.Notify(entry.ChannelId(), entry.Designator() + '.' + event, parameter); @@ -287,6 +302,13 @@ namespace PluginHost { entry.Callback()->Event(event, entry.Designator(), parameter); } } + + if (entry.IsOneShot() == true) { + index = _designators.erase(index); + } + else { + ++index; + } } } @@ -837,7 +859,8 @@ namespace PluginHost { return (index == _handlers.end() ? nullptr : &(*index)); } - virtual uint32_t Subscribe(const uint32_t channelId, const string& eventId, const string& designator) + public: + virtual uint32_t Subscribe(const uint32_t channelId, const string& eventId, const string& designator, const bool oneShot = false) { uint32_t result; @@ -851,7 +874,11 @@ namespace PluginHost { std::forward_as_tuple()).first; } - result = index->second.Subscribe(channelId, designator); + result = index->second.Subscribe(channelId, designator, oneShot); + + if ((result != Core::ERROR_NONE) && (index->second.IsEmpty() == true)) { + _observers.erase(index); + } _adminLock.Unlock(); @@ -907,10 +934,15 @@ namespace PluginHost { _adminLock.Lock(); - ObserverMap::const_iterator index = _observers.find(event); + ObserverMap::iterator index = _observers.find(event); if (index != _observers.end()) { - const_cast(index->second).Event(const_cast(*this), event, parameters, sendifmethod); + index->second.Event(const_cast(*this), event, parameters, sendifmethod); + + if (index->second.IsEmpty() == true) { + // A one-shot observer might've removed itself, so remove the event from being observed + _observers.erase(index); + } } // See if this is perhaps a registered alias for an event... @@ -920,10 +952,10 @@ namespace PluginHost { if (iter != _eventAliases.end()) { for (const string& alias : iter->second) { - ObserverMap::const_iterator index = _observers.find(alias); + ObserverMap::iterator index = _observers.find(alias); if (index != _observers.end()) { - const_cast(index->second).Event(const_cast(*this), alias, parameters, sendifmethod); + index->second.Event(const_cast(*this), alias, parameters, sendifmethod); } } } @@ -959,7 +991,7 @@ namespace PluginHost { string _callsign; TokenCheckFunction _validate; VersionList _versions; - ObserverMap _observers; + mutable ObserverMap _observers; EventAliasesMap _eventAliases; Core::SinkType _notification; }; @@ -1021,7 +1053,7 @@ namespace PluginHost { } public: - uint32_t Subscribe(const uint32_t channel, const string& event, const string& clientId) override + uint32_t Subscribe(const uint32_t channel, const string& event, const string& clientId, const bool oneShot = false) override { Core::hresult result = Core::ERROR_PRIVILIGED_REQUEST; @@ -1029,7 +1061,7 @@ namespace PluginHost { if ((_subscribeAssessor == nullptr) || (_subscribeAssessor(channel, event, clientId) == true)) { - result = JSONRPC::Subscribe(channel, event, clientId); + result = JSONRPC::Subscribe(channel, event, clientId, oneShot); if (result == Core::ERROR_NONE) { NotifyObservers(Core::JSONRPC::Message::Method(event), clientId, Status::registered); @@ -1055,7 +1087,7 @@ namespace PluginHost { return (result); } - protected: + private: void NotifyObservers(const string eventName, const string& client, const Status status) const { StatusCallbackMap::const_iterator it = _observers.find(eventName);