Skip to content

Commit 92da8c7

Browse files
vortigontmathieucarbou
authored andcommitted
fix: AsyncAbstractResponse might loose part of send buffer
AsyncAbstractResponse::_ack could allocate temp buffer with size larger than available sock buffer (i.e. to fit headers) and eventually lossing the remainder on transfer due to not checking if the complete data was added to sock buff. Refactoring code in favor of having a dedicated std::vector object acting as accumulating buffer and more carefull control on amount of data actually copied to sockbuff Closes #315
1 parent b67f0e9 commit 92da8c7

File tree

3 files changed

+115
-96
lines changed

3 files changed

+115
-96
lines changed

src/ESPAsyncWebServer.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1336,8 +1336,10 @@ class AsyncWebServerResponse {
13361336
bool _sendContentLength;
13371337
bool _chunked;
13381338
size_t _headLength;
1339+
// amount of data sent for content part of the response (excluding all headers)
13391340
size_t _sentLength;
13401341
size_t _ackedLength;
1342+
// amount of response bytes (including all headers) written to sockbuff for delivery
13411343
size_t _writtenLength;
13421344
WebResponseState _state;
13431345

src/WebResponseImpl.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,17 @@ class AsyncAbstractResponse : public AsyncWebServerResponse {
3939
// in-flight queue credits
4040
size_t _in_flight_credit{2};
4141
#endif
42-
String _head;
42+
// buffer to accumulate all response headers
43+
String _assembled_headers;
44+
// amount of headers buffer sent
45+
size_t _assembled_headers_sent{0};
4346
// Data is inserted into cache at begin().
4447
// This is inefficient with vector, but if we use some other container,
4548
// we won't be able to access it as contiguous array of bytes when reading from it,
4649
// so by gaining performance in one place, we'll lose it in another.
4750
std::vector<uint8_t> _cache;
51+
// intermediate buffer to copy outbound data to
52+
std::vector<uint8_t> _send_buffer;
4853
size_t _readDataFromCacheOrContent(uint8_t *data, const size_t len);
4954
size_t _fillBufferAndProcessTemplates(uint8_t *buf, size_t maxLen);
5055

src/WebResponses.cpp

Lines changed: 107 additions & 95 deletions
Original file line numberDiff line numberDiff line change
@@ -327,8 +327,8 @@ size_t AsyncBasicResponse::_ack(AsyncWebServerRequest *request, size_t len, uint
327327

328328
/*
329329
* Abstract Response
330-
* */
331-
330+
*
331+
*/
332332
AsyncAbstractResponse::AsyncAbstractResponse(AwsTemplateProcessor callback) : _callback(callback) {
333333
// In case of template processing, we're unable to determine real response size
334334
if (callback) {
@@ -340,7 +340,7 @@ AsyncAbstractResponse::AsyncAbstractResponse(AwsTemplateProcessor callback) : _c
340340

341341
void AsyncAbstractResponse::_respond(AsyncWebServerRequest *request) {
342342
addHeader(T_Connection, T_close, false);
343-
_assembleHead(_head, request->version());
343+
_assembleHead(_assembled_headers, request->version());
344344
_state = RESPONSE_HEADERS;
345345
_ack(request, 0, 0);
346346
}
@@ -364,127 +364,139 @@ size_t AsyncAbstractResponse::_ack(AsyncWebServerRequest *request, size_t len, u
364364
async_ws_log_d("(chunk) out of in-flight credits");
365365
}
366366

367-
_in_flight -= (_in_flight > len) ? len : _in_flight;
368-
// get the size of available sock space
367+
_in_flight -= std::min(len, _in_flight);
368+
// for response data we need to control the queue and in-flight fragmentation. Sending small chunks could give low latency,
369+
// but flood asynctcp's queue and fragment socket buffer space for large responses.
370+
// Let's ignore polled acks and acks in case when we have more in-flight data then the available socket buff space.
371+
// That way we could balance on having half the buffer in-flight while another half is filling up, while minimizing events in asynctcp q
372+
if (_in_flight > request->client()->space()) {
373+
// async_ws_log_d("defer user call %u/%u", _in_flight, space);
374+
// take the credit back since we are ignoring this ack and rely on other inflight data
375+
if (len) {
376+
--_in_flight_credit;
377+
}
378+
return 0;
379+
}
369380
#endif
370381

382+
// this is not functionally needed in AsyncAbstractResponse itself, but keept for compatibility if some of the derived classes are rely on it somehow
371383
_ackedLength += len;
372-
size_t space = request->client()->space();
373384

374-
size_t headLen = _head.length();
385+
// send headers
375386
if (_state == RESPONSE_HEADERS) {
376-
if (space >= headLen) {
377-
_state = RESPONSE_CONTENT;
378-
space -= headLen;
379-
} else {
380-
String out = _head.substring(0, space);
381-
_head = _head.substring(space);
382-
_writtenLength += request->client()->write(out.c_str(), out.length());
383-
#if ASYNCWEBSERVER_USE_CHUNK_INFLIGHT
384-
_in_flight += out.length();
387+
// copy headers buffer to sock buffer
388+
size_t const added = request->client()->add(_assembled_headers.c_str() + _assembled_headers_sent, _assembled_headers.length() - _assembled_headers_sent);
389+
_writtenLength += added;
390+
if (added < _assembled_headers.length()){
391+
// we were not able to fit all headers in current buff, send this part here and return later for the rest
392+
_assembled_headers_sent += added;
393+
#if ASYNCWEBSERVER_USE_CHUNK_INFLIGHT
394+
_in_flight += added;
385395
--_in_flight_credit; // take a credit
386-
#endif
387-
return out.length();
396+
#endif
397+
if (!request->client()->send()){
398+
// something is wrong
399+
request->client()->close();
400+
}
401+
return added;
388402
}
403+
// otherwise we've added all the (ramainder) headers in current buff
404+
_state = RESPONSE_CONTENT;
405+
_assembled_headers = String(); // clear
389406
}
390407

391-
if (_state == RESPONSE_CONTENT) {
408+
// if there are leftovers in buffer from the previous run, let's deal with it first
409+
if (_state == RESPONSE_CONTENT && _send_buffer.size()){
410+
size_t const written = request->client()->add(reinterpret_cast<char*>(_send_buffer.data()), _send_buffer.size());
411+
if (written != _send_buffer.size()){
412+
// we were not able to fit entire buffer again?! OK, let's send partial now and come back later
413+
_writtenLength += written;
392414
#if ASYNCWEBSERVER_USE_CHUNK_INFLIGHT
393-
// for response data we need to control the queue and in-flight fragmentation. Sending small chunks could give low latency,
394-
// but flood asynctcp's queue and fragment socket buffer space for large responses.
395-
// Let's ignore polled acks and acks in case when we have more in-flight data then the available socket buff space.
396-
// That way we could balance on having half the buffer in-flight while another half is filling up, while minimizing events in asynctcp q
397-
if (_in_flight > space) {
398-
// async_ws_log_d("defer user call %u/%u", _in_flight, space);
399-
// take the credit back since we are ignoring this ack and rely on other inflight data
400-
if (len) {
401-
--_in_flight_credit;
402-
}
403-
return 0;
404-
}
415+
_in_flight += written;
416+
--_in_flight_credit; // take a credit
405417
#endif
418+
request->client()->send();
419+
_send_buffer.erase(_send_buffer.begin(), _send_buffer.begin() + written);
420+
return written;
421+
} else
422+
_send_buffer.clear();
423+
// OK buffer depleted, we can go on for more data within same sockbuff
424+
}
406425

407-
size_t outLen;
408-
if (_chunked) {
426+
// send content body
427+
if (_state == RESPONSE_CONTENT) {
428+
size_t const space = request->client()->space();
429+
if (_chunked || !_sendContentLength) {
409430
if (space <= 8) {
410431
return 0;
411432
}
412-
413-
outLen = space;
414-
} else if (!_sendContentLength) {
415-
outLen = space;
433+
_send_buffer.resize(space);
416434
} else {
417-
outLen = ((_contentLength - _sentLength) > space) ? space : (_contentLength - _sentLength);
418-
}
419-
420-
uint8_t *buf = (uint8_t *)malloc(outLen + headLen);
421-
if (!buf) {
422-
async_ws_log_e("Failed to allocate");
423-
request->abort();
424-
return 0;
425-
}
426-
427-
if (headLen) {
428-
memcpy(buf, _head.c_str(), _head.length());
435+
_send_buffer.resize(std::min(space, _contentLength - _sentLength));
429436
}
430437

431-
size_t readLen = 0;
432-
433438
if (_chunked) {
434439
// HTTP 1.1 allows leading zeros in chunk length. Or spaces may be added.
435-
// See RFC2616 sections 2, 3.6.1.
436-
readLen = _fillBufferAndProcessTemplates(buf + headLen + 6, outLen - 8);
440+
// See RFC2616 sections 2, 3.6.1 https://datatracker.ietf.org/doc/html/rfc2616#section-3.6.1
441+
size_t const readLen = _fillBufferAndProcessTemplates(_send_buffer.data() + 6, _send_buffer.size() - 8); // reserve 8 bytes for chunk size data
437442
if (readLen == RESPONSE_TRY_AGAIN) {
438-
free(buf);
439-
return 0;
443+
_send_buffer.clear(); // we won't send anything, do not release mem but leave it for later
444+
} else {
445+
sprintf(reinterpret_cast<char*>(_send_buffer.data()), "%04x\r\n", readLen); // print chunk size in buffer
446+
_send_buffer.at(readLen + 6) = '\r';
447+
_send_buffer.at(readLen + 7) = '\n';
448+
_send_buffer.resize(readLen + 8); // set internal vector's size to match added data
449+
if (!readLen){
450+
// last chunk?
451+
_state = RESPONSE_WAIT_ACK;
452+
}
453+
_sentLength += readLen; // not sure if that is needed for chunked data?
440454
}
441-
outLen = sprintf((char *)buf + headLen, "%04x", readLen) + headLen;
442-
buf[outLen++] = '\r';
443-
buf[outLen++] = '\n';
444-
outLen += readLen;
445-
buf[outLen++] = '\r';
446-
buf[outLen++] = '\n';
447455
} else {
448-
readLen = _fillBufferAndProcessTemplates(buf + headLen, outLen);
456+
size_t const readLen = _fillBufferAndProcessTemplates(_send_buffer.data(), _send_buffer.size());
449457
if (readLen == RESPONSE_TRY_AGAIN) {
450-
free(buf);
451-
return 0;
458+
_send_buffer.clear(); // won't release mem but leave it for later
459+
} else if (readLen == 0){
460+
// no more data to send
461+
_state = RESPONSE_WAIT_ACK;
462+
_send_buffer.clear();
463+
} else {
464+
_send_buffer.resize(readLen); // set internal vector's size to match added data
465+
_sentLength += readLen;
466+
if (_sendContentLength && _sentLength == _contentLength){
467+
// it was last piece of content
468+
_state = RESPONSE_WAIT_ACK;
469+
}
452470
}
453-
outLen = readLen + headLen;
454-
}
455-
456-
if (headLen) {
457-
_head = emptyString;
458471
}
459472

460-
if (outLen) {
461-
_writtenLength += request->client()->write((const char *)buf, outLen);
473+
size_t written{0};
474+
if (_send_buffer.size()){
475+
written = request->client()->add(reinterpret_cast<char*>(_send_buffer.data()), _send_buffer.size());
476+
if (written != _send_buffer.size()){
477+
// we were not able to send entire buffer now somehow, leave it for later
478+
// (this should not happen normally unless connection's TCP window suddenly changed or some other thread highjacked our sock buffer :) )
479+
_send_buffer.erase(_send_buffer.begin(), _send_buffer.begin() + written);
480+
} else
481+
_send_buffer.clear();
482+
483+
_writtenLength += written;
462484
#if ASYNCWEBSERVER_USE_CHUNK_INFLIGHT
463-
_in_flight += outLen;
485+
_in_flight += written;
464486
--_in_flight_credit; // take a credit
465487
#endif
466488
}
467-
468-
if (_chunked) {
469-
_sentLength += readLen;
470-
} else {
471-
_sentLength += outLen - headLen;
472-
}
473-
474-
free(buf);
475-
476-
if ((_chunked && readLen == 0) || (!_sendContentLength && outLen == 0) || (!_chunked && _sentLength == _contentLength)) {
477-
_state = RESPONSE_WAIT_ACK;
478-
}
479-
return outLen;
480-
481-
} else if (_state == RESPONSE_WAIT_ACK) {
482-
if (!_sendContentLength || _ackedLength >= _writtenLength) {
483-
_state = RESPONSE_END;
484-
if (!_chunked && !_sendContentLength) {
485-
request->client()->close(true);
486-
}
487-
}
489+
// wether or not we have a new content in buffer now we must send anyway whatever is in sockbuff (maybe empty body)
490+
// might be other data in sockbuff with hdrs or previous buffer data
491+
request->client()->send();
492+
return written;
493+
}
494+
495+
if (_state == RESPONSE_WAIT_ACK) {
496+
// we do not need to wait for any acks actually if we won't send any more data,
497+
// connection would be closed gracefully with last piece of data
498+
_state = RESPONSE_END;
499+
request->client()->close();
488500
}
489501
return 0;
490502
}
@@ -512,8 +524,8 @@ size_t AsyncAbstractResponse::_fillBufferAndProcessTemplates(uint8_t *data, size
512524
// Now we've read 'len' bytes, either from cache or from file
513525
// Search for template placeholders
514526
uint8_t *pTemplateStart = data;
515-
while ((pTemplateStart < &data[len]) && (pTemplateStart = (uint8_t *)memchr(pTemplateStart, TEMPLATE_PLACEHOLDER, &data[len - 1] - pTemplateStart + 1))
516-
) { // data[0] ... data[len - 1]
527+
while ((pTemplateStart < &data[len]) && (pTemplateStart = (uint8_t *)memchr(pTemplateStart, TEMPLATE_PLACEHOLDER, &data[len - 1] - pTemplateStart + 1)) ) {
528+
// data[0] ... data[len - 1]
517529
uint8_t *pTemplateEnd =
518530
(pTemplateStart < &data[len - 1]) ? (uint8_t *)memchr(pTemplateStart + 1, TEMPLATE_PLACEHOLDER, &data[len - 1] - pTemplateStart) : nullptr;
519531
// temporary buffer to hold parameter name

0 commit comments

Comments
 (0)