Skip to content

Commit ebc5943

Browse files
committed
random stuff
1 parent d9cdfca commit ebc5943

File tree

5 files changed

+150
-121
lines changed

5 files changed

+150
-121
lines changed

lib/dispatcher/client-h1.js

Lines changed: 56 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ function lazyllhttp () {
176176
}
177177

178178
}
179-
})
179+
}).exports
180180
}
181181

182182
let llhttpInstance = null
@@ -210,8 +210,8 @@ class Parser {
210210
* @param {import('net').Socket} socket
211211
* @param {*} llhttp
212212
*/
213-
constructor (client, socket, { exports }) {
214-
this.llhttp = exports
213+
constructor (client, socket, llhttp) {
214+
this.llhttp = llhttp
215215
this.ptr = this.llhttp.llhttp_alloc(constants.TYPE.RESPONSE)
216216
this.client = client
217217
/**
@@ -329,46 +329,44 @@ class Parser {
329329

330330
new Uint8Array(llhttp.memory.buffer, currentBufferPtr, currentBufferSize).set(chunk)
331331

332+
let ret
333+
334+
try {
335+
currentBufferRef = chunk
336+
currentParser = this
337+
ret = llhttp.llhttp_execute(this.ptr, currentBufferPtr, chunk.length)
338+
} catch (err) {
339+
util.destroy(socket, err)
340+
} finally {
341+
currentParser = null
342+
currentBufferRef = null
343+
}
344+
332345
// Call `execute` on the wasm parser.
333346
// We pass the `llhttp_parser` pointer address, the pointer address of buffer view data,
334347
// and finally the length of bytes to parse.
335348
// The return value is an error code or `constants.ERROR.OK`.
336-
try {
337-
let ret
338-
339-
try {
340-
currentBufferRef = chunk
341-
currentParser = this
342-
ret = llhttp.llhttp_execute(this.ptr, currentBufferPtr, chunk.length)
343-
} finally {
344-
currentParser = null
345-
currentBufferRef = null
346-
}
347-
348-
if (ret !== constants.ERROR.OK) {
349-
const data = chunk.subarray(llhttp.llhttp_get_error_pos(this.ptr) - currentBufferPtr)
350-
351-
if (ret === constants.ERROR.PAUSED_UPGRADE) {
352-
this.onUpgrade(data)
353-
} else if (ret === constants.ERROR.PAUSED) {
354-
this.paused = true
355-
socket.unshift(data)
356-
} else {
357-
const ptr = llhttp.llhttp_get_error_reason(this.ptr)
358-
let message = ''
359-
/* istanbul ignore else: difficult to make a test case for */
360-
if (ptr) {
361-
const len = new Uint8Array(llhttp.memory.buffer, ptr).indexOf(0)
362-
message =
363-
'Response does not match the HTTP/1.1 protocol (' +
364-
Buffer.from(llhttp.memory.buffer, ptr, len).toString() +
365-
')'
366-
}
367-
throw new HTTPParserError(message, constants.ERROR[ret], data)
349+
if (ret !== constants.ERROR.OK) {
350+
const data = chunk.subarray(llhttp.llhttp_get_error_pos(this.ptr) - currentBufferPtr)
351+
352+
if (ret === constants.ERROR.PAUSED_UPGRADE) {
353+
this.onUpgrade(data)
354+
} else if (ret === constants.ERROR.PAUSED) {
355+
this.paused = true
356+
socket.unshift(data)
357+
} else {
358+
const ptr = llhttp.llhttp_get_error_reason(this.ptr)
359+
let message = ''
360+
/* istanbul ignore else: difficult to make a test case for */
361+
if (ptr) {
362+
const len = new Uint8Array(llhttp.memory.buffer, ptr).indexOf(0)
363+
message =
364+
'Response does not match the HTTP/1.1 protocol (' +
365+
Buffer.from(llhttp.memory.buffer, ptr, len).toString() +
366+
')'
368367
}
368+
util.destroy(socket, new HTTPParserError(message, constants.ERROR[ret], data))
369369
}
370-
} catch (err) {
371-
util.destroy(socket, err)
372370
}
373371
}
374372

@@ -1342,7 +1340,7 @@ function writeBuffer (abort, body, client, request, socket, contentLength, heade
13421340
socket.cork()
13431341
socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
13441342
socket.write(body)
1345-
socket.uncork()
1343+
process.nextTick(() => socket.uncork())
13461344
request.onBodySent(body)
13471345

13481346
if (!expectsPayload && request.reset !== false) {
@@ -1366,31 +1364,35 @@ function writeBuffer (abort, body, client, request, socket, contentLength, heade
13661364
* @param {number} contentLength
13671365
* @param {string} header
13681366
* @param {boolean} expectsPayload
1369-
* @returns {Promise<void>}
1367+
* @returns {void}
13701368
*/
1371-
async function writeBlob (abort, body, client, request, socket, contentLength, header, expectsPayload) {
1369+
function writeBlob (abort, body, client, request, socket, contentLength, header, expectsPayload) {
13721370
assert(contentLength === body.size, 'blob body must have content length')
13731371

13741372
try {
13751373
if (contentLength != null && contentLength !== body.size) {
13761374
throw new RequestContentLengthMismatchError()
13771375
}
13781376

1379-
const buffer = Buffer.from(await body.arrayBuffer())
1377+
body.arrayBuffer().then(arrayBuffer => {
1378+
const buffer = Buffer.from(arrayBuffer)
13801379

1381-
socket.cork()
1382-
socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
1383-
socket.write(buffer)
1384-
socket.uncork()
1380+
socket.cork()
1381+
socket.write(`${header}content-length: ${contentLength}\r\n\r\n`, 'latin1')
1382+
socket.write(buffer)
1383+
process.nextTick(() => socket.uncork())
13851384

1386-
request.onBodySent(buffer)
1387-
request.onRequestSent()
1385+
request.onBodySent(buffer)
1386+
request.onRequestSent()
13881387

1389-
if (!expectsPayload && request.reset !== false) {
1390-
socket[kReset] = true
1391-
}
1388+
if (!expectsPayload && request.reset !== false) {
1389+
socket[kReset] = true
1390+
}
13921391

1393-
client[kResume]()
1392+
client[kResume]()
1393+
}).catch(err => {
1394+
abort(err)
1395+
})
13941396
} catch (err) {
13951397
abort(err)
13961398
}
@@ -1532,7 +1534,7 @@ class AsyncWriter {
15321534

15331535
const ret = socket.write(chunk)
15341536

1535-
socket.uncork()
1537+
process.nextTick(() => socket.uncork())
15361538

15371539
request.onBodySent(chunk)
15381540

@@ -1565,6 +1567,7 @@ class AsyncWriter {
15651567
return
15661568
}
15671569

1570+
socket.cork()
15681571
if (bytesWritten === 0) {
15691572
if (expectsPayload) {
15701573
// https://tools.ietf.org/html/rfc7230#section-3.3.2
@@ -1579,6 +1582,7 @@ class AsyncWriter {
15791582
} else if (contentLength === null) {
15801583
socket.write('\r\n0\r\n\r\n', 'latin1')
15811584
}
1585+
process.nextTick(() => socket.uncork())
15821586

15831587
if (contentLength !== null && bytesWritten !== contentLength) {
15841588
if (client[kStrictContentLength]) {

lib/dispatcher/client-h2.js

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -671,7 +671,7 @@ function writeBuffer (abort, h2stream, body, client, request, socket, contentLen
671671
assert(contentLength === body.byteLength, 'buffer body must have content length')
672672
h2stream.cork()
673673
h2stream.write(body)
674-
h2stream.uncork()
674+
process.nextTick(() => h2stream.uncork())
675675
h2stream.end()
676676

677677
request.onBodySent(body)
@@ -719,29 +719,41 @@ function writeStream (abort, socket, expectsPayload, h2stream, body, client, req
719719
}
720720
}
721721

722-
async function writeBlob (abort, h2stream, body, client, request, socket, contentLength, expectsPayload) {
722+
/**
723+
* @param {(err?: Error) => void} abort
724+
* @param {import('node:http2').ClientHttp2Stream} h2stream
725+
* @param {Blob} body
726+
* @param {import('../../types/client.js').default} client
727+
* @param {*} request
728+
* @param {*} socket
729+
* @param {number|null} contentLength
730+
* @param {boolean} expectsPayload
731+
*/
732+
function writeBlob (abort, h2stream, body, client, request, socket, contentLength, expectsPayload) {
723733
assert(contentLength === body.size, 'blob body must have content length')
724734

725735
try {
726736
if (contentLength != null && contentLength !== body.size) {
727737
throw new RequestContentLengthMismatchError()
728738
}
729739

730-
const buffer = Buffer.from(await body.arrayBuffer())
740+
body.arrayBuffer().then(arrayBuffer => {
741+
const buffer = Buffer.from(arrayBuffer)
731742

732-
h2stream.cork()
733-
h2stream.write(buffer)
734-
h2stream.uncork()
735-
h2stream.end()
743+
h2stream.cork()
744+
h2stream.write(buffer)
745+
process.nextTick(() => h2stream.uncork())
746+
h2stream.end()
736747

737-
request.onBodySent(buffer)
738-
request.onRequestSent()
748+
request.onBodySent(buffer)
749+
request.onRequestSent()
739750

740-
if (!expectsPayload) {
741-
socket[kReset] = true
742-
}
751+
if (!expectsPayload) {
752+
socket[kReset] = true
753+
}
743754

744-
client[kResume]()
755+
client[kResume]()
756+
}).catch(err => abort(err))
745757
} catch (err) {
746758
abort(err)
747759
}

lib/dispatcher/client.js

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,7 @@ function getPipelining (client) {
7272
return client[kPipelining] ?? client[kHTTPContext]?.defaultPipelining ?? 1
7373
}
7474

75-
/**
76-
* @type {import('../../types/client.js').default}
77-
*/
75+
/** @type {import('../../types/client.js').default} */
7876
class Client extends DispatcherBase {
7977
/**
8078
*

lib/web/fetch/index.js

Lines changed: 67 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1948,76 +1948,91 @@ async function httpNetworkFetch (
19481948
fetchParams.controller.on('terminated', onAborted)
19491949
}
19501950

1951-
fetchParams.controller.resume = async () => {
1951+
fetchParams.controller.resume = () => {
1952+
const processNext = () => {
19521953
// 1. While true
1953-
while (true) {
1954-
// 1-3. See onData...
1955-
1956-
// 4. Set bytes to the result of handling content codings given
1957-
// codings and bytes.
1958-
let bytes
1959-
let isFailure
1960-
try {
1961-
const { done, value } = await fetchParams.controller.next()
1962-
1954+
return fetchParams.controller.next().then(({ done, value }) => {
19631955
if (isAborted(fetchParams)) {
1964-
break
1956+
return
19651957
}
19661958

1967-
bytes = done ? undefined : value
1968-
} catch (err) {
1969-
if (fetchParams.controller.ended && !timingInfo.encodedBodySize) {
1970-
// zlib doesn't like empty streams.
1971-
bytes = undefined
1972-
} else {
1973-
bytes = err
1959+
// 1-3. See onData...
19741960

1975-
// err may be propagated from the result of calling readablestream.cancel,
1976-
// which might not be an error. https://github.com/nodejs/undici/issues/2009
1977-
isFailure = true
1978-
}
1979-
}
1961+
// 4. Set bytes to the result of handling content codings given
1962+
// codings and bytes.
1963+
const bytes = done ? undefined : value
1964+
const isFailure = false
19801965

1981-
if (bytes === undefined) {
1966+
if (bytes === undefined) {
19821967
// 2. Otherwise, if the bytes transmission for response’s message
19831968
// body is done normally and stream is readable, then close
19841969
// stream, finalize response for fetchParams and response, and
19851970
// abort these in-parallel steps.
1986-
readableStreamClose(fetchParams.controller.controller)
1971+
readableStreamClose(fetchParams.controller.controller)
19871972

1988-
finalizeResponse(fetchParams, response)
1973+
finalizeResponse(fetchParams, response)
19891974

1990-
return
1991-
}
1975+
return
1976+
}
19921977

1993-
// 5. Increase timingInfo’s decoded body size by bytes’s length.
1994-
timingInfo.decodedBodySize += bytes?.byteLength ?? 0
1978+
// 5. Increase timingInfo’s decoded body size by bytes’s length.
1979+
timingInfo.decodedBodySize += bytes?.byteLength ?? 0
19951980

1996-
// 6. If bytes is failure, then terminate fetchParams’s controller.
1997-
if (isFailure) {
1998-
fetchParams.controller.terminate(bytes)
1999-
return
2000-
}
1981+
// 6. If bytes is failure, then terminate fetchParams’s controller.
1982+
if (isFailure) {
1983+
fetchParams.controller.terminate(bytes)
1984+
return
1985+
}
20011986

2002-
// 7. Enqueue a Uint8Array wrapping an ArrayBuffer containing bytes
2003-
// into stream.
2004-
const buffer = new Uint8Array(bytes)
2005-
if (buffer.byteLength) {
2006-
fetchParams.controller.controller.enqueue(buffer)
2007-
}
1987+
// 7. Enqueue a Uint8Array wrapping an ArrayBuffer containing bytes
1988+
// into stream.
1989+
const buffer = new Uint8Array(bytes)
1990+
if (buffer.byteLength) {
1991+
fetchParams.controller.controller.enqueue(buffer)
1992+
}
20081993

2009-
// 8. If stream is errored, then terminate the ongoing fetch.
2010-
if (isErrored(stream)) {
2011-
fetchParams.controller.terminate()
2012-
return
2013-
}
1994+
// 8. If stream is errored, then terminate the ongoing fetch.
1995+
if (isErrored(stream)) {
1996+
fetchParams.controller.terminate()
1997+
return
1998+
}
20141999

2015-
// 9. If stream doesn’t need more data ask the user agent to suspend
2016-
// the ongoing fetch.
2017-
if (fetchParams.controller.controller.desiredSize <= 0) {
2018-
return
2019-
}
2000+
// 9. If stream doesn’t need more data ask the user agent to suspend
2001+
// the ongoing fetch.
2002+
if (fetchParams.controller.controller.desiredSize <= 0) {
2003+
return
2004+
}
2005+
2006+
processNext()
2007+
}).catch((err) => {
2008+
if (isAborted(fetchParams)) {
2009+
return
2010+
}
2011+
2012+
let bytes = err
2013+
let isFailure = true
2014+
2015+
if (fetchParams.controller.ended && !timingInfo.encodedBodySize) {
2016+
// zlib doesn't like empty streams.
2017+
bytes = undefined
2018+
isFailure = false
2019+
2020+
readableStreamClose(fetchParams.controller.controller)
2021+
finalizeResponse(fetchParams, response)
2022+
return
2023+
}
2024+
2025+
// 5. Increase timingInfo's decoded body size by bytes's length.
2026+
timingInfo.decodedBodySize += bytes?.byteLength ?? 0
2027+
2028+
// 6. If bytes is failure, then terminate fetchParams's controller.
2029+
if (isFailure) {
2030+
fetchParams.controller.terminate(bytes)
2031+
}
2032+
})
20202033
}
2034+
2035+
processNext()
20212036
}
20222037

20232038
// 2. If aborted, then:

0 commit comments

Comments
 (0)