diff --git a/package-lock.json b/package-lock.json index 39a40de72b..72a26ba0a6 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "origintrail_node", - "version": "8.1.1-rc.5", + "version": "8.1.1-rc.6", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "origintrail_node", - "version": "8.1.1-rc.5", + "version": "8.1.1-rc.6", "license": "ISC", "dependencies": { "@comunica/query-sparql": "^4.0.2", @@ -61,7 +61,6 @@ "sequelize": "^6.29.0", "sqlite": "^5.1.1", "sqlite3": "^5.1.7", - "timeout-abort-controller": "^3.0.0", "toobusy-js": "^0.5.1", "uint8arrays": "^3.1.0", "umzug": "^3.2.1", @@ -25308,14 +25307,6 @@ "node": ">=0.10.0" } }, - "node_modules/timeout-abort-controller": { - "version": "3.0.0", - "resolved": "https://registry.npmjs.org/timeout-abort-controller/-/timeout-abort-controller-3.0.0.tgz", - "integrity": "sha512-O3e+2B8BKrQxU2YRyEjC/2yFdb33slI22WRdUaDx6rvysfi9anloNZyR2q0l6LnePo5qH7gSM7uZtvvwZbc2yA==", - "dependencies": { - "retimer": "^3.0.0" - } - }, "node_modules/tiny-case": { "version": "1.0.3", "resolved": "https://registry.npmjs.org/tiny-case/-/tiny-case-1.0.3.tgz", @@ -47889,14 +47880,6 @@ "integrity": "sha512-G7r3AhovYtr5YKOWQkta8RKAPb+J9IsO4uVmzjl8AZwfhs8UcUwTiD6gcJYSgOtzyjvQKrKYn41syHbUWMkafA==", "dev": true }, - "timeout-abort-controller": { - "version": "3.0.0", - "resolved": "https://registry.npmjs.org/timeout-abort-controller/-/timeout-abort-controller-3.0.0.tgz", - "integrity": "sha512-O3e+2B8BKrQxU2YRyEjC/2yFdb33slI22WRdUaDx6rvysfi9anloNZyR2q0l6LnePo5qH7gSM7uZtvvwZbc2yA==", - "requires": { - "retimer": "^3.0.0" - } - }, "tiny-case": { "version": "1.0.3", "resolved": "https://registry.npmjs.org/tiny-case/-/tiny-case-1.0.3.tgz", diff --git a/package.json b/package.json index 2c8d973c4e..333b2ad3c4 100644 --- a/package.json +++ b/package.json @@ -119,7 +119,6 @@ "sequelize": "^6.29.0", "sqlite": "^5.1.1", "sqlite3": "^5.1.7", - "timeout-abort-controller": "^3.0.0", "toobusy-js": "^0.5.1", "uint8arrays": "^3.1.0", "umzug": "^3.2.1", diff --git a/src/modules/network/implementation/libp2p-service.js b/src/modules/network/implementation/libp2p-service.js index 112ad76ee2..d3fb78482f 100644 --- a/src/modules/network/implementation/libp2p-service.js +++ b/src/modules/network/implementation/libp2p-service.js @@ -14,7 +14,6 @@ import { InMemoryRateLimiter } from 'rolling-rate-limiter'; import toobusy from 'toobusy-js'; import { mkdir, writeFile, readFile, stat } from 'fs/promises'; import ip from 'ip'; -import { TimeoutController } from 'timeout-abort-controller'; import { NETWORK_API_RATE_LIMIT, NETWORK_API_SPAM_DETECTION, @@ -297,110 +296,95 @@ class Libp2pService { .map((addr) => addr.toString().split('/')) .filter((splittedAddr) => !ip.isPrivate(splittedAddr[2]))[0]?.[2]; - this.logger.trace( - `Dialing remotePeerId: ${peerIdString} with public ip: ${publicIp}: protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}`, - ); - let dialResult; - let dialStart; - let dialEnd; - try { - dialStart = Date.now(); - dialResult = await this.node.dialProtocol(peerIdObject, protocol); - dialEnd = Date.now(); - } catch (error) { - dialEnd = Date.now(); - nackMessage.data.errorMessage = `Unable to dial peer: ${peerIdString}. protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}, dial execution time: ${ - dialEnd - dialStart - } ms. Error: ${error.message}`; + const timeoutSignal = AbortSignal.timeout(timeout); + const abortError = new Error('Message timed out'); - return nackMessage; - } - this.logger.trace( - `Created stream for peer: ${peerIdString}. protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}, dial execution time: ${ - dialEnd - dialStart - } ms.`, - ); + const abortPromise = new Promise((_, reject) => { + timeoutSignal.onabort = () => reject(abortError); + }); + let stream; + let response; + let errorMessage; + let operationStart; + let operationEnd; + const onAbort = () => { + if (stream) stream.abort(abortError); + response = null; + }; - const { stream } = dialResult; + try { + timeoutSignal.addEventListener('abort', onAbort, { once: true }); + this.logger.trace( + `Dialing remotePeerId: ${peerIdString} with public ip: ${publicIp}: protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}`, + ); - this.updateSessionStream(operationId, peerIdString, stream); + errorMessage = `dial peer`; + operationStart = Date.now(); + const dialPromise = this.node.dialProtocol(peerIdObject, protocol, { + signal: timeoutSignal, + }); + const dialResult = await Promise.race([dialPromise, abortPromise]); + operationEnd = Date.now(); + if (timeoutSignal.aborted) { + throw abortError; + } - const streamMessage = this.createStreamMessage(message, operationId, messageType); + this.logger.trace( + `Created stream for peer: ${peerIdString}. protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}, dial execution time: ${ + operationEnd - operationStart + } ms.`, + ); - this.logger.trace( - `Sending message to ${peerIdString}. protocol: ${protocol}, messageType: ${messageType}, operationId: ${operationId}`, - ); + stream = dialResult.stream; - let sendMessageStart; - let sendMessageEnd; - try { - sendMessageStart = Date.now(); - await this._sendMessageToStream(stream, streamMessage); - sendMessageEnd = Date.now(); - } catch (error) { - sendMessageEnd = Date.now(); - nackMessage.data.errorMessage = `Unable to send message to peer: ${peerIdString}. protocol: ${protocol}, messageType: ${messageType}, operationId: ${operationId}, execution time: ${ - sendMessageEnd - sendMessageStart - } ms. Error: ${error.message}`; + this.updateSessionStream(operationId, peerIdString, stream); - return nackMessage; - } + const streamMessage = this.createStreamMessage(message, operationId, messageType); - let readResponseStart; - let readResponseEnd; - let response; - const abortSignalEventListener = async () => { - stream.abort(); - response = null; - }; - const timeoutController = new TimeoutController(timeout); - try { - readResponseStart = Date.now(); + this.logger.trace( + `Sending message to ${peerIdString}. protocol: ${protocol}, messageType: ${messageType}, operationId: ${operationId}`, + ); - timeoutController.signal.addEventListener('abort', abortSignalEventListener, { - once: true, - }); + errorMessage = `send message to peer`; + operationStart = Date.now(); + await this._sendMessageToStream(stream, streamMessage); + operationEnd = Date.now(); + if (timeoutSignal.aborted) { + throw abortError; + } + errorMessage = `read response from peer`; + operationStart = Date.now(); response = await this._readMessageFromStream( stream, this.isResponseValid.bind(this), peerIdString, ); - - if (timeoutController.signal.aborted) { - throw Error('Message timed out!'); + operationEnd = Date.now(); + if (timeoutSignal.aborted) { + throw abortError; } - timeoutController.signal.removeEventListener('abort', abortSignalEventListener); - timeoutController.clear(); + this.logger.trace( + `Receiving response from ${peerIdString}. protocol: ${protocol}, messageType: ${ + response.message?.header?.messageType + }, operationId: ${operationId}, execution time: ${ + operationEnd - operationStart + } ms.`, + ); - readResponseEnd = Date.now(); + if (!response.valid) { + nackMessage.data.errorMessage = 'Invalid response'; + return nackMessage; + } } catch (error) { - timeoutController.signal.removeEventListener('abort', abortSignalEventListener); - timeoutController.clear(); - - readResponseEnd = Date.now(); - nackMessage.data.errorMessage = `Unable to read response from peer ${peerIdString}. protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}, execution time: ${ - readResponseEnd - readResponseStart - } ms. Error: ${error.message}`; - - return nackMessage; - } - - this.logger.trace( - `Receiving response from ${peerIdString}. protocol: ${protocol}, messageType: ${ - response.message?.header?.messageType - }, operationId: ${operationId}, execution time: ${ - readResponseEnd - readResponseStart - } ms.`, - ); - - if (!response.valid) { - nackMessage.data.errorMessage = 'Invalid response'; - + nackMessage.data.errorMessage = `Unable to ${errorMessage} ${peerIdString}. protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}. Execution time: ${ + (operationEnd ?? Date.now()) - operationStart + } ms. Error: ${error.message.slice(0, 145)}`; return nackMessage; + } finally { + timeoutSignal.removeEventListener('abort', onAbort); } - return response.message; }