Skip to content

Commit 415dc93

Browse files
jeblairStephenSorriaux
authored andcommitted
feat(core): add support for persistent recursive watches
ZooKeeper 3.6.0 added support for persistent, and persistent recursive watches. This adds the corresponding support to the Kazoo client class.
1 parent 6540c93 commit 415dc93

File tree

5 files changed

+383
-8
lines changed

5 files changed

+383
-8
lines changed

kazoo/client.py

+117
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
from kazoo.protocol.connection import ConnectionHandler
2626
from kazoo.protocol.paths import _prefix_root, normpath
2727
from kazoo.protocol.serialization import (
28+
AddWatch,
2829
Auth,
2930
CheckVersion,
3031
CloseInstance,
@@ -38,6 +39,7 @@
3839
SetACL,
3940
GetData,
4041
Reconfig,
42+
RemoveWatches,
4143
SetData,
4244
Sync,
4345
Transaction,
@@ -48,6 +50,8 @@
4850
KazooState,
4951
KeeperState,
5052
WatchedEvent,
53+
AddWatchMode,
54+
WatcherType,
5155
)
5256
from kazoo.retry import KazooRetry
5357
from kazoo.security import ACL, OPEN_ACL_UNSAFE
@@ -248,6 +252,8 @@ def __init__(
248252
self.state_listeners = set()
249253
self._child_watchers = defaultdict(set)
250254
self._data_watchers = defaultdict(set)
255+
self._persistent_watchers = defaultdict(set)
256+
self._persistent_recursive_watchers = defaultdict(set)
251257
self._reset()
252258
self.read_only = read_only
253259

@@ -416,8 +422,16 @@ def _reset_watchers(self):
416422
for data_watchers in self._data_watchers.values():
417423
watchers.extend(data_watchers)
418424

425+
for persistent_watchers in self._persistent_watchers.values():
426+
watchers.extend(persistent_watchers)
427+
428+
for pr_watchers in self._persistent_recursive_watchers.values():
429+
watchers.extend(pr_watchers)
430+
419431
self._child_watchers = defaultdict(set)
420432
self._data_watchers = defaultdict(set)
433+
self._persistent_watchers = defaultdict(set)
434+
self._persistent_recursive_watchers = defaultdict(set)
421435

422436
ev = WatchedEvent(EventType.NONE, self._state, None)
423437
for watch in watchers:
@@ -1644,8 +1658,111 @@ def reconfig_async(self, joining, leaving, new_members, from_config):
16441658

16451659
return async_result
16461660

1661+
def add_watch(self, path, watch, mode):
1662+
"""Add a watch.
1663+
1664+
This method adds persistent watches. Unlike the data and
1665+
child watches which may be set by calls to
1666+
:meth:`KazooClient.exists`, :meth:`KazooClient.get`, and
1667+
:meth:`KazooClient.get_children`, persistent watches are not
1668+
removed after being triggered.
1669+
1670+
To remove a persistent watch, use
1671+
:meth:`KazooClient.remove_all_watches` with an argument of
1672+
:attr:`~kazoo.protocol.states.WatcherType.ANY`.
1673+
1674+
The `mode` argument determines whether or not the watch is
1675+
recursive. To set a persistent watch, use
1676+
:class:`~kazoo.protocol.states.AddWatchMode.PERSISTENT`. To set a
1677+
persistent recursive watch, use
1678+
:class:`~kazoo.protocol.states.AddWatchMode.PERSISTENT_RECURSIVE`.
1679+
1680+
:param path: Path of node to watch.
1681+
:param watch: Watch callback to set for future changes
1682+
to this path.
1683+
:param mode: The mode to use.
1684+
:type mode: int
1685+
1686+
:raises:
1687+
:exc:`~kazoo.exceptions.MarshallingError` if mode is
1688+
unknown.
1689+
1690+
:exc:`~kazoo.exceptions.ZookeeperError` if the server
1691+
returns a non-zero error code.
1692+
"""
1693+
return self.add_watch_async(path, watch, mode).get()
1694+
1695+
def add_watch_async(self, path, watch, mode):
1696+
"""Asynchronously add a watch. Takes the same arguments as
1697+
:meth:`add_watch`.
1698+
"""
1699+
if not isinstance(path, str):
1700+
raise TypeError("Invalid type for 'path' (string expected)")
1701+
if not callable(watch):
1702+
raise TypeError("Invalid type for 'watch' (must be a callable)")
1703+
if not isinstance(mode, int):
1704+
raise TypeError("Invalid type for 'mode' (int expected)")
1705+
if mode not in (
1706+
AddWatchMode.PERSISTENT,
1707+
AddWatchMode.PERSISTENT_RECURSIVE,
1708+
):
1709+
raise ValueError("Invalid value for 'mode'")
1710+
1711+
async_result = self.handler.async_result()
1712+
self._call(
1713+
AddWatch(_prefix_root(self.chroot, path), watch, mode),
1714+
async_result,
1715+
)
1716+
return async_result
1717+
1718+
def remove_all_watches(self, path, watcher_type):
1719+
"""Remove watches from a path.
1720+
1721+
This removes all watches of a specified type (data, child,
1722+
any) from a given path.
1723+
1724+
The `watcher_type` argument specifies which type to use. It
1725+
may be one of:
1726+
1727+
* :attr:`~kazoo.protocol.states.WatcherType.DATA`
1728+
* :attr:`~kazoo.protocol.states.WatcherType.CHILDREN`
1729+
* :attr:`~kazoo.protocol.states.WatcherType.ANY`
1730+
1731+
To remove persistent watches, specify a watcher type of
1732+
:attr:`~kazoo.protocol.states.WatcherType.ANY`.
1733+
1734+
:param path: Path of watch to remove.
1735+
:param watcher_type: The type of watch to remove.
1736+
:type watcher_type: int
1737+
"""
1738+
1739+
return self.remove_all_watches_async(path, watcher_type).get()
1740+
1741+
def remove_all_watches_async(self, path, watcher_type):
1742+
"""Asynchronously remove watches. Takes the same arguments as
1743+
:meth:`remove_all_watches`.
1744+
"""
1745+
if not isinstance(path, str):
1746+
raise TypeError("Invalid type for 'path' (string expected)")
1747+
if not isinstance(watcher_type, int):
1748+
raise TypeError("Invalid type for 'watcher_type' (int expected)")
1749+
if watcher_type not in (
1750+
WatcherType.ANY,
1751+
WatcherType.CHILDREN,
1752+
WatcherType.DATA,
1753+
):
1754+
raise ValueError("Invalid value for 'watcher_type'")
1755+
1756+
async_result = self.handler.async_result()
1757+
self._call(
1758+
RemoveWatches(_prefix_root(self.chroot, path), watcher_type),
1759+
async_result,
1760+
)
1761+
return async_result
1762+
16471763

16481764
class TransactionRequest(object):
1765+
16491766
"""A Zookeeper Transaction Request
16501767
16511768
A Transaction provides a builder object that can be used to

kazoo/protocol/connection.py

+49-7
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
)
2121
from kazoo.loggingsupport import BLATHER
2222
from kazoo.protocol.serialization import (
23+
AddWatch,
2324
Auth,
2425
Close,
2526
Connect,
@@ -28,17 +29,20 @@
2829
GetChildren2,
2930
Ping,
3031
PingInstance,
32+
RemoveWatches,
3133
ReplyHeader,
3234
SASL,
3335
Transaction,
3436
Watch,
3537
int_struct,
3638
)
3739
from kazoo.protocol.states import (
40+
AddWatchMode,
3841
Callback,
3942
KeeperState,
4043
WatchedEvent,
4144
EVENT_TYPE_MAP,
45+
WatcherType,
4246
)
4347
from kazoo.retry import (
4448
ForceRetryError,
@@ -363,6 +367,18 @@ def _write(self, msg, timeout):
363367
raise ConnectionDropped("socket connection broken")
364368
sent += bytes_sent
365369

370+
def _find_persistent_recursive_watchers(self, path):
371+
parts = path.split("/")
372+
watchers = []
373+
for count in range(len(parts)):
374+
candidate = "/".join(parts[: count + 1])
375+
if not candidate:
376+
continue
377+
watchers.extend(
378+
self.client._persistent_recursive_watchers.get(candidate, [])
379+
)
380+
return watchers
381+
366382
def _read_watch_event(self, buffer, offset):
367383
client = self.client
368384
watch, offset = Watch.deserialize(buffer, offset)
@@ -374,9 +390,13 @@ def _read_watch_event(self, buffer, offset):
374390

375391
if watch.type in (CREATED_EVENT, CHANGED_EVENT):
376392
watchers.extend(client._data_watchers.pop(path, []))
393+
watchers.extend(client._persistent_watchers.get(path, []))
394+
watchers.extend(self._find_persistent_recursive_watchers(path))
377395
elif watch.type == DELETED_EVENT:
378396
watchers.extend(client._data_watchers.pop(path, []))
379397
watchers.extend(client._child_watchers.pop(path, []))
398+
watchers.extend(client._persistent_watchers.get(path, []))
399+
watchers.extend(self._find_persistent_recursive_watchers(path))
380400
elif watch.type == CHILD_EVENT:
381401
watchers.extend(client._child_watchers.pop(path, []))
382402
else:
@@ -448,13 +468,35 @@ def _read_response(self, header, buffer, offset):
448468

449469
async_object.set(response)
450470

451-
# Determine if watchers should be registered
452-
watcher = getattr(request, "watcher", None)
453-
if not client._stopped.is_set() and watcher:
454-
if isinstance(request, (GetChildren, GetChildren2)):
455-
client._child_watchers[request.path].add(watcher)
456-
else:
457-
client._data_watchers[request.path].add(watcher)
471+
# Determine if watchers should be registered or unregistered
472+
if not client._stopped.is_set():
473+
watcher = getattr(request, "watcher", None)
474+
if watcher:
475+
if isinstance(request, AddWatch):
476+
if request.mode == AddWatchMode.PERSISTENT:
477+
client._persistent_watchers[request.path].add(
478+
watcher
479+
)
480+
elif request.mode == AddWatchMode.PERSISTENT_RECURSIVE:
481+
client._persistent_recursive_watchers[
482+
request.path
483+
].add(watcher)
484+
elif isinstance(request, (GetChildren, GetChildren2)):
485+
client._child_watchers[request.path].add(watcher)
486+
else:
487+
client._data_watchers[request.path].add(watcher)
488+
if isinstance(request, RemoveWatches):
489+
if request.watcher_type == WatcherType.CHILDREN:
490+
client._child_watchers.pop(request.path, None)
491+
elif request.watcher_type == WatcherType.DATA:
492+
client._data_watchers.pop(request.path, None)
493+
elif request.watcher_type == WatcherType.ANY:
494+
client._child_watchers.pop(request.path, None)
495+
client._data_watchers.pop(request.path, None)
496+
client._persistent_watchers.pop(request.path, None)
497+
client._persistent_recursive_watchers.pop(
498+
request.path, None
499+
)
458500

459501
if isinstance(request, Close):
460502
self.logger.log(BLATHER, "Read close response")

kazoo/protocol/serialization.py

+28
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,20 @@ def deserialize(cls, bytes, offset):
416416
return data, stat
417417

418418

419+
class RemoveWatches(namedtuple("RemoveWatches", "path watcher_type")):
420+
type = 18
421+
422+
def serialize(self):
423+
b = bytearray()
424+
b.extend(write_string(self.path))
425+
b.extend(int_struct.pack(self.watcher_type))
426+
return b
427+
428+
@classmethod
429+
def deserialize(cls, bytes, offset):
430+
return None
431+
432+
419433
class Auth(namedtuple("Auth", "auth_type scheme auth")):
420434
type = 100
421435

@@ -441,6 +455,20 @@ def deserialize(cls, bytes, offset):
441455
return challenge, offset
442456

443457

458+
class AddWatch(namedtuple("AddWatch", "path watcher mode")):
459+
type = 106
460+
461+
def serialize(self):
462+
b = bytearray()
463+
b.extend(write_string(self.path))
464+
b.extend(int_struct.pack(self.mode))
465+
return b
466+
467+
@classmethod
468+
def deserialize(cls, bytes, offset):
469+
return None
470+
471+
444472
class Watch(namedtuple("Watch", "type state path")):
445473
@classmethod
446474
def deserialize(cls, bytes, offset):

kazoo/protocol/states.py

+41
Original file line numberDiff line numberDiff line change
@@ -251,3 +251,44 @@ def data_length(self):
251251
@property
252252
def children_count(self):
253253
return self.numChildren
254+
255+
256+
class AddWatchMode(object):
257+
"""Modes for use with :meth:`~kazoo.client.KazooClient.add_watch`
258+
259+
.. attribute:: PERSISTENT
260+
261+
The watch is not removed when trigged.
262+
263+
.. attribute:: PERSISTENT_RECURSIVE
264+
265+
The watch is not removed when trigged, and applies to all
266+
paths underneath the supplied path as well.
267+
"""
268+
269+
PERSISTENT = 0
270+
PERSISTENT_RECURSIVE = 1
271+
272+
273+
class WatcherType(object):
274+
"""Watcher types for use with
275+
:meth:`~kazoo.client.KazooClient.remove_all_watches`
276+
277+
.. attribute:: CHILDREN
278+
279+
Child watches.
280+
281+
.. attribute:: DATA
282+
283+
Data watches.
284+
285+
.. attribute:: ANY
286+
287+
Any type of watch (child, data, persistent, or persistent
288+
recursive).
289+
290+
"""
291+
292+
CHILDREN = 1
293+
DATA = 2
294+
ANY = 3

0 commit comments

Comments
 (0)