Skip to content

Commit 7ef1c9a

Browse files
committed
add lock function
1 parent 1a6b5c9 commit 7ef1c9a

File tree

8 files changed

+251
-188
lines changed

8 files changed

+251
-188
lines changed

index.d.ts

Lines changed: 0 additions & 48 deletions
This file was deleted.

index.ts

Lines changed: 0 additions & 1 deletion
This file was deleted.

package-lock.json

Lines changed: 0 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,27 +14,26 @@
1414
},
1515
"repository": "https://github.com/AmrSaber/simple-redis-mutex",
1616
"license": "ISC",
17+
"peerDependencies": {
18+
"redis": ">=4.7.0"
19+
},
1720
"devDependencies": {
1821
"@types/jest": "^26.0.24",
1922
"@types/redis": "^4.0.10",
20-
"delay": "^5.0.0",
2123
"eslint": "^9.15.0",
2224
"eslint-config-airbnb-base": "^14.2.1",
2325
"eslint-config-prettier": "^9.0.0",
2426
"eslint-plugin-import": "^2.22.1",
2527
"jest": "^29.7.0",
2628
"prettier": "^3.0.3",
27-
"redis": "^4.7.0",
2829
"ts-jest": "^29.2.5",
2930
"typescript": "^5.7.2"
3031
},
3132
"keywords": [
3233
"redis",
3334
"mutex",
3435
"lock",
35-
"expire",
36-
"fifo",
37-
"queue"
36+
"expire"
3837
],
3938
"engines": {
4039
"node": ">=8.2.1"

src/index.ts

Lines changed: 127 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,103 @@
11
import { RedisClientType, RedisClusterType } from 'redis';
2-
import { releaseScript } from './lua';
32

43
type RedisClient = RedisClientType<any, any, any> | RedisClusterType<any, any, any>;
54

6-
type ReleaseCallbackFn = () => unknown;
5+
type ReleaseCallbackFn = () => void;
76
type ReleaseCallback = { lockKey: string; callback: ReleaseCallbackFn };
87

98
export type ReleaseFunc = () => Promise<void>;
10-
export type TryLockParams = {
11-
timeout?: number;
9+
export type TryLockOptions = { timeout?: number };
10+
export type LockOptions = TryLockOptions & {
11+
pollingInterval?: number;
12+
failAfter?: number;
13+
onFail?: () => void;
1214
};
1315

14-
const REDIS_UPDATE_CHANNEL = '@simple-redis-mutex:locks-releases';
16+
const REDIS_RELEASES_CHANNEL = '@simple-redis-mutex:locks-releases';
1517
const REDIS_OK = 'OK';
1618

19+
export const DEFAULT_TIMEOUT = 30_000;
20+
export const DEFAULT_POLLING_INTERVAL = 10_000;
21+
22+
let callbacks: ReleaseCallback[] = [];
23+
24+
/**
25+
* Attempts to acquire lock, if lock is already acquired it will block until it can acquire the lock.
26+
* Returns lock release function.
27+
*
28+
* Lock timeout is used to expire the lock if it's not been released before `timeout`.
29+
* This is to prevent crashed processes holding the lock indefinitely.
30+
*
31+
* When a lock is released redis Pub/Sub is used to publish that the lock has been released
32+
* so that other processes waiting for the lock can attempt to acquire it.
33+
*
34+
* Manual polling is also implemented to attempt to acquire the lock in case the holder crashed and did not release the lock.
35+
* It is controlled by `pollingInterval`.
36+
*
37+
* Application logic should not depend on lock timeout and polling interval. They are meant to be a safe net when things fail.
38+
* Depending on them is inefficient and an anti-pattern, in such case application logic should be revised and refactored.
39+
*
40+
* If process fails to acquire the lock before `failAfter` milliseconds, it will throw an error and call `onFail` if provided.
41+
* If `failAfter` is not provided, process will block indefinitely waiting for the lock to be released.
42+
*
43+
* @param redis redis client
44+
* @param lockName lock name
45+
* @param options lock options
46+
* @param options.timeout lock timeout in milliseconds, default: 30 seconds
47+
* @param options.pollingInterval how long between manual polling for lock status milliseconds, default: 10 seconds
48+
* @param options.failAfter time to fail after if lock is still not acquired milliseconds
49+
* @param options.onFail called when failed to acquire lock before `failAfter`
50+
* @returns release function
51+
*/
52+
export function lock(
53+
redis: RedisClient,
54+
lockName: string,
55+
{ timeout = DEFAULT_TIMEOUT, pollingInterval = DEFAULT_POLLING_INTERVAL, failAfter, onFail }: LockOptions = {},
56+
): Promise<ReleaseFunc> {
57+
return new Promise((resolve, reject) => {
58+
let pollingId: NodeJS.Timeout | undefined;
59+
let failId: NodeJS.Timeout | undefined;
60+
61+
let attempting = true;
62+
function attempt() {
63+
if (!attempting) return;
64+
65+
tryLock(redis, lockName, { timeout }).then(([hasLock, release]) => {
66+
if (!hasLock) return;
67+
68+
clean();
69+
resolve(release);
70+
});
71+
}
72+
73+
function clean() {
74+
attempting = false;
75+
76+
// Remove release callback
77+
callbacks = callbacks.filter((cb) => cb.callback != attempt);
78+
79+
// Clear timeouts
80+
if (pollingId != null) clearInterval(pollingId);
81+
if (failId != null) clearTimeout(failId);
82+
83+
pollingId = failId = undefined;
84+
}
85+
86+
callbacks.push({ lockKey: getLockKey(lockName), callback: attempt });
87+
if (pollingInterval != null) pollingId = setInterval(attempt, pollingInterval);
88+
89+
if (failAfter != null) {
90+
failId = setTimeout(() => {
91+
clean();
92+
onFail?.();
93+
reject(new Error(`Lock "${lockName}" could not be acquired after ${failAfter} millis`));
94+
}, failAfter);
95+
}
96+
97+
attempt();
98+
});
99+
}
100+
17101
/**
18102
* Try to acquire the lock, if failed will return immediately.
19103
* Returns whether or not the lock was acquired, and a release function.
@@ -24,17 +108,17 @@ const REDIS_OK = 'OK';
24108
* If lock was not acquired, release function is a no-op.
25109
*
26110
* @param redis redis client
27-
* @param lockName Lock name
28-
* @param options Lock options
29-
* @param options.timeout Lock timeout in milliseconds, default: 30 seconds
30-
* @returns
111+
* @param lockName lock name
112+
* @param options lock options
113+
* @param options.timeout lock timeout in milliseconds, default: 30 seconds
114+
* @returns whether or not the lock was acquired and release function.
31115
*/
32116
export async function tryLock(
33117
redis: RedisClient,
34118
lockName: string,
35-
{ timeout = 30_000 }: TryLockParams = {},
119+
{ timeout = DEFAULT_TIMEOUT }: TryLockOptions = {},
36120
): Promise<[boolean, ReleaseFunc]> {
37-
const lockKey = `@simple-redis-mutex:lock-${lockName}`;
121+
const lockKey = getLockKey(lockName);
38122
const lockValue = String(Math.random());
39123

40124
await listenForUpdates(redis);
@@ -53,7 +137,7 @@ export async function tryLock(
53137

54138
await redis
55139
.eval(releaseScript, {
56-
keys: [lockKey, REDIS_UPDATE_CHANNEL],
140+
keys: [lockKey, REDIS_RELEASES_CHANNEL],
57141
arguments: [lockValue],
58142
})
59143
.catch((err) => console.error(`Error releasing lock ${lockName}:`, err));
@@ -62,38 +146,53 @@ export async function tryLock(
62146
return [true, release];
63147
}
64148

65-
let listening = false;
149+
let subscriber: RedisClient | undefined;
66150
async function listenForUpdates(redis: RedisClient) {
67-
if (listening) return;
68-
listening = true;
151+
// Make sure only one subscriber is created
152+
if (subscriber != null && subscriber.isOpen) return;
69153

70-
const subscriber = redis.duplicate();
154+
subscriber = redis.duplicate();
71155
subscriber.on('error', (err) => console.error('simple-redis-mutex subscriber error:', err));
72156
await subscriber.connect();
73157

74-
await subscriber.subscribe(REDIS_UPDATE_CHANNEL, async (message: string) => {
158+
await subscriber.subscribe(REDIS_RELEASES_CHANNEL, async (message: string) => {
75159
const releasedLock: { key: string; value: string } = JSON.parse(message);
76160

77-
// Find related callback functions and remove them from callbacks
161+
// Find related callback functions
78162
const relatedCallbacks = callbacks.filter((cb) => cb.lockKey == releasedLock.key);
79-
callbacks = callbacks.filter((cb) => cb.lockKey != releasedLock.key);
80163

81164
// Run all callbacks
82165
await Promise.all(relatedCallbacks.map((cb) => Promise.resolve(cb.callback())));
83166
});
84167

85168
redis.on('end', async () => {
86-
await subscriber.unsubscribe(REDIS_UPDATE_CHANNEL);
87-
await subscriber.quit();
169+
await subscriber?.unsubscribe(REDIS_RELEASES_CHANNEL);
170+
await subscriber?.quit();
171+
subscriber = undefined;
88172
});
89173
}
90174

91-
let callbacks: ReleaseCallback[] = [];
92-
function addReleaseCallback(lockKey: string, callbackFn: ReleaseCallbackFn) {
93-
callbacks.push({ lockKey, callback: callbackFn });
175+
function getLockKey(lockName: string): string {
176+
return `@simple-redis-mutex:lock-${lockName}`;
94177
}
95178

96-
// FIXME: is this needed?
97-
function removeReleaseCallback(lockKey: string, callbackFn: ReleaseCallbackFn) {
98-
callbacks = callbacks.filter((cb) => !(cb.lockKey == lockKey && cb.callback == callbackFn));
99-
}
179+
/**
180+
* Release the lock only if it has the same lockValue as acquireLock sets it.
181+
* This will prevent the release of an already released lock.
182+
*
183+
* Script source: https://redis.io/commands/set#patterns -- Redis official docs + with small changes
184+
*/
185+
const releaseScript = `
186+
local lockKey = KEYS[1]
187+
local updatesChannel = KEYS[2]
188+
local lockValue = ARGV[1]
189+
190+
if redis.call("GET", lockKey) == lockValue then
191+
redis.call("DEL", lockKey)
192+
redis.call(
193+
"PUBLISH",
194+
updatesChannel,
195+
string.format('{ "key": "%s", "value": "%s" }', lockKey, lockValue)
196+
)
197+
end
198+
`;

src/lua.ts

Lines changed: 0 additions & 72 deletions
This file was deleted.

0 commit comments

Comments
 (0)