Skip to content

Commit ae55f16

Browse files
authored
feat(amqp): add heartbeat and reconnect time configuration options (#2572)
This PR introduces configuration options for the AMQP heartbeat interval and reconnect time, improving connection stability and management. Previously, these values were not configurable, which could result in the client prematurely closing the connection.
1 parent 7aa1397 commit ae55f16

File tree

5 files changed

+37
-4
lines changed

5 files changed

+37
-4
lines changed

.env.sample

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,12 @@
129129
# The message prefetch amount (max non-acknowledged messages being processed)
130130
# (default=100)
131131
# AMQP_PREFETCH=
132+
# The AMQP heartbeat interval in seconds
133+
# (default=60)
134+
# AMQP_HEARBEAT_INTERVAL_SECONDS=
135+
# The AMQP reconnect time in seconds
136+
# (default=5)
137+
# AMQP_RECONNECT_TIME_SECONDS=
132138

133139
# Authorization token to use privileged endpoints.
134140
# The AUTH_TOKEN should always be set

src/__tests__/amqp-client.factory.ts

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,27 @@ export function amqpClientFactory(queue?: string): {
66
channel: ChannelWrapper;
77
queueName: string;
88
} {
9-
const { AMQP_URL, AMQP_EXCHANGE_NAME, AMQP_EXCHANGE_MODE, AMQP_QUEUE } =
10-
process.env;
9+
const {
10+
AMQP_URL,
11+
AMQP_EXCHANGE_NAME,
12+
AMQP_EXCHANGE_MODE,
13+
AMQP_QUEUE,
14+
AMQP_HEARBEAT_INTERVAL_SECONDS,
15+
AMQP_RECONNECT_TIME_SECONDS,
16+
} = process.env;
17+
18+
const heartbeatIntervalInSeconds = +(AMQP_HEARBEAT_INTERVAL_SECONDS || 60);
19+
const reconnectTimeInSeconds = +(AMQP_RECONNECT_TIME_SECONDS || 5);
1120

1221
if (!AMQP_URL || !AMQP_EXCHANGE_NAME || !AMQP_EXCHANGE_MODE || !AMQP_QUEUE) {
1322
throw new Error('Invalid amqpClientFactory configuration');
1423
}
1524

1625
const queueName = queue ?? AMQP_QUEUE;
17-
const connection = amqp.connect(AMQP_URL);
26+
const connection = amqp.connect(AMQP_URL, {
27+
heartbeatIntervalInSeconds,
28+
reconnectTimeInSeconds,
29+
});
1830
const channel = connection.createChannel({
1931
json: true,
2032
setup: async (ch: Channel) => {

src/config/entities/__tests__/configuration.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ export default (): ReturnType<typeof configuration> => ({
3333
exchange: { name: faker.string.sample(), mode: faker.string.sample() },
3434
queue: faker.string.sample(),
3535
prefetch: faker.number.int(),
36+
heartbeatIntervalInSeconds: 60,
37+
reconnectTimeInSeconds: 5,
3638
},
3739
application: {
3840
isProduction: faker.datatype.boolean(),

src/config/entities/configuration.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,10 @@ export default () => ({
5959
process.env.AMQP_PREFETCH != null
6060
? parseInt(process.env.AMQP_PREFETCH)
6161
: 100,
62+
heartbeatIntervalInSeconds: +(
63+
process.env.AMQP_HEARBEAT_INTERVAL_SECONDS || 60
64+
),
65+
reconnectTimeInSeconds: +(process.env.AMQP_RECONNECT_TIME_SECONDS || 5),
6266
},
6367
application: {
6468
isProduction: process.env.CGW_ENV === 'production',

src/datasources/queues/queues-api.module.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,17 @@ function queueConsumerFactory(
2525
configurationService.getOrThrow<string>('amqp.exchange.mode');
2626
const queue = configurationService.getOrThrow<string>('amqp.queue');
2727
const prefetch = configurationService.getOrThrow<number>('amqp.prefetch');
28+
const heartbeatIntervalInSeconds = configurationService.getOrThrow<number>(
29+
'amqp.heartbeatIntervalInSeconds',
30+
);
31+
const reconnectTimeInSeconds = configurationService.getOrThrow<number>(
32+
'amqp.reconnectTimeInSeconds',
33+
);
2834

29-
const connection = amqp.connect(amqpUrl);
35+
const connection = amqp.connect(amqpUrl, {
36+
heartbeatIntervalInSeconds,
37+
reconnectTimeInSeconds,
38+
});
3039
const channel = connection.createChannel({
3140
json: true,
3241
setup: async (ch: Channel) => {

0 commit comments

Comments
 (0)