Skip to content

Commit 886a8cc

Browse files
authored
chore: isActive and isConnected early return to avoid issues (#337)
* Early return on isActive and isConnected if init was not called * Version bump * Minor issue fix * coverage fix
1 parent 2b6311f commit 886a8cc

File tree

2 files changed

+20
-6
lines changed

2 files changed

+20
-6
lines changed

packages/kafka/lib/AbstractKafkaConsumer.ts

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -145,15 +145,31 @@ export abstract class AbstractKafkaConsumer<
145145
* Returns true if all client's connections are currently connected and the client is connected to at least one broker.
146146
*/
147147
get isConnected(): boolean {
148-
return this.consumer.isConnected()
148+
// Streams are created only when init method was called
149+
if (!this.consumerStream && !this.messageBatchStream) return false
150+
try {
151+
return this.consumer.isConnected()
152+
} catch (_) {
153+
// this should not happen, but if so it means the consumer is not healthy
154+
/* v8 ignore next */
155+
return false
156+
}
149157
}
150158

151159
/**
152160
* Returns `true` if the consumer is not closed, and it is currently an active member of a consumer group.
153161
* This method will return `false` during consumer group rebalancing.
154162
*/
155163
get isActive(): boolean {
156-
return this.consumer.isActive()
164+
// Streams are created only when init method was called
165+
if (!this.consumerStream && !this.messageBatchStream) return false
166+
try {
167+
return this.consumer.isActive()
168+
} catch (_) {
169+
// this should not happen, but if so it means the consumer is not healthy
170+
/* v8 ignore next */
171+
return false
172+
}
157173
}
158174

159175
async init(): Promise<void> {
@@ -209,9 +225,7 @@ export abstract class AbstractKafkaConsumer<
209225
async close(): Promise<void> {
210226
if (!this.consumerStream && !this.messageBatchStream) {
211227
// Leaving the group in case consumer joined but streams were not created
212-
if (this.isActive) {
213-
this.consumer.leaveGroup()
214-
}
228+
if (this.isActive) this.consumer.leaveGroup()
215229
return
216230
}
217231

packages/kafka/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@message-queue-toolkit/kafka",
3-
"version": "0.7.0",
3+
"version": "0.7.1",
44
"engines": {
55
"node": ">= 22.14.0"
66
},

0 commit comments

Comments
 (0)