Skip to content

Commit 439fdb2

Browse files
committed
Fix and improve stream implementations
1 parent c34d321 commit 439fdb2

File tree

3 files changed

+88
-103
lines changed

3 files changed

+88
-103
lines changed

lib/request.js

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ function Request(conn, id, role, keepalive) {
7272
this.destroy();
7373
});
7474

75-
this.on('close', this.errorStream._close.bind(this.errorStream));
75+
this.on('close', this.errorStream.destroy.bind(this.errorStream, null));
7676
}
7777
util.inherits(Request, IOStream);
7878

@@ -138,6 +138,8 @@ Request.prototype._createReqRes = function () {
138138

139139
this._req.complete = this._stdinComplete && this._dataComplete;
140140

141+
this.pause();
142+
141143
this.on('data', function (data) {
142144
if (this._req) {
143145
if (!this._req.push(data)) {
@@ -152,8 +154,6 @@ Request.prototype._createReqRes = function () {
152154
}
153155
});
154156

155-
this.pause();
156-
157157
if (this._role === fcgi.records.BeginRequest.roles.AUTHORIZER) {
158158
this._res = new AuthorizerResponse(this._req);
159159
} else {
@@ -238,12 +238,6 @@ Object.defineProperties(Request.prototype, {
238238
get: function () {
239239
return parseInt(this.params.REMOTE_PORT) || 0;
240240
}
241-
},
242-
243-
"destroyed": {
244-
get: function () {
245-
return !this._open;
246-
}
247241
}
248242
});
249243

@@ -255,11 +249,13 @@ Request.prototype.address = function () {
255249
};
256250
};
257251

258-
Request.prototype.destroy = function (err) {
259-
if (this._open) {
260-
this._close(err ? true : false);
261-
this._conn.endRequest(this._id, err ? 1 : 0);
262-
}
252+
Request.prototype._destroy = function (err, callback) {
253+
var self = this;
254+
IOStream.prototype._destroy.call(this, err, function () {
255+
self._conn.endRequest(self._id, err ? 1 : 0, undefined, function () {
256+
callback(err);
257+
});
258+
});
263259
}
264260

265261
Request.prototype.ref = function () {

lib/server.js

Lines changed: 35 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -126,18 +126,25 @@ function connectionListener(socket) {
126126
var stream = new fcgi.FastCGIStream(socket),
127127
requests = {};
128128

129-
function endRequest(id, status, reason) {
129+
function endRequest(id, status, reason, callback) {
130130
if (reason === undefined) {
131131
reason = fcgi.records.EndRequest.protocolStatus.REQUEST_COMPLETE;
132132
}
133133

134134
if (socket.writable && !socket.destroyed) {
135+
var closeConn = (id in requests && !requests[id]._keepAlive);
136+
delete requests[id];
137+
135138
stream.writeRecord(
136-
id, new fcgi.records.EndRequest(status, reason));
139+
id, new fcgi.records.EndRequest(status, reason),
140+
function (err) {
141+
if (closeConn && !socket.destroyed) {
142+
socket.end();
143+
}
137144

138-
if (id in requests && !requests[id]._keepAlive) {
139-
socket.end();
140-
}
145+
if (callback)
146+
callback(err);
147+
});
141148
}
142149

143150
delete requests[id];
@@ -177,10 +184,10 @@ function connectionListener(socket) {
177184
});
178185

179186
socket.once('error', function (err) {
180-
this.on('error', function () {})
187+
socket.on('error', function () {});
181188

182-
if (!self.emit('clientError', err, this)) {
183-
this.destroy(err);
189+
if (!self.emit('clientError', err, socket)) {
190+
socket.destroy();
184191
}
185192
});
186193

@@ -216,46 +223,46 @@ function connectionListener(socket) {
216223
switch (record.role) {
217224
case fcgi.records.BeginRequest.roles.RESPONDER:
218225
if (self.listeners('request').length < 1) {
219-
endRequest(id, 1, fcgi.records.EndRequest.protocolStatus.UNKNOWN_ROLE);
220-
221-
if (!keepAlive) {
222-
socket.end();
223-
}
226+
endRequest(id, 1, fcgi.records.EndRequest.protocolStatus.UNKNOWN_ROLE, function () {
227+
if (!keepAlive && !socket.destroyed) {
228+
socket.end();
229+
}
230+
});
224231

225232
return;
226233
}
227234
break;
228235

229236
case fcgi.records.BeginRequest.roles.AUTHORIZER:
230237
if (self.listeners('authorize').length < 1) {
231-
endRequest(id, 1, fcgi.records.EndRequest.protocolStatus.UNKNOWN_ROLE);
232-
233-
if (!keepAlive) {
234-
socket.end();
235-
}
238+
endRequest(id, 1, fcgi.records.EndRequest.protocolStatus.UNKNOWN_ROLE, function () {
239+
if (!keepAlive && !socket.destroyed) {
240+
socket.end();
241+
}
242+
});
236243

237244
return;
238245
}
239246
break;
240247

241248
case fcgi.records.BeginRequest.roles.FILTER:
242249
if (self.listeners('filter').length < 1) {
243-
endRequest(id, 1, fcgi.records.EndRequest.protocolStatus.UNKNOWN_ROLE);
244-
245-
if (!keepAlive) {
246-
socket.end();
247-
}
250+
endRequest(id, 1, fcgi.records.EndRequest.protocolStatus.UNKNOWN_ROLE, function () {
251+
if (!keepAlive && !socket.destroyed) {
252+
socket.end();
253+
}
254+
});
248255

249256
return;
250257
}
251258

252259
break;
253260
default:
254-
endRequest(id, 1, fcgi.records.EndRequest.protocolStatus.UNKNOWN_ROLE);
255-
256-
if (!keepAlive) {
257-
socket.end();
258-
}
261+
endRequest(id, 1, fcgi.records.EndRequest.protocolStatus.UNKNOWN_ROLE, function () {
262+
if (!keepAlive && !socket.destroyed) {
263+
socket.end();
264+
}
265+
});
259266

260267
return;
261268
}

lib/streams.js

Lines changed: 43 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ function InputStream() {
4343
util.inherits(InputStream, stream.Readable);
4444

4545
InputStream.prototype._data = function (chunk) {
46+
if (this.closed || this.destroyed)
47+
return;
48+
4649
if (this._canPush) {
4750
this._canPush = this.push(chunk);
4851
} else {
@@ -55,6 +58,12 @@ InputStream.prototype._read = function (size) {
5558
while (this._buffer.length && (this._canPush = this.push(this._buffer.shift())));
5659
};
5760

61+
InputStream.prototype._destroy = function (err, callback) {
62+
this._buffer = [];
63+
this._canPush = false;
64+
callback(err);
65+
}
66+
5867
/**
5968
* function OutputStream(conn, type)
6069
* Writable stream interface for FastCGI output streams
@@ -68,66 +77,42 @@ function OutputStream(conn, id, recordType) {
6877
this._conn = conn;
6978
this._id = id;
7079

71-
this._open = true;
72-
73-
this.on('finish', function () {
74-
this._conn.stream.writeRecord(
75-
this._id,
76-
new this.recordType());
77-
});
80+
this._finalized = false;
7881
}
7982
util.inherits(OutputStream, stream.Writable);
8083

81-
OutputStream.prototype._close = function (hadError) {
82-
if (this._open) {
83-
this._open = false;
84-
this.emit('close', hadError ? true : false);
85-
}
86-
};
87-
8884
OutputStream.prototype._write = function (chunk, encoding, callback) {
89-
var chunks = [];
90-
91-
if (!Buffer.isBuffer(chunk)) {
92-
chunk = new Buffer(chunk, encoding);
93-
}
94-
95-
if (chunk.length <= 0) {
96-
callback.call(this);
97-
return;
98-
}
99-
100-
var start = 0, end = 65535;
101-
while (end < chunk.length) {
102-
this._conn.stream.writeRecord(
103-
this._id, new this.recordType(chunk.slice(start, end)));
104-
105-
start = end;
106-
end += 65535;
85+
var start = 0;
86+
var self = this;
87+
88+
function writeSubChunk(err) {
89+
if (err || start >= chunk.length) {
90+
callback(err);
91+
return;
92+
}
93+
94+
self._conn.stream.writeRecord(
95+
self._id,
96+
new self.recordType(chunk.subarray(start, Math.min(start += 65535, chunk.length))),
97+
writeSubChunk);
10798
}
10899

109-
this._conn.stream.writeRecord(
110-
this._id,
111-
new this.recordType(chunk.slice(start)),
112-
callback.bind(this));
100+
writeSubChunk();
113101
};
114102

115-
OutputStream.prototype.write = function () {
116-
if (!this._open) {
117-
this.emit('error', new Error("Output stream is not open"));
118-
return;
119-
}
120-
121-
return stream.Writable.prototype.write.apply(this, arguments);
103+
OutputStream.prototype._final = function (callback) {
104+
this._finalized = true;
105+
this._conn.stream.writeRecord(this._id, new this.recordType(), callback);
122106
}
123107

124-
OutputStream.prototype.end = function () {
125-
if (!this._open) {
126-
this.emit('error', new Error("Output stream is not open"));
127-
return;
108+
OutputStream.prototype._destroy = function (err, callback) {
109+
if (!this._finalized) {
110+
this._conn.stream.writeRecord(this._id, new this.recordType(), function () {
111+
callback(err);
112+
});
113+
} else {
114+
callback(err);
128115
}
129-
130-
return stream.Writable.prototype.end.apply(this, arguments);
131116
}
132117

133118
/**
@@ -138,28 +123,25 @@ OutputStream.prototype.end = function () {
138123
function IOStream(conn, id, recordType) {
139124
stream.Duplex.call(this);
140125

141-
this.recordType = recordType || fcgi.records.StdOut;
142-
143126
this._buffer = [];
144127
this._canPush = false;
145128

129+
this.recordType = recordType || fcgi.records.StdOut;
130+
146131
this._conn = conn;
147132
this._id = id;
148133

149-
this._open = true;
150-
151-
this.on('finish', function () {
152-
this._conn.stream.writeRecord(
153-
this._id,
154-
new this.recordType());
155-
});
134+
this._finalized = false;
156135
}
157136
util.inherits(IOStream, stream.Duplex);
158137

159138
IOStream.prototype._data = InputStream.prototype._data;
160139
IOStream.prototype._read = InputStream.prototype._read;
161140

162-
IOStream.prototype._close = OutputStream.prototype._close;
163141
IOStream.prototype._write = OutputStream.prototype._write;
164-
IOStream.prototype.write = OutputStream.prototype.write;
165-
IOStream.prototype.end = OutputStream.prototype.end;
142+
IOStream.prototype._final = OutputStream.prototype._final;
143+
144+
IOStream.prototype._destroy = function (err, callback) {
145+
InputStream.prototype._destroy.call(this, err,
146+
OutputStream.prototype._destroy.bind(this, err, callback));
147+
}

0 commit comments

Comments
 (0)