Skip to content

Commit c818e44

Browse files
authored
ref(rules): Add delete methods to Redis Buffer (#68837)
We need a way to delete things we've already processed from the buffer. This PR adds `hdel` (hash delete) and `zremrangebyscore` methods to the Redis buffer that will later be used in `apply_delayed`, as well as replacing `sadd` with `zadd` and `smembers` with `zrange`, so we can remove project ids based on the time they were added.
1 parent 98b71a8 commit c818e44

File tree

2 files changed

+90
-11
lines changed

2 files changed

+90
-11
lines changed

src/sentry/buffer/redis.py

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -84,10 +84,12 @@ def callback(self, buffer_hook_event: BufferHookEvent, data: RedisBuffer) -> boo
8484

8585

8686
class RedisOperation(Enum):
87-
SET_ADD = "sadd"
88-
SET_GET = "smembers"
87+
SORTED_SET_ADD = "zadd"
88+
SORTED_SET_GET_RANGE = "zrange"
89+
SORTED_SET_DELETE_RANGE = "zremrangebyscore"
8990
HASH_ADD = "hset"
9091
HASH_GET_ALL = "hgetall"
92+
HASH_DELETE = "hdel"
9193

9294

9395
class PendingBuffer:
@@ -263,19 +265,35 @@ def get_redis_connection(self, key: str, transaction: bool = True) -> Pipeline:
263265
pipe = conn.pipeline(transaction=transaction)
264266
return pipe
265267

266-
def _execute_redis_operation(self, key: str, operation: RedisOperation, *args: Any) -> Any:
268+
def _execute_redis_operation(
269+
self, key: str, operation: RedisOperation, *args: Any, **kwargs: Any
270+
) -> Any:
267271
pending_key = self._make_pending_key_from_key(key)
268272
pipe = self.get_redis_connection(pending_key)
269-
getattr(pipe, operation.value)(key, *args)
273+
getattr(pipe, operation.value)(key, *args, **kwargs)
270274
if args:
271275
pipe.expire(key, self.key_expire)
272276
return pipe.execute()
273277

274-
def push_to_set(self, key: str, value: list[int] | int) -> None:
275-
self._execute_redis_operation(key, RedisOperation.SET_ADD, value)
278+
def push_to_sorted_set(self, key: str, value: list[int] | int) -> None:
279+
self._execute_redis_operation(key, RedisOperation.SORTED_SET_ADD, {value: time()})
276280

277281
def get_set(self, key: str) -> list[set[int]]:
278-
return self._execute_redis_operation(key, RedisOperation.SET_GET)
282+
return self._execute_redis_operation(
283+
key, RedisOperation.SORTED_SET_GET_RANGE, start=0, end=-1, withscores=True
284+
)
285+
286+
def delete_key(self, key: str, min: int, max: int) -> None:
287+
self._execute_redis_operation(key, RedisOperation.SORTED_SET_DELETE_RANGE, min=min, max=max)
288+
289+
def delete_hash(
290+
self,
291+
model: type[models.Model],
292+
filters: dict[str, models.Model | str | int],
293+
field: str,
294+
) -> None:
295+
key = self._make_key(model, filters)
296+
self._execute_redis_operation(key, RedisOperation.HASH_DELETE, field)
279297

280298
def push_to_hash(
281299
self,

tests/sentry/buffer/test_redis.py

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -258,14 +258,14 @@ def test_process_pending_partitions_none(self, process_pending, process_incr):
258258
def group_rule_data_by_project_id(self, buffer, project_ids):
259259
project_ids_to_rule_data = defaultdict(list)
260260
for proj_id in project_ids[0]:
261-
rule_group_pairs = buffer.get_hash(Project, {"project_id": proj_id})
261+
rule_group_pairs = buffer.get_hash(Project, {"project_id": proj_id[0]})
262262
for pair in rule_group_pairs:
263263
for k, v in pair.items():
264264
if isinstance(k, bytes):
265265
k = k.decode("utf-8")
266266
if isinstance(v, bytes):
267267
v = v.decode("utf-8")
268-
project_ids_to_rule_data[int(proj_id)].append({k: v})
268+
project_ids_to_rule_data[int(proj_id[0])].append({k: v})
269269
return project_ids_to_rule_data
270270

271271
def test_enqueue(self):
@@ -283,8 +283,8 @@ def test_enqueue(self):
283283
event4_id = 11
284284

285285
# store the project ids
286-
self.buf.push_to_set(key=PROJECT_ID_BUFFER_LIST_KEY, value=project_id)
287-
self.buf.push_to_set(key=PROJECT_ID_BUFFER_LIST_KEY, value=project_id2)
286+
self.buf.push_to_sorted_set(key=PROJECT_ID_BUFFER_LIST_KEY, value=project_id)
287+
self.buf.push_to_sorted_set(key=PROJECT_ID_BUFFER_LIST_KEY, value=project_id2)
288288

289289
# store the rules and group per project
290290
self.buf.push_to_hash(
@@ -350,6 +350,67 @@ def test_process_batch(self):
350350
assert mock.call_count == 1
351351
assert mock.call_args[0][0] == self.buf
352352

353+
def test_delete_batch(self):
354+
"""Test that after we add things to redis we can clean it up"""
355+
project_id = 1
356+
rule_id = 2
357+
group_id = 3
358+
event_id = 4
359+
360+
project2_id = 5
361+
rule2_id = 6
362+
group2_id = 7
363+
event2_id = 8
364+
365+
now = datetime.datetime(2024, 4, 15, 3, 30, 00, tzinfo=datetime.UTC)
366+
one_minute_from_now = (now).replace(minute=31)
367+
368+
# add a set and a hash to the buffer
369+
with freeze_time(now):
370+
self.buf.push_to_sorted_set(key=PROJECT_ID_BUFFER_LIST_KEY, value=project_id)
371+
self.buf.push_to_hash(
372+
model=Project,
373+
filters={"project_id": project_id},
374+
field=f"{rule_id}:{group_id}",
375+
value=event_id,
376+
)
377+
with freeze_time(one_minute_from_now):
378+
self.buf.push_to_sorted_set(key=PROJECT_ID_BUFFER_LIST_KEY, value=project2_id)
379+
self.buf.push_to_hash(
380+
model=Project,
381+
filters={"project_id": project2_id},
382+
field=f"{rule2_id}:{group2_id}",
383+
value=event2_id,
384+
)
385+
386+
# retrieve them
387+
project_ids = self.buf.get_set(PROJECT_ID_BUFFER_LIST_KEY)
388+
assert len(project_ids[0]) == 2
389+
rule_group_pairs = self.buf.get_hash(Project, {"project_id": project_id})
390+
assert len(rule_group_pairs)
391+
392+
# delete only the first project ID by time
393+
self.buf.delete_key(PROJECT_ID_BUFFER_LIST_KEY, min=0, max=now.timestamp())
394+
395+
# retrieve again to make sure only project_id was removed
396+
project_ids = self.buf.get_set(PROJECT_ID_BUFFER_LIST_KEY)
397+
if isinstance(project_ids[0][0][0], bytes):
398+
assert project_ids == [
399+
[(bytes(str(project2_id), "utf-8"), one_minute_from_now.timestamp())]
400+
]
401+
else:
402+
assert project_ids == [[(str(project2_id), one_minute_from_now.timestamp())]]
403+
404+
# delete the project_id hash
405+
self.buf.delete_hash(
406+
model=Project,
407+
filters={"project_id": project_id},
408+
field=f"{rule_id}:{group_id}",
409+
)
410+
411+
rule_group_pairs = self.buf.get_hash(Project, {"project_id": project_id})
412+
assert rule_group_pairs == [{}]
413+
353414
@mock.patch("sentry.buffer.redis.RedisBuffer._make_key", mock.Mock(return_value="foo"))
354415
@mock.patch("sentry.buffer.base.Buffer.process")
355416
def test_process_uses_signal_only(self, process):

0 commit comments

Comments (0)