Skip to content

Commit 0c0d837

Browse files
authored
[service-utils] fix: crash if Kafka connects more than once (#6387)
1 parent 879148f commit 0c0d837

File tree

2 files changed

+17
-6
lines changed

2 files changed

+17
-6
lines changed

.changeset/gentle-readers-bet.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@thirdweb-dev/service-utils": patch
3+
---
4+
5+
[service-utils] Add connect promise so it only connects once

packages/service-utils/src/node/kafka.ts

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ export interface KafkaProducerSendOptions {
3131
*/
3232
export class KafkaProducer {
3333
private producer: KafkaJS.Producer;
34-
private isConnected = false;
34+
// Use a promise to ensure `connect()` is called at most once.
35+
private connectPromise?: Promise<void>;
3536

3637
constructor(options: {
3738
/**
@@ -68,10 +69,17 @@ export class KafkaProducer {
6869

6970
/**
7071
* Connects the producer. Can be called explicitly at the start of your service, or will be called automatically when sending messages.
72+
*
73+
* A cached promise is used so this function is safe to call more than once and concurrently.
7174
*/
7275
async connect() {
73-
await this.producer.connect();
74-
this.isConnected = true;
76+
if (!this.connectPromise) {
77+
this.connectPromise = this.producer.connect().catch((err) => {
78+
this.connectPromise = undefined;
79+
throw err;
80+
});
81+
}
82+
await this.connectPromise;
7583
}
7684

7785
/**
@@ -88,9 +96,7 @@ export class KafkaProducer {
8896
topic: string,
8997
messages: Record<string, unknown>[],
9098
): Promise<void> {
91-
if (!this.isConnected) {
92-
await this.connect();
93-
}
99+
await this.connect();
94100

95101
await this.producer.send({
96102
topic,

0 commit comments

Comments
 (0)