Skip to content

Commit d31d90e

Browse files
authored
fix(core): use short timeouts for periodic KVS.setRecord calls (#2962)
Sometimes the `setRecord` calls might get stuck, and since we issue a new one every 60s (via the `persistState` event) while the default API call timeout is 6 minutes, we might end up overloading the API. This PR disables retries on those calls in case of a timeout and overrides the default 6-minute timeout with half of the persist state interval (i.e. 30s by default). Related: https://apify.slack.com/archives/C0L33UM7Z/p1747043120071539
1 parent ea35685 commit d31d90e

File tree

6 files changed

+174
-40
lines changed

6 files changed

+174
-40
lines changed

packages/core/src/crawlers/statistics.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,17 @@ export class Statistics {
329329

330330
this.log.debug('Persisting state', { persistStateKey: this.persistStateKey });
331331

332-
await this.keyValueStore.setValue(this.persistStateKey, this.toJSON());
332+
// use half the interval of `persistState` to avoid race conditions
333+
const persistStateIntervalMillis = this.config.get('persistStateIntervalMillis')!;
334+
const timeoutSecs = persistStateIntervalMillis / 2_000;
335+
await this.keyValueStore
336+
.setValue(this.persistStateKey, this.toJSON(), {
337+
timeoutSecs,
338+
doNotRetryTimeouts: true,
339+
})
340+
.catch((error) =>
341+
this.log.warning(`Failed to persist the statistics to ${this.persistStateKey}`, { error }),
342+
);
333343
}
334344

335345
/**

packages/core/src/session_pool/session_pool.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,18 @@ export class SessionPool extends EventEmitter {
367367
persistStateKeyValueStoreId: this.persistStateKeyValueStoreId,
368368
persistStateKey: this.persistStateKey,
369369
});
370-
await this.keyValueStore.setValue(this.persistStateKey, this.getState());
370+
371+
// use half the interval of `persistState` to avoid race conditions
372+
const persistStateIntervalMillis = this.config.get('persistStateIntervalMillis')!;
373+
const timeoutSecs = persistStateIntervalMillis / 2_000;
374+
await this.keyValueStore
375+
.setValue(this.persistStateKey, this.getState(), {
376+
timeoutSecs,
377+
doNotRetryTimeouts: true,
378+
})
379+
.catch((error) =>
380+
this.log.warning(`Failed to persist the session pool stats to ${this.persistStateKey}`, { error }),
381+
);
371382
}
372383

373384
/**

packages/core/src/storages/key_value_store.ts

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import JSON5 from 'json5';
66
import ow, { ArgumentError } from 'ow';
77

88
import { KEY_VALUE_STORE_KEY_REGEX } from '@apify/consts';
9+
import log from '@apify/log';
910
import { jsonStringifyExtended } from '@apify/utilities';
1011

1112
import { Configuration } from '../configuration';
@@ -273,10 +274,23 @@ export class KeyValueStore {
273274
return;
274275
}
275276

277+
// use half the interval of `persistState` to avoid race conditions
278+
const persistStateIntervalMillis = this.config.get('persistStateIntervalMillis')!;
279+
const timeoutSecs = persistStateIntervalMillis / 2_000;
280+
276281
this.config.getEventManager().on('persistState', async () => {
282+
const promises: Promise<void>[] = [];
283+
277284
for (const [key, value] of this.cache) {
278-
await this.setValue(key, value);
285+
promises.push(
286+
this.setValue(key, value, {
287+
timeoutSecs,
288+
doNotRetryTimeouts: true,
289+
}).catch((error) => log.warning(`Failed to persist the state value to ${key}`, { error })),
290+
);
279291
}
292+
293+
await Promise.all(promises);
280294
});
281295

282296
this.persistStateEventStarted = true;
@@ -352,6 +366,8 @@ export class KeyValueStore {
352366
options,
353367
ow.object.exactShape({
354368
contentType: ow.optional.string.nonEmpty,
369+
timeoutSecs: ow.optional.number,
370+
doNotRetryTimeouts: ow.optional.boolean,
355371
}),
356372
);
357373

@@ -380,11 +396,17 @@ export class KeyValueStore {
380396

381397
value = maybeStringify(value, optionsCopy);
382398

383-
return this.client.setRecord({
384-
key,
385-
value,
386-
contentType: optionsCopy.contentType,
387-
});
399+
return this.client.setRecord(
400+
{
401+
key,
402+
value,
403+
contentType: optionsCopy.contentType,
404+
},
405+
{
406+
timeoutSecs: optionsCopy.timeoutSecs,
407+
doNotRetryTimeouts: optionsCopy.doNotRetryTimeouts,
408+
},
409+
);
388410
}
389411

390412
/**
@@ -719,6 +741,16 @@ export interface RecordOptions {
719741
* Specifies a custom MIME content type of the record.
720742
*/
721743
contentType?: string;
744+
745+
/**
746+
* Specifies a custom timeout for the `set-record` API call, in seconds.
747+
*/
748+
timeoutSecs?: number;
749+
750+
/**
751+
* If set to `true`, the `set-record` API call will not be retried if it times out.
752+
*/
753+
doNotRetryTimeouts?: boolean;
722754
}
723755

724756
export interface KeyValueStoreIteratorOptions {

packages/types/src/storages.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,11 @@ export interface KeyValueStoreRecord {
124124
contentType?: string;
125125
}
126126

127+
export interface KeyValueStoreRecordOptions {
128+
timeoutSecs?: number;
129+
doNotRetryTimeouts?: boolean;
130+
}
131+
127132
export interface KeyValueStoreClientUpdateOptions {
128133
name?: string;
129134
}
@@ -162,7 +167,7 @@ export interface KeyValueStoreClient {
162167
listKeys(options?: KeyValueStoreClientListOptions): Promise<KeyValueStoreClientListData>;
163168
recordExists(key: string): Promise<boolean>;
164169
getRecord(key: string, options?: KeyValueStoreClientGetRecordOptions): Promise<KeyValueStoreRecord | undefined>;
165-
setRecord(record: KeyValueStoreRecord): Promise<void>;
170+
setRecord(record: KeyValueStoreRecord, options?: KeyValueStoreRecordOptions): Promise<void>;
166171
deleteRecord(key: string): Promise<void>;
167172
}
168173

test/core/crawlers/statistics.test.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -186,8 +186,15 @@ describe('Statistics', () => {
186186
// @ts-expect-error Incorrect types?
187187
const { retryHistogram, finished, failed, ...rest } = stats.calculate();
188188

189-
// @ts-expect-error Accessing private prop
190-
expect(setValueSpy).toBeCalledWith(stats.persistStateKey, { ...state, ...rest });
189+
expect(setValueSpy).toBeCalledWith(
190+
// @ts-expect-error Accessing private prop
191+
stats.persistStateKey,
192+
{ ...state, ...rest },
193+
{
194+
doNotRetryTimeouts: true,
195+
timeoutSecs: 30,
196+
},
197+
);
191198
}, 2000);
192199
});
193200

test/core/storages/key_value_store.test.ts

Lines changed: 98 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,17 @@ describe('KeyValueStore', () => {
4040
await store.setValue('key-1', record);
4141

4242
expect(mockSetRecord).toBeCalledTimes(1);
43-
expect(mockSetRecord).toBeCalledWith({
44-
key: 'key-1',
45-
value: recordStr,
46-
contentType: 'application/json; charset=utf-8',
47-
});
43+
expect(mockSetRecord).toBeCalledWith(
44+
{
45+
key: 'key-1',
46+
value: recordStr,
47+
contentType: 'application/json; charset=utf-8',
48+
},
49+
{
50+
doNotRetryTimeouts: undefined,
51+
timeoutSecs: undefined,
52+
},
53+
);
4854

4955
// Get Record
5056
const mockGetRecord = vitest
@@ -281,11 +287,17 @@ describe('KeyValueStore', () => {
281287
await store.setValue('key-1', 'xxxx', { contentType: 'text/plain; charset=utf-8' });
282288

283289
expect(mockSetRecord).toBeCalledTimes(1);
284-
expect(mockSetRecord).toBeCalledWith({
285-
key: 'key-1',
286-
value: 'xxxx',
287-
contentType: 'text/plain; charset=utf-8',
288-
});
290+
expect(mockSetRecord).toBeCalledWith(
291+
{
292+
key: 'key-1',
293+
value: 'xxxx',
294+
contentType: 'text/plain; charset=utf-8',
295+
},
296+
{
297+
doNotRetryTimeouts: undefined,
298+
timeoutSecs: undefined,
299+
},
300+
);
289301
});
290302

291303
test('correctly passes object values as JSON', async () => {
@@ -305,11 +317,50 @@ describe('KeyValueStore', () => {
305317
await store.setValue('key-1', record);
306318

307319
expect(mockSetRecord).toBeCalledTimes(1);
308-
expect(mockSetRecord).toBeCalledWith({
309-
key: 'key-1',
310-
value: recordStr,
311-
contentType: 'application/json; charset=utf-8',
320+
expect(mockSetRecord).toBeCalledWith(
321+
{
322+
key: 'key-1',
323+
value: recordStr,
324+
contentType: 'application/json; charset=utf-8',
325+
},
326+
{
327+
doNotRetryTimeouts: undefined,
328+
timeoutSecs: undefined,
329+
},
330+
);
331+
});
332+
333+
test('correctly passes timeout options', async () => {
334+
const store = new KeyValueStore({
335+
id: 'my-store-id-1',
336+
client,
337+
});
338+
339+
const record = { foo: 'bar' };
340+
const recordStr = JSON.stringify(record, null, 2);
341+
342+
const mockSetRecord = vitest
343+
// @ts-expect-error Accessing private property
344+
.spyOn(store.client, 'setRecord')
345+
.mockResolvedValueOnce(undefined);
346+
347+
await store.setValue('key-1', record, {
348+
timeoutSecs: 1,
349+
doNotRetryTimeouts: true,
312350
});
351+
352+
expect(mockSetRecord).toBeCalledTimes(1);
353+
expect(mockSetRecord).toBeCalledWith(
354+
{
355+
key: 'key-1',
356+
value: recordStr,
357+
contentType: 'application/json; charset=utf-8',
358+
},
359+
{
360+
doNotRetryTimeouts: true,
361+
timeoutSecs: 1,
362+
},
363+
);
313364
});
314365

315366
test('correctly passes raw string values', async () => {
@@ -326,11 +377,17 @@ describe('KeyValueStore', () => {
326377
await store.setValue('key-1', 'xxxx', { contentType: 'text/plain; charset=utf-8' });
327378

328379
expect(mockSetRecord).toBeCalledTimes(1);
329-
expect(mockSetRecord).toBeCalledWith({
330-
key: 'key-1',
331-
value: 'xxxx',
332-
contentType: 'text/plain; charset=utf-8',
333-
});
380+
expect(mockSetRecord).toBeCalledWith(
381+
{
382+
key: 'key-1',
383+
value: 'xxxx',
384+
contentType: 'text/plain; charset=utf-8',
385+
},
386+
{
387+
doNotRetryTimeouts: undefined,
388+
timeoutSecs: undefined,
389+
},
390+
);
334391
});
335392

336393
test('correctly passes raw Buffer values', async () => {
@@ -348,11 +405,17 @@ describe('KeyValueStore', () => {
348405
await store.setValue('key-1', value, { contentType: 'image/jpeg; charset=something' });
349406

350407
expect(mockSetRecord).toBeCalledTimes(1);
351-
expect(mockSetRecord).toBeCalledWith({
352-
key: 'key-1',
353-
value,
354-
contentType: 'image/jpeg; charset=something',
355-
});
408+
expect(mockSetRecord).toBeCalledWith(
409+
{
410+
key: 'key-1',
411+
value,
412+
contentType: 'image/jpeg; charset=something',
413+
},
414+
{
415+
doNotRetryTimeouts: undefined,
416+
timeoutSecs: undefined,
417+
},
418+
);
356419
});
357420

358421
test('correctly passes a stream', async () => {
@@ -373,11 +436,17 @@ describe('KeyValueStore', () => {
373436
value.destroy();
374437

375438
expect(mockSetRecord).toHaveBeenCalledTimes(1);
376-
expect(mockSetRecord).toHaveBeenCalledWith({
377-
key: 'key-1',
378-
value,
379-
contentType: 'plain/text',
380-
});
439+
expect(mockSetRecord).toHaveBeenCalledWith(
440+
{
441+
key: 'key-1',
442+
value,
443+
contentType: 'plain/text',
444+
},
445+
{
446+
doNotRetryTimeouts: undefined,
447+
timeoutSecs: undefined,
448+
},
449+
);
381450
});
382451
});
383452

0 commit comments

Comments
 (0)