From b1ca84ca14d37464c048dafe13f7aa53d4aeb3b7 Mon Sep 17 00:00:00 2001 From: Sayan Das Date: Fri, 25 Jul 2025 12:24:19 +0530 Subject: [PATCH 1/5] Added kafka error handlers --- .../src/destinations/kafka/errors.ts | 1150 +++++++++++++++++ 1 file changed, 1150 insertions(+) create mode 100644 packages/destination-actions/src/destinations/kafka/errors.ts diff --git a/packages/destination-actions/src/destinations/kafka/errors.ts b/packages/destination-actions/src/destinations/kafka/errors.ts new file mode 100644 index 00000000000..ec44f4300c7 --- /dev/null +++ b/packages/destination-actions/src/destinations/kafka/errors.ts @@ -0,0 +1,1150 @@ +import { KafkaJSError } from 'kafkajs' +import { ActionDestinationErrorResponse } from '@segment/actions-core' + +export type KafkaResponse = { + kafkaStatus: string + kafkaStatusCode: number + isRetryableError: boolean + httpResponseCode: number + httpResponseMessage: string +} + +// https://kafka.apache.org/11/protocol.html#protocol_error_codes +export const KafkaErrorMap = new Map([ + [ + -1, + { + kafkaStatus: 'UNKNOWN_SERVER_ERROR', + kafkaStatusCode: -1, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'The server experienced an unexpected error when processing the request.' + } + ], + [ + 0, + { + kafkaStatus: 'NONE', + kafkaStatusCode: 0, + isRetryableError: false, + httpResponseCode: 200, + httpResponseMessage: 'Message sent successfully.' + } + ], + [ + 1, + { + kafkaStatus: 'OFFSET_OUT_OF_RANGE', + kafkaStatusCode: 1, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'The requested offset is not within the range of offsets maintained by the server.' + } + ], + [ + 2, + { + kafkaStatus: 'CORRUPT_MESSAGE', + kafkaStatusCode: 2, + isRetryableError: true, + httpResponseCode: 400, + httpResponseMessage: 'This message has failed its CRC checksum, exceeds the valid size, or is otherwise corrupt.' 
+ } + ], + [ + 3, + { + kafkaStatus: 'UNKNOWN_TOPIC_OR_PARTITION', + kafkaStatusCode: 3, + isRetryableError: true, + httpResponseCode: 404, + httpResponseMessage: 'This server does not host this topic-partition.' + } + ], + [ + 4, + { + kafkaStatus: 'INVALID_FETCH_SIZE', + kafkaStatusCode: 4, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'The requested fetch size is invalid.' + } + ], + [ + 5, + { + kafkaStatus: 'LEADER_NOT_AVAILABLE', + kafkaStatusCode: 5, + isRetryableError: true, + httpResponseCode: 503, + httpResponseMessage: + 'There is no leader for this topic-partition as we are in the middle of a leadership election.' + } + ], + [ + 6, + { + kafkaStatus: 'NOT_LEADER_FOR_PARTITION', + kafkaStatusCode: 6, + isRetryableError: true, + httpResponseCode: 503, + httpResponseMessage: 'This server is not the leader for that topic-partition.' + } + ], + [ + 7, + { + kafkaStatus: 'REQUEST_TIMED_OUT', + kafkaStatusCode: 7, + isRetryableError: true, + httpResponseCode: 408, + httpResponseMessage: 'The request timed out.' + } + ], + [ + 8, + { + kafkaStatus: 'BROKER_NOT_AVAILABLE', + kafkaStatusCode: 8, + isRetryableError: false, + httpResponseCode: 503, + httpResponseMessage: 'The broker is not available.' + } + ], + [ + 9, + { + kafkaStatus: 'REPLICA_NOT_AVAILABLE', + kafkaStatusCode: 9, + isRetryableError: false, + httpResponseCode: 503, + httpResponseMessage: 'The replica is not available for the requested topic-partition.' + } + ], + [ + 10, + { + kafkaStatus: 'MESSAGE_TOO_LARGE', + kafkaStatusCode: 10, + isRetryableError: false, + httpResponseCode: 413, + httpResponseMessage: 'The request included a message larger than the max message size the server will accept.' + } + ], + [ + 11, + { + kafkaStatus: 'STALE_CONTROLLER_EPOCH', + kafkaStatusCode: 11, + isRetryableError: false, + httpResponseCode: 503, + httpResponseMessage: 'The controller moved to another broker.' 
+ } + ], + [ + 12, + { + kafkaStatus: 'OFFSET_METADATA_TOO_LARGE', + kafkaStatusCode: 12, + isRetryableError: false, + httpResponseCode: 413, + httpResponseMessage: 'The metadata field of the offset request was too large.' + } + ], + [ + 13, + { + kafkaStatus: 'NETWORK_EXCEPTION', + kafkaStatusCode: 13, + isRetryableError: true, + httpResponseCode: 503, + httpResponseMessage: 'The server disconnected before a response was received.' + } + ], + [ + 14, + { + kafkaStatus: 'COORDINATOR_LOAD_IN_PROGRESS', + kafkaStatusCode: 14, + isRetryableError: true, + httpResponseCode: 503, + httpResponseMessage: "The coordinator is loading and hence can't process requests." + } + ], + [ + 15, + { + kafkaStatus: 'COORDINATOR_NOT_AVAILABLE', + kafkaStatusCode: 15, + isRetryableError: true, + httpResponseCode: 503, + httpResponseMessage: 'The coordinator is not available.' + } + ], + [ + 16, + { + kafkaStatus: 'NOT_COORDINATOR', + kafkaStatusCode: 16, + isRetryableError: true, + httpResponseCode: 503, + httpResponseMessage: 'Not coordinator for group' + } + ], + [ + 17, + { + kafkaStatus: 'INVALID_TOPIC_EXCEPTION', + kafkaStatusCode: 17, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'The request attempted to perform an operation on an invalid topic.' + } + ], + [ + 18, + { + kafkaStatus: 'RECORD_LIST_TOO_LARGE', + kafkaStatusCode: 18, + isRetryableError: false, + httpResponseCode: 413, + httpResponseMessage: 'The request included message batch larger than the configured segment size on the server.' + } + ], + [ + 19, + { + kafkaStatus: 'NOT_ENOUGH_REPLICAS', + kafkaStatusCode: 19, + isRetryableError: true, + httpResponseCode: 503, + httpResponseMessage: 'Messages are rejected since there are fewer in-sync replicas than required.' 
+ } + ], + [ + 20, + { + kafkaStatus: 'NOT_ENOUGH_REPLICAS_AFTER_APPEND', + kafkaStatusCode: 20, + isRetryableError: true, + httpResponseCode: 503, + httpResponseMessage: 'Messages are written to the log, but to fewer in-sync replicas than required.' + } + ], + [ + 21, + { + kafkaStatus: 'INVALID_REQUIRED_ACKS', + kafkaStatusCode: 21, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'Produce request specified an invalid value for required acks.' + } + ], + [ + 22, + { + kafkaStatus: 'ILLEGAL_GENERATION', + kafkaStatusCode: 22, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'Specified group generation id is not valid.' + } + ], + [ + 23, + { + kafkaStatus: 'INCONSISTENT_GROUP_PROTOCOL', + kafkaStatusCode: 23, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: + "The group member's supported protocols are incompatible with those of existing members or first group member tried to join with empty protocol type or empty protocol list." + } + ], + [ + 24, + { + kafkaStatus: 'INVALID_GROUP_ID', + kafkaStatusCode: 24, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'The configured groupId is invalid' + } + ], + [ + 25, + { + kafkaStatus: 'UNKNOWN_MEMBER_ID', + kafkaStatusCode: 25, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'The coordinator is not aware of this member.' + } + ], + [ + 26, + { + kafkaStatus: 'INVALID_SESSION_TIMEOUT', + kafkaStatusCode: 26, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: + 'The session timeout is not within the range allowed by the broker (as configured by group.min.session.timeout.ms and group.max.session.timeout.ms).' + } + ], + [ + 27, + { + kafkaStatus: 'REBALANCE_IN_PROGRESS', + kafkaStatusCode: 27, + isRetryableError: false, + httpResponseCode: 503, + httpResponseMessage: 'The group is rebalancing, so a rejoin is needed.' 
+ } + ], + [ + 28, + { + kafkaStatus: 'INVALID_COMMIT_OFFSET_SIZE', + kafkaStatusCode: 28, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'The committing offset data size is not valid' + } + ], + [ + 29, + { + kafkaStatus: 'TOPIC_AUTHORIZATION_FAILED', + kafkaStatusCode: 29, + isRetryableError: false, + httpResponseCode: 403, + httpResponseMessage: 'Not authorized to access topics: [Topic authorization failed.]' + } + ], + [ + 30, + { + kafkaStatus: 'GROUP_AUTHORIZATION_FAILED', + kafkaStatusCode: 30, + isRetryableError: false, + httpResponseCode: 403, + httpResponseMessage: 'Not authorized to access group: Group authorization failed.' + } + ], + [ + 31, + { + kafkaStatus: 'CLUSTER_AUTHORIZATION_FAILED', + kafkaStatusCode: 31, + isRetryableError: false, + httpResponseCode: 403, + httpResponseMessage: 'Cluster authorization failed.' + } + ], + [ + 32, + { + kafkaStatus: 'INVALID_TIMESTAMP', + kafkaStatusCode: 32, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'The timestamp of the message is out of acceptable range.' + } + ], + [ + 33, + { + kafkaStatus: 'UNSUPPORTED_SASL_MECHANISM', + kafkaStatusCode: 33, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'The broker does not support the requested SASL mechanism.' + } + ], + [ + 34, + { + kafkaStatus: 'ILLEGAL_SASL_STATE', + kafkaStatusCode: 34, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'Request is not valid given the current SASL state.' + } + ], + [ + 35, + { + kafkaStatus: 'UNSUPPORTED_VERSION', + kafkaStatusCode: 35, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'The version of API is not supported.' + } + ], + [ + 36, + { + kafkaStatus: 'TOPIC_ALREADY_EXISTS', + kafkaStatusCode: 36, + isRetryableError: false, + httpResponseCode: 409, + httpResponseMessage: 'Topic with this name already exists.' 
+ } + ], + [ + 37, + { + kafkaStatus: 'INVALID_PARTITIONS', + kafkaStatusCode: 37, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'Number of partitions is invalid.' + } + ], + [ + 38, + { + kafkaStatus: 'INVALID_REPLICATION_FACTOR', + kafkaStatusCode: 38, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'Replication-factor is invalid.' + } + ], + [ + 39, + { + kafkaStatus: 'INVALID_REPLICA_ASSIGNMENT', + kafkaStatusCode: 39, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'Replica assignment is invalid.' + } + ], + [ + 40, + { + kafkaStatus: 'INVALID_CONFIG', + kafkaStatusCode: 40, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'Configuration is invalid.' + } + ], + [ + 41, + { + kafkaStatus: 'NOT_CONTROLLER', + kafkaStatusCode: 41, + isRetryableError: true, + httpResponseCode: 503, + httpResponseMessage: 'This is not the correct controller for this cluster.' + } + ], + [ + 42, + { + kafkaStatus: 'INVALID_REQUEST', + kafkaStatusCode: 42, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: + 'This most likely occurs because of a request being malformed by the client library or the message was sent to an incompatible broker. See the broker logs for more details.' + } + ], + [ + 43, + { + kafkaStatus: 'UNSUPPORTED_FOR_MESSAGE_FORMAT', + kafkaStatusCode: 43, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'The message format version on the broker does not support the request.' + } + ], + [ + 44, + { + kafkaStatus: 'POLICY_VIOLATION', + kafkaStatusCode: 44, + isRetryableError: false, + httpResponseCode: 403, + httpResponseMessage: 'Request parameters do not satisfy the configured policy.' 
+ } + ], + [ + 45, + { + kafkaStatus: 'OUT_OF_ORDER_SEQUENCE_NUMBER', + kafkaStatusCode: 45, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'The broker received an out of order sequence number' + } + ], + [ + 46, + { + kafkaStatus: 'DUPLICATE_SEQUENCE_NUMBER', + kafkaStatusCode: 46, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'The broker received a duplicate sequence number' + } + ], + [ + 47, + { + kafkaStatus: 'INVALID_PRODUCER_EPOCH', + kafkaStatusCode: 47, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: + "Producer attempted an operation with an old epoch. Either there is a newer producer with the same transactionalId, or the producer's transaction has been expired by the broker." + } + ], + [ + 48, + { + kafkaStatus: 'INVALID_TXN_STATE', + kafkaStatusCode: 48, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'The producer attempted a transactional operation in an invalid state' + } + ], + [ + 49, + { + kafkaStatus: 'INVALID_PRODUCER_ID_MAPPING', + kafkaStatusCode: 49, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: + 'The producer attempted to use a producer id which is not currently assigned to its transactional id' + } + ], + [ + 50, + { + kafkaStatus: 'INVALID_TRANSACTION_TIMEOUT', + kafkaStatusCode: 50, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: + 'The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).' 
+ } + ], + [ + 51, + { + kafkaStatus: 'CONCURRENT_TRANSACTIONS', + kafkaStatusCode: 51, + isRetryableError: false, + httpResponseCode: 503, + httpResponseMessage: + 'The producer attempted to update a transaction while another concurrent operation on the same transaction was ongoing' + } + ], + [ + 52, + { + kafkaStatus: 'TRANSACTION_COORDINATOR_FENCED', + kafkaStatusCode: 52, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: + 'Indicates that the transaction coordinator sending a WriteTxnMarker is no longer the current coordinator for a given producer' + } + ], + [ + 53, + { + kafkaStatus: 'TRANSACTIONAL_ID_AUTHORIZATION_FAILED', + kafkaStatusCode: 53, + isRetryableError: false, + httpResponseCode: 403, + httpResponseMessage: 'Transactional Id authorization failed' + } + ], + [ + 54, + { + kafkaStatus: 'SECURITY_DISABLED', + kafkaStatusCode: 54, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'Security features are disabled.' + } + ], + [ + 55, + { + kafkaStatus: 'OPERATION_NOT_ATTEMPTED', + kafkaStatusCode: 55, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: + 'The broker did not attempt to execute this operation. This may happen for batched RPCs where some operations in the batch failed, causing the broker to respond without trying the rest.' + } + ], + [ + 56, + { + kafkaStatus: 'KAFKA_STORAGE_ERROR', + kafkaStatusCode: 56, + isRetryableError: true, + httpResponseCode: 500, + httpResponseMessage: 'Disk error when trying to access log file on the disk.' + } + ], + [ + 57, + { + kafkaStatus: 'LOG_DIR_NOT_FOUND', + kafkaStatusCode: 57, + isRetryableError: false, + httpResponseCode: 500, + httpResponseMessage: 'The user-specified log directory is not found in the broker config.' + } + ], + [ + 58, + { + kafkaStatus: 'SASL_AUTHENTICATION_FAILED', + kafkaStatusCode: 58, + isRetryableError: false, + httpResponseCode: 401, + httpResponseMessage: 'SASL Authentication failed.' 
+ } + ], + [ + 59, + { + kafkaStatus: 'UNKNOWN_PRODUCER_ID', + kafkaStatusCode: 59, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: + "This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception." + } + ], + [ + 60, + { + kafkaStatus: 'REASSIGNMENT_IN_PROGRESS', + kafkaStatusCode: 60, + isRetryableError: false, + httpResponseCode: 503, + httpResponseMessage: 'A partition reassignment is in progress' + } + ], + [ + 61, + { + kafkaStatus: 'DELEGATION_TOKEN_AUTH_DISABLED', + kafkaStatusCode: 61, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'Delegation Token feature is not enabled.' + } + ], + [ + 62, + { + kafkaStatus: 'DELEGATION_TOKEN_NOT_FOUND', + kafkaStatusCode: 62, + isRetryableError: false, + httpResponseCode: 404, + httpResponseMessage: 'Delegation Token is not found on server.' + } + ], + [ + 63, + { + kafkaStatus: 'DELEGATION_TOKEN_OWNER_MISMATCH', + kafkaStatusCode: 63, + isRetryableError: false, + httpResponseCode: 403, + httpResponseMessage: 'Specified Principal is not valid Owner/Renewer.' + } + ], + [ + 64, + { + kafkaStatus: 'DELEGATION_TOKEN_REQUEST_NOT_ALLOWED', + kafkaStatusCode: 64, + isRetryableError: false, + httpResponseCode: 403, + httpResponseMessage: + 'Delegation Token requests are not allowed on PLAINTEXT/1-way SSL channels and on delegation token authenticated channels.' + } + ], + [ + 65, + { + kafkaStatus: 'DELEGATION_TOKEN_AUTHORIZATION_FAILED', + kafkaStatusCode: 65, + isRetryableError: false, + httpResponseCode: 403, + httpResponseMessage: 'Delegation Token authorization failed.' 
+ } + ], + [ + 66, + { + kafkaStatus: 'DELEGATION_TOKEN_EXPIRED', + kafkaStatusCode: 66, + isRetryableError: false, + httpResponseCode: 401, + httpResponseMessage: 'Delegation Token is expired.' + } + ], + [ + 67, + { + kafkaStatus: 'INVALID_PRINCIPAL_TYPE', + kafkaStatusCode: 67, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'Supplied principalType is not supported' + } + ], + [ + 68, + { + kafkaStatus: 'NON_EMPTY_GROUP', + kafkaStatusCode: 68, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'The group is not empty' + } + ], + [ + 69, + { + kafkaStatus: 'GROUP_ID_NOT_FOUND', + kafkaStatusCode: 69, + isRetryableError: false, + httpResponseCode: 404, + httpResponseMessage: 'The group id does not exist' + } + ], + [ + 70, + { + kafkaStatus: 'FETCH_SESSION_ID_NOT_FOUND', + kafkaStatusCode: 70, + isRetryableError: true, + httpResponseCode: 503, + httpResponseMessage: 'The fetch session ID was not found' + } + ], + [ + 71, + { + kafkaStatus: 'INVALID_FETCH_SESSION_EPOCH', + kafkaStatusCode: 71, + isRetryableError: true, + httpResponseCode: 503, + httpResponseMessage: 'The fetch session epoch is invalid' + } + ], + [ + 72, + { + kafkaStatus: 'LISTENER_NOT_FOUND', + kafkaStatusCode: 72, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'Listener not found' + } + ], + [ + 73, + { + kafkaStatus: 'TOPIC_DELETION_DISABLED', + kafkaStatusCode: 73, + isRetryableError: false, + httpResponseCode: 403, + httpResponseMessage: 'Topic deletion disabled' + } + ], + [ + 74, + { + kafkaStatus: 'FENCED_LEADER_EPOCH', + kafkaStatusCode: 74, + isRetryableError: true, + httpResponseCode: 503, + httpResponseMessage: 'Fenced leader epoch' + } + ], + [ + 75, + { + kafkaStatus: 'UNKNOWN_LEADER_EPOCH', + kafkaStatusCode: 75, + isRetryableError: true, + httpResponseCode: 503, + httpResponseMessage: 'Unknown leader epoch' + } + ], + [ + 76, + { + kafkaStatus: 
'UNSUPPORTED_COMPRESSION_TYPE', + kafkaStatusCode: 76, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'Unsupported compression type' + } + ], + [ + 77, + { + kafkaStatus: 'STALE_BROKER_EPOCH', + kafkaStatusCode: 77, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'Stale broker epoch' + } + ], + [ + 78, + { + kafkaStatus: 'OFFSET_NOT_AVAILABLE', + kafkaStatusCode: 78, + isRetryableError: true, + httpResponseCode: 503, + httpResponseMessage: 'Offset not available' + } + ], + [ + 79, + { + kafkaStatus: 'MEMBER_ID_REQUIRED', + kafkaStatusCode: 79, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'Member id required' + } + ], + [ + 80, + { + kafkaStatus: 'PREFERRED_LEADER_NOT_AVAILABLE', + kafkaStatusCode: 80, + isRetryableError: true, + httpResponseCode: 503, + httpResponseMessage: 'Preferred leader not available' + } + ], + [ + 81, + { + kafkaStatus: 'GROUP_MAX_SIZE_REACHED', + kafkaStatusCode: 81, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'Group maximum size reached' + } + ], + [ + 82, + { + kafkaStatus: 'FENCED_INSTANCE_ID', + kafkaStatusCode: 82, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'Fenced instance id' + } + ], + [ + 83, + { + kafkaStatus: 'ELIGIBLE_LEADERS_NOT_AVAILABLE', + kafkaStatusCode: 83, + isRetryableError: true, + httpResponseCode: 503, + httpResponseMessage: 'Eligible leaders not available' + } + ], + [ + 84, + { + kafkaStatus: 'ELECTION_NOT_NEEDED', + kafkaStatusCode: 84, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'Election not needed' + } + ], + [ + 85, + { + kafkaStatus: 'NO_REASSIGNMENT_IN_PROGRESS', + kafkaStatusCode: 85, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'No reassignment in progress' + } + ], + [ + 86, + { + kafkaStatus: 'GROUP_SUBSCRIBED_TO_TOPIC', + kafkaStatusCode: 86, + isRetryableError: false, + httpResponseCode: 400, + 
httpResponseMessage: 'Group subscribed to topic' + } + ], + [ + 87, + { + kafkaStatus: 'INVALID_RECORD', + kafkaStatusCode: 87, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'Invalid record' + } + ], + [ + 88, + { + kafkaStatus: 'UNSTABLE_OFFSET_COMMIT', + kafkaStatusCode: 88, + isRetryableError: true, + httpResponseCode: 503, + httpResponseMessage: 'Unstable offset commit' + } + ], + [ + 89, + { + kafkaStatus: 'THROTTLING_QUOTA_EXCEEDED', + kafkaStatusCode: 89, + isRetryableError: true, + httpResponseCode: 429, + httpResponseMessage: 'Throttling quota exceeded' + } + ], + [ + 90, + { + kafkaStatus: 'PRODUCER_FENCED', + kafkaStatusCode: 90, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'Producer fenced' + } + ], + [ + 91, + { + kafkaStatus: 'RESOURCE_NOT_FOUND', + kafkaStatusCode: 91, + isRetryableError: false, + httpResponseCode: 404, + httpResponseMessage: 'Resource not found' + } + ], + [ + 92, + { + kafkaStatus: 'DUPLICATE_RESOURCE', + kafkaStatusCode: 92, + isRetryableError: false, + httpResponseCode: 409, + httpResponseMessage: 'Duplicate resource' + } + ], + [ + 93, + { + kafkaStatus: 'UNACCEPTABLE_CREDENTIAL', + kafkaStatusCode: 93, + isRetryableError: false, + httpResponseCode: 401, + httpResponseMessage: 'Unacceptable credential' + } + ], + [ + 94, + { + kafkaStatus: 'INCONSISTENT_VOTER_SET', + kafkaStatusCode: 94, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'Inconsistent voter set' + } + ], + [ + 95, + { + kafkaStatus: 'INVALID_UPDATE_VERSION', + kafkaStatusCode: 95, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'Invalid update version' + } + ], + [ + 96, + { + kafkaStatus: 'FEATURE_UPDATE_FAILED', + kafkaStatusCode: 96, + isRetryableError: false, + httpResponseCode: 500, + httpResponseMessage: 'Feature update failed' + } + ], + [ + 97, + { + kafkaStatus: 'PRINCIPAL_DESERIALIZATION_FAILURE', + kafkaStatusCode: 97, + isRetryableError: 
false, + httpResponseCode: 400, + httpResponseMessage: 'Principal deserialization failure' + } + ], + [ + 98, + { + kafkaStatus: 'SNAPSHOT_NOT_FOUND', + kafkaStatusCode: 98, + isRetryableError: false, + httpResponseCode: 404, + httpResponseMessage: 'Snapshot not found' + } + ], + [ + 99, + { + kafkaStatus: 'POSITION_OUT_OF_RANGE', + kafkaStatusCode: 99, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'Position out of range' + } + ], + [ + 100, + { + kafkaStatus: 'UNKNOWN_TOPIC_ID', + kafkaStatusCode: 100, + isRetryableError: false, + httpResponseCode: 404, + httpResponseMessage: 'Unknown topic id' + } + ], + [ + 101, + { + kafkaStatus: 'DUPLICATE_BROKER_REGISTRATION', + kafkaStatusCode: 101, + isRetryableError: false, + httpResponseCode: 409, + httpResponseMessage: 'Duplicate broker registration' + } + ], + [ + 102, + { + kafkaStatus: 'BROKER_ID_NOT_REGISTERED', + kafkaStatusCode: 102, + isRetryableError: false, + httpResponseCode: 404, + httpResponseMessage: 'Broker id not registered' + } + ], + [ + 103, + { + kafkaStatus: 'INCONSISTENT_TOPIC_ID', + kafkaStatusCode: 103, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'Inconsistent topic id' + } + ], + [ + 104, + { + kafkaStatus: 'INCONSISTENT_CLUSTER_ID', + kafkaStatusCode: 104, + isRetryableError: false, + httpResponseCode: 400, + httpResponseMessage: 'Inconsistent cluster id' + } + ], + [ + 105, + { + kafkaStatus: 'TRANSACTIONAL_ID_NOT_FOUND', + kafkaStatusCode: 105, + isRetryableError: false, + httpResponseCode: 404, + httpResponseMessage: 'Transactional id not found' + } + ], + [ + 106, + { + kafkaStatus: 'FETCH_SESSION_TOPIC_ID_ERROR', + kafkaStatusCode: 106, + isRetryableError: true, + httpResponseCode: 503, + httpResponseMessage: 'Fetch session topic id error' + } + ] +]) + +/** + * Handles KafkaJS errors and maps them to appropriate Segment errors + * @param error - The KafkaJS error + * @param defaultMessage - Default error message if no specific 
mapping found + * @returns IntegrationError or RetryableError based on error type + */ +export const handleKafkaError = (error: KafkaJSError, defaultMessage: string): ActionDestinationErrorResponse => { + // Extract error code from KafkaJS error + let errorCode: number | undefined + + // KafkaJS errors often contain error codes in different formats + if ('errorCode' in error && typeof error.errorCode === 'number') { + errorCode = error.errorCode + } else if (error.cause && typeof error.cause === 'object' && error.cause !== null && 'code' in error.cause) { + errorCode = (error.cause as { code: number }).code + } else if (error.message) { + // Try to extract error code from message (e.g., "Error code: 3") + const match = error.message.match(/(?:error code:?\s*|code\s*=\s*)(-?\d+)/i) + if (match) { + errorCode = parseInt(match[1], 10) + } + } + + // Look up error in our mapping + if (errorCode !== undefined && KafkaErrorMap.has(errorCode)) { + const kafkaError = KafkaErrorMap.get(errorCode) + + if (kafkaError) { + return new ActionDestinationErrorResponse({ + errormessage: kafkaError.httpResponseMessage, + status: kafkaError.httpResponseCode, + body: kafkaError.kafkaStatus + }) + } + } + + // Fallback for unmapped errors + return new ActionDestinationErrorResponse({ + errormessage: `${defaultMessage}: ${error.message}`, + status: 400, + body: 'An unknown error occurred while processing the Kafka request.' 
+ }) +} From 6bdd4f1842109abcd9e12bbbf3150ed5a72651c3 Mon Sep 17 00:00:00 2001 From: Sayan Das Date: Fri, 25 Jul 2025 12:24:19 +0530 Subject: [PATCH 2/5] Updated error response --- .../src/destinations/kafka/errors.ts | 25 +++-- .../src/destinations/kafka/utils.ts | 106 ++++++++++++++++-- 2 files changed, 114 insertions(+), 17 deletions(-) diff --git a/packages/destination-actions/src/destinations/kafka/errors.ts b/packages/destination-actions/src/destinations/kafka/errors.ts index ec44f4300c7..db6ba605799 100644 --- a/packages/destination-actions/src/destinations/kafka/errors.ts +++ b/packages/destination-actions/src/destinations/kafka/errors.ts @@ -1,5 +1,6 @@ import { KafkaJSError } from 'kafkajs' -import { ActionDestinationErrorResponse } from '@segment/actions-core' +import { ModifiedResponse } from '@segment/actions-core' +import { generateHttpResponse } from './utils' export type KafkaResponse = { kafkaStatus: string @@ -1111,7 +1112,7 @@ export const KafkaErrorMap = new Map([ * @param defaultMessage - Default error message if no specific mapping found * @returns IntegrationError or RetryableError based on error type */ -export const handleKafkaError = (error: KafkaJSError, defaultMessage: string): ActionDestinationErrorResponse => { +export const handleKafkaError = (error: KafkaJSError, defaultMessage: string): ModifiedResponse => { // Extract error code from KafkaJS error let errorCode: number | undefined @@ -1133,18 +1134,26 @@ export const handleKafkaError = (error: KafkaJSError, defaultMessage: string): A const kafkaError = KafkaErrorMap.get(errorCode) if (kafkaError) { - return new ActionDestinationErrorResponse({ - errormessage: kafkaError.httpResponseMessage, + return generateHttpResponse({ status: kafkaError.httpResponseCode, - body: kafkaError.kafkaStatus + statusText: kafkaError.httpResponseMessage, + url: 'https://kafka.segment.com', + data: JSON.stringify({ + message: kafkaError.httpResponseMessage, + kafkaStatus: kafkaError.kafkaStatus + }) }) 
} } // Fallback for unmapped errors - return new ActionDestinationErrorResponse({ - errormessage: `${defaultMessage}: ${error.message}`, + return generateHttpResponse({ status: 400, - body: 'An unknown error occurred while processing the Kafka request.' + statusText: 'Bad Request', + url: 'https://kafka.segment.com', + data: JSON.stringify({ + message: defaultMessage, + kafkaStatus: 'UNKNOWN_ERROR' + }) }) } diff --git a/packages/destination-actions/src/destinations/kafka/utils.ts b/packages/destination-actions/src/destinations/kafka/utils.ts index 76afdfd150d..ea76e38268f 100644 --- a/packages/destination-actions/src/destinations/kafka/utils.ts +++ b/packages/destination-actions/src/destinations/kafka/utils.ts @@ -1,10 +1,18 @@ import { Kafka, ProducerRecord, Partitioners, SASLOptions, KafkaConfig, KafkaJSError, Producer } from 'kafkajs' -import { DynamicFieldResponse, IntegrationError, Features } from '@segment/actions-core' +import { + DynamicFieldResponse, + IntegrationError, + Features, + StatsContext, + MultiStatusResponse, + JSONLikeObject, + ModifiedResponse +} from '@segment/actions-core' import type { Settings } from './generated-types' import type { Payload } from './send/generated-types' import { DEFAULT_PARTITIONER, Message, TopicMessages, SSLConfig, CachedProducer } from './types' import { PRODUCER_REQUEST_TIMEOUT_MS, PRODUCER_TTL_MS, FLAGON_NAME } from './constants' -import { StatsContext } from '@segment/actions-core/destination-kit' +import { handleKafkaError } from './errors' export const producersByConfig: Record = {} @@ -137,7 +145,10 @@ const getProducer = (settings: Settings) => { }) } -export const getOrCreateProducer = async (settings: Settings, statsContext: StatsContext | undefined): Promise => { +export const getOrCreateProducer = async ( + settings: Settings, + statsContext: StatsContext | undefined +): Promise => { const key = serializeKafkaConfig(settings) const now = Date.now() @@ -174,7 +185,23 @@ export const getOrCreateProducer = 
async (settings: Settings, statsContext: Stat return producer } -export const sendData = async (settings: Settings, payload: Payload[], features: Features | undefined, statsContext: StatsContext | undefined) => { +export const sendData = async ( + settings: Settings, + payload: Payload[], + features: Features | undefined, + statsContext: StatsContext | undefined +) => { + // Assume everything to be successful by default + const multiStatusResponse = new MultiStatusResponse() + + for (let i = 0; i < payload.length; i++) { + multiStatusResponse.setSuccessResponseAtIndex(i, { + sent: payload[i].payload as JSONLikeObject, + body: 'Message sent successfully', + status: 200 + }) + } + validate(settings) const groupedPayloads: { [topic: string]: Payload[] } = {} @@ -213,11 +240,7 @@ export const sendData = async (settings: Settings, payload: Payload[], features: try { await producer.send(data as ProducerRecord) } catch (error) { - throw new IntegrationError( - `Kafka Producer Error: ${(error as KafkaJSError).message}`, - 'KAFKA_PRODUCER_ERROR', - 400 - ) + return handleKafkaError(error as KafkaJSError, `Failed to deliver message to kafka: ${(error as Error).message}`) } } @@ -229,4 +252,69 @@ export const sendData = async (settings: Settings, payload: Payload[], features: } else { await producer.disconnect() } + + return generateHttpResponse({ + status: 200, + statusText: 'OK', + url: 'https://kafka.segment.com', + data: '{"message": "Messages sent successfully"}' + }) +} + +type ModifiedResponseInput = { + status: number + statusText: string + url: string + data: string +} + +export function generateHttpResponse(input: ModifiedResponseInput): ModifiedResponse { + const headers = new Headers({ + 'Content-Type': 'application/json' + }) as Headers & { toJSON: () => Record } + headers.toJSON = () => { + const jsonHeaders: Record = {} + headers.forEach((value, key) => { + jsonHeaders[key] = value + }) + return jsonHeaders + } + + const encoder = new TextEncoder() + const 
encodedBytes = encoder.encode(input.data) + + return { + status: input.status, + statusText: input.statusText, + ok: true, + url: input.url, + headers: headers as Headers & { toJSON: () => Record }, + content: input.data, + data: input.data ? JSON.parse(input.data) : undefined, + redirected: false, + type: 'default', + clone: function () { + return { ...this } + }, + body: null, + bodyUsed: false, + arrayBuffer: async function () { + return encodedBytes.buffer + }, + blob: async function () { + return new Blob([input.data], { type: 'application/json' }) + }, + formData: async function () { + throw new Error('formData not implemented') + }, + json: async function () { + return input.data ? JSON.parse(input.data) : undefined + }, + text: async function () { + return input.data + }, + bytes: async function () { + return encodedBytes + } + } } From 68a9a78e574f02dd3a3f3ac45b3b337c8617e682 Mon Sep 17 00:00:00 2001 From: Sayan Das Date: Fri, 25 Jul 2025 12:24:19 +0530 Subject: [PATCH 3/5] Simplify Kafka errors --- .../src/destinations/kafka/errors.ts | 52 +++----------- .../src/destinations/kafka/utils.ts | 70 +------------------ 2 files changed, 11 insertions(+), 111 deletions(-) diff --git a/packages/destination-actions/src/destinations/kafka/errors.ts b/packages/destination-actions/src/destinations/kafka/errors.ts index db6ba605799..c1aa9b15a36 100644 --- a/packages/destination-actions/src/destinations/kafka/errors.ts +++ b/packages/destination-actions/src/destinations/kafka/errors.ts @@ -1,6 +1,5 @@ import { KafkaJSError } from 'kafkajs' -import { ModifiedResponse } from '@segment/actions-core' -import { generateHttpResponse } from './utils' +import { IntegrationError, RetryableError } from '@segment/actions-core' export type KafkaResponse = { kafkaStatus: string @@ -1112,48 +1111,15 @@ export const KafkaErrorMap = new Map([ * @param defaultMessage - Default error message if no specific mapping found * @returns IntegrationError or RetryableError based on error type 
*/ -export const handleKafkaError = (error: KafkaJSError, defaultMessage: string): ModifiedResponse => { - // Extract error code from KafkaJS error - let errorCode: number | undefined - - // KafkaJS errors often contain error codes in different formats - if ('errorCode' in error && typeof error.errorCode === 'number') { - errorCode = error.errorCode - } else if (error.cause && typeof error.cause === 'object' && error.cause !== null && 'code' in error.cause) { - errorCode = (error.cause as { code: number }).code - } else if (error.message) { - // Try to extract error code from message (e.g., "Error code: 3") - const match = error.message.match(/(?:error code:?\s*|code\s*=\s*)(-?\d+)/i) - if (match) { - errorCode = parseInt(match[1], 10) - } - } - - // Look up error in our mapping - if (errorCode !== undefined && KafkaErrorMap.has(errorCode)) { - const kafkaError = KafkaErrorMap.get(errorCode) - - if (kafkaError) { - return generateHttpResponse({ - status: kafkaError.httpResponseCode, - statusText: kafkaError.httpResponseMessage, - url: 'https://kafka.segment.com', - data: JSON.stringify({ - message: kafkaError.httpResponseMessage, - kafkaStatus: kafkaError.kafkaStatus - }) - }) +export const handleKafkaError = (error: KafkaJSError, defaultMessage: string) => { + if (error instanceof KafkaJSError) { + if (error.retriable) { + return new RetryableError(error.message) + } else { + return new IntegrationError(error.message, 'KAFKA_ERROR', 400) } } - // Fallback for unmapped errors - return generateHttpResponse({ - status: 400, - statusText: 'Bad Request', - url: 'https://kafka.segment.com', - data: JSON.stringify({ - message: defaultMessage, - kafkaStatus: 'UNKNOWN_ERROR' - }) - }) + // Fallback to default error handling + return new IntegrationError(defaultMessage, 'KAFKA_ERROR', 400) } diff --git a/packages/destination-actions/src/destinations/kafka/utils.ts b/packages/destination-actions/src/destinations/kafka/utils.ts index ea76e38268f..bf2d2534a41 100644 --- 
a/packages/destination-actions/src/destinations/kafka/utils.ts +++ b/packages/destination-actions/src/destinations/kafka/utils.ts @@ -5,8 +5,7 @@ import { Features, StatsContext, MultiStatusResponse, - JSONLikeObject, - ModifiedResponse + JSONLikeObject } from '@segment/actions-core' import type { Settings } from './generated-types' import type { Payload } from './send/generated-types' @@ -240,7 +239,7 @@ export const sendData = async ( try { await producer.send(data as ProducerRecord) } catch (error) { - return handleKafkaError(error as KafkaJSError, `Failed to deliver message to kafka: ${(error as Error).message}`) + handleKafkaError(error as KafkaJSError, `Failed to deliver message to kafka: ${(error as Error).message}`) } } @@ -252,69 +251,4 @@ export const sendData = async ( } else { await producer.disconnect() } - - return generateHttpResponse({ - status: 200, - statusText: 'OK', - url: 'https://kafka.segment.com', - data: '{"message": "Messages sent successfully"}' - }) -} - -type ModifiedResponseInput = { - status: number - statusText: string - url: string - data: string -} - -export function generateHttpResponse(input: ModifiedResponseInput): ModifiedResponse { - const headers = new Headers({ - 'Content-Type': 'application/json' - }) as Headers & { toJSON: () => Record } - headers.toJSON = () => { - const jsonHeaders: Record = {} - headers.forEach((value, key) => { - jsonHeaders[key] = value - }) - return jsonHeaders - } - - const encoder = new TextEncoder() - const encodedBytes = encoder.encode(input.data) - - return { - status: input.status, - statusText: input.statusText, - ok: true, - url: input.url, - headers: headers as Headers & { toJSON: () => Record }, - content: input.data, - data: input.data ? 
JSON.parse(input.data) : undefined, - redirected: false, - type: 'default', - clone: function () { - return { ...this } - }, - body: null, - bodyUsed: false, - arrayBuffer: async function () { - return encodedBytes.buffer - }, - blob: async function () { - return new Blob([input.data], { type: 'application/json' }) - }, - formData: async function () { - throw new Error('formData not implemented') - }, - json: async function () { - return input.data ? JSON.parse(input.data) : undefined - }, - text: async function () { - return input.data - }, - bytes: async function () { - return encodedBytes - } - } } From c38a1200e6444ed8cb0c3098fa53a8dd8280d3ba Mon Sep 17 00:00:00 2001 From: Sayan Das Date: Fri, 25 Jul 2025 12:24:19 +0530 Subject: [PATCH 4/5] Added error handling in Kafka --- .../__tests__/create-request-client.test.ts | 22 ++++++++ packages/core/src/request-client.ts | 10 ++++ .../src/destinations/kafka/errors.ts | 51 +++++++++++++++++- .../src/destinations/kafka/send/index.ts | 8 +-- .../src/destinations/kafka/utils.ts | 54 ++++++++++++------- 5 files changed, 120 insertions(+), 25 deletions(-) diff --git a/packages/core/src/__tests__/create-request-client.test.ts b/packages/core/src/__tests__/create-request-client.test.ts index 9cdb1a78ba7..b415c00da97 100644 --- a/packages/core/src/__tests__/create-request-client.test.ts +++ b/packages/core/src/__tests__/create-request-client.test.ts @@ -1,6 +1,7 @@ import btoa from 'btoa-lite' import createTestServer from 'create-test-server' import createRequestClient from '../create-request-client' +import { Response } from '../fetch' describe('createRequestClient', () => { it('should create a request client instance that has Segment defaults', async () => { @@ -111,4 +112,25 @@ describe('createRequestClient', () => { }) await server.close() }) + + it('should emulate an HTTP response if emulateHttpResponse property is set', async () => { + const server = await createTestServer() + server.get('/', (_request, response) => { 
+ response.json({ greeting: 'This is the server response' }) + }) + + const request = createRequestClient() + + const emulateHttpResponse = new Response(JSON.stringify({ hello: 'This is the emulated response' }), { + headers: { 'Content-Type': 'application/json' }, + status: 200, + statusText: 'OK' + }) + + await expect(request(server.url, { method: 'get', emulateHttpResponse })).resolves.toMatchObject({ + data: expect.objectContaining({ hello: 'This is the emulated response' }) + }) + + await server.close() + }) }) diff --git a/packages/core/src/request-client.ts b/packages/core/src/request-client.ts index 7eeb8a4a286..b09fb954c67 100644 --- a/packages/core/src/request-client.ts +++ b/packages/core/src/request-client.ts @@ -69,6 +69,11 @@ export interface RequestOptions extends Omit { * Uses the provided https.Agent */ agent?: https.Agent + /** + * Used to emulate an http request. + * If set, instead of making a real HTTP request, the provided response will be returned. + */ + emulateHttpResponse?: Response } /** @@ -357,6 +362,11 @@ class RequestClient { } } + // If we have an emulateHttpResponse, return it instead of making a real request + if (this.options.emulateHttpResponse) { + return Promise.resolve(this.options.emulateHttpResponse) + } + if (this.options.timeout === false) { return fetch(this.request.clone()) } diff --git a/packages/destination-actions/src/destinations/kafka/errors.ts b/packages/destination-actions/src/destinations/kafka/errors.ts index c1aa9b15a36..1609474e9c9 100644 --- a/packages/destination-actions/src/destinations/kafka/errors.ts +++ b/packages/destination-actions/src/destinations/kafka/errors.ts @@ -1,5 +1,6 @@ import { KafkaJSError } from 'kafkajs' -import { IntegrationError, RetryableError } from '@segment/actions-core' +import { IntegrationError, RetryableError, RequestClient, Response } from '@segment/actions-core' +import { TopicMessages } from './types' export type KafkaResponse = { kafkaStatus: string @@ -1111,15 +1112,61 @@ 
export const KafkaErrorMap = new Map([ * @param defaultMessage - Default error message if no specific mapping found * @returns IntegrationError or RetryableError based on error type */ -export const handleKafkaError = (error: KafkaJSError, defaultMessage: string) => { +export async function handleKafkaError( + request: RequestClient, + url: string, + error: KafkaJSError, + defaultMessage: string, + data?: TopicMessages +) { + const httpErrorBody: HttpErrorBody = { + name: error.name, + message: error.message, + retryable: error.retriable + } + if (error instanceof KafkaJSError) { if (error.retriable) { + await emulateFailedHttpRequest(request, url, httpErrorBody, data) return new RetryableError(error.message) } else { + await emulateFailedHttpRequest(request, url, httpErrorBody, data) return new IntegrationError(error.message, 'KAFKA_ERROR', 400) } } // Fallback to default error handling + await emulateFailedHttpRequest(request, url, httpErrorBody, data) return new IntegrationError(defaultMessage, 'KAFKA_ERROR', 400) } + +type HttpErrorBody = { + name: string + message: string + retryable: boolean +} + +function emulateFailedHttpRequest( + request: RequestClient, + url: string, + httpErrorBody: HttpErrorBody, + data?: TopicMessages +) { + const emulateHttpResponse = new Response(JSON.stringify(httpErrorBody), { + headers: { 'Content-Type': 'application/json' }, + status: httpErrorBody.retryable ? 
503 : 400, + statusText: 'Kafka Error' + }) + + return request(url, { + method: 'POST', + json: { + data + }, + headers: { + 'Content-Type': 'application/json' + }, + emulateHttpResponse, + throwHttpErrors: false + }) +} diff --git a/packages/destination-actions/src/destinations/kafka/send/index.ts b/packages/destination-actions/src/destinations/kafka/send/index.ts index 83981925779..c577494a887 100644 --- a/packages/destination-actions/src/destinations/kafka/send/index.ts +++ b/packages/destination-actions/src/destinations/kafka/send/index.ts @@ -49,11 +49,11 @@ const action: ActionDefinition = { return getTopics(settings) } }, - perform: async (_request, { settings, payload, features, statsContext }) => { - await sendData(settings, [payload], features, statsContext) + perform: async (request, { settings, payload, features, statsContext }) => { + await sendData(request, settings, [payload], features, statsContext) }, - performBatch: async (_request, { settings, payload, features, statsContext }) => { - await sendData(settings, payload, features, statsContext) + performBatch: async (request, { settings, payload, features, statsContext }) => { + await sendData(request, settings, payload, features, statsContext) } } diff --git a/packages/destination-actions/src/destinations/kafka/utils.ts b/packages/destination-actions/src/destinations/kafka/utils.ts index bf2d2534a41..1fd0921f7f7 100644 --- a/packages/destination-actions/src/destinations/kafka/utils.ts +++ b/packages/destination-actions/src/destinations/kafka/utils.ts @@ -4,8 +4,8 @@ import { IntegrationError, Features, StatsContext, - MultiStatusResponse, - JSONLikeObject + RequestClient, + Response } from '@segment/actions-core' import type { Settings } from './generated-types' import type { Payload } from './send/generated-types' @@ -185,22 +185,12 @@ export const getOrCreateProducer = async ( } export const sendData = async ( + request: RequestClient, settings: Settings, payload: Payload[], features: Features | 
undefined, statsContext: StatsContext | undefined ) => { - // Assume everything to be successful by default - const multiStatusResponse = new MultiStatusResponse() - - for (let i = 0; i < payload.length; i++) { - multiStatusResponse.setSuccessResponseAtIndex(i, { - sent: payload[i].payload as JSONLikeObject, - body: 'Message sent successfully', - status: 200 - }) - } - validate(settings) const groupedPayloads: { [topic: string]: Payload[] } = {} @@ -228,18 +218,25 @@ export const sendData = async ( })) let producer: Producer - if (features && features[FLAGON_NAME]) { - producer = await getOrCreateProducer(settings, statsContext) - } else { - producer = getProducer(settings) - await producer.connect() + try { + if (features && features[FLAGON_NAME]) { + producer = await getOrCreateProducer(settings, statsContext) + } else { + producer = getProducer(settings) + await producer.connect() + } + } catch (error) { + return handleKafkaError(request, settings.brokers, error as KafkaJSError, settings.brokers) } for (const data of topicMessages) { try { await producer.send(data as ProducerRecord) + + // If the send was successful, emulate an HTTP request to record a trace + await emulateSuccessfulHttpRequest(request, settings.brokers, data) } catch (error) { - handleKafkaError(error as KafkaJSError, `Failed to deliver message to kafka: ${(error as Error).message}`) + return handleKafkaError(request, settings.brokers, error as KafkaJSError, settings.brokers) } } @@ -252,3 +249,22 @@ export const sendData = async ( await producer.disconnect() } } + +function emulateSuccessfulHttpRequest(request: RequestClient, url: string, data: TopicMessages) { + const emulateHttpResponse = new Response(JSON.stringify({ status: 'success' }), { + headers: { 'Content-Type': 'application/json' }, + status: 200, + statusText: 'OK' + }) + + return request(url, { + method: 'POST', + json: { + data + }, + headers: { + 'Content-Type': 'application/json' + }, + emulateHttpResponse + }) +} From 
987fc7bc890f87b4f20fb75af15f7b21839cc7b0 Mon Sep 17 00:00:00 2001 From: Arijit Ray <35370469+itsarijitray@users.noreply.github.com> Date: Mon, 28 Jul 2025 13:18:50 +0530 Subject: [PATCH 5/5] Add return statements --- .../destination-actions/src/destinations/kafka/send/index.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/destination-actions/src/destinations/kafka/send/index.ts b/packages/destination-actions/src/destinations/kafka/send/index.ts index c577494a887..4d33db418fd 100644 --- a/packages/destination-actions/src/destinations/kafka/send/index.ts +++ b/packages/destination-actions/src/destinations/kafka/send/index.ts @@ -50,10 +50,10 @@ const action: ActionDefinition = { } }, perform: async (request, { settings, payload, features, statsContext }) => { - await sendData(request, settings, [payload], features, statsContext) + return sendData(request, settings, [payload], features, statsContext) }, performBatch: async (request, { settings, payload, features, statsContext }) => { - await sendData(request, settings, payload, features, statsContext) + return await sendData(request, settings, payload, features, statsContext) } }