Skip to content

Commit a415f12

Browse files
committed
Chunk Data/Text frames by outboundMaxFrameSize.
This change fixes a bug when sending text/data that was large WebsocketKit did not chunk these text/data into mutliple frames.
1 parent bc3c30d commit a415f12

File tree

5 files changed

+151
-12
lines changed

5 files changed

+151
-12
lines changed

Sources/WebSocketKit/WebSocket.swift

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,13 @@ public final class WebSocket {
3434
private var waitingForPong: Bool
3535
private var waitingForClose: Bool
3636
private var scheduledTimeoutTask: Scheduled<Void>?
37-
38-
init(channel: Channel, type: PeerType) {
37+
private var outboundMaxFrameSize: WebSocketMaxFrameSize
38+
39+
init(
40+
channel: Channel,
41+
type: PeerType,
42+
outboundMaxFrameSize: WebSocketMaxFrameSize = .default
43+
) {
3944
self.channel = channel
4045
self.type = type
4146
self.onTextCallback = { _, _ in }
@@ -45,6 +50,7 @@ public final class WebSocket {
4550
self.waitingForPong = false
4651
self.waitingForClose = false
4752
self.scheduledTimeoutTask = nil
53+
self.outboundMaxFrameSize = outboundMaxFrameSize
4854
}
4955

5056
public func onText(_ callback: @escaping (WebSocket, String) -> ()) {
@@ -88,12 +94,66 @@ public final class WebSocket {
8894
let string = String(text)
8995
var buffer = channel.allocator.buffer(capacity: text.count)
9096
buffer.writeString(string)
91-
self.send(raw: buffer.readableBytesView, opcode: .text, fin: true, promise: promise)
9297

98+
self.send(buffer: buffer, opcode: .text, promise: promise)
9399
}
94100

95101
public func send(_ binary: [UInt8], promise: EventLoopPromise<Void>? = nil) {
96-
self.send(raw: binary, opcode: .binary, fin: true, promise: promise)
102+
var buffer = channel.allocator.buffer(capacity: binary.count)
103+
buffer.writeBytes(binary)
104+
self.send(buffer: buffer, opcode: .binary, promise: promise)
105+
}
106+
107+
public func send(
108+
buffer: NIO.ByteBuffer,
109+
opcode: WebSocketOpcode,
110+
promise: EventLoopPromise<Void>? = nil
111+
) {
112+
113+
guard buffer.readableBytes > self.outboundMaxFrameSize.value else {
114+
let frame = WebSocketFrame(
115+
fin: true,
116+
opcode: opcode,
117+
maskKey: self.makeMaskKey(),
118+
data: buffer
119+
)
120+
self.channel.writeAndFlush(frame, promise: promise)
121+
return
122+
}
123+
124+
var buffer = buffer
125+
126+
// We need to ensure we write all of these items in order on the event loop without other writes interrupting the frames.
127+
self.channel.eventLoop.execute {
128+
while let frameBuffer = buffer.readSlice(length: self.outboundMaxFrameSize.value) {
129+
130+
let isFinalFrame = buffer.readableBytes == 0
131+
132+
let frame = WebSocketFrame(
133+
fin: isFinalFrame,
134+
opcode: opcode,
135+
maskKey: self.makeMaskKey(),
136+
data: frameBuffer
137+
)
138+
139+
if isFinalFrame {
140+
self.channel.writeAndFlush(frame, promise: promise)
141+
return
142+
} else {
143+
// write operations that happen when already on the event loop go directly through without any `delay`.
144+
self.channel.write(frame, promise: nil)
145+
}
146+
}
147+
// we will end up here if the number bytes is not a multiple of the `outboundMaxFrameSize`
148+
let frame = WebSocketFrame(
149+
fin: true,
150+
opcode: opcode,
151+
maskKey: self.makeMaskKey(),
152+
data: buffer
153+
)
154+
155+
self.channel.writeAndFlush(frame, promise: promise)
156+
}
97157
}
98158

99159
public func sendPing(promise: EventLoopPromise<Void>? = nil) {
@@ -115,6 +175,7 @@ public final class WebSocket {
115175
{
116176
var buffer = channel.allocator.buffer(capacity: data.count)
117177
buffer.writeBytes(data)
178+
118179
let frame = WebSocketFrame(
119180
fin: fin,
120181
opcode: opcode,

Sources/WebSocketKit/WebSocketClient.swift

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,26 @@ public final class WebSocketClient {
2222

2323
public struct Configuration {
2424
public var tlsConfiguration: TLSConfiguration?
25-
public var maxFrameSize: Int
25+
public var inboundMaxFrameSize: WebSocketMaxFrameSize
26+
public var outboundMaxFrameSize: WebSocketMaxFrameSize
2627

2728
public init(
2829
tlsConfiguration: TLSConfiguration? = nil,
29-
maxFrameSize: Int = 1 << 14
30+
maxFrameSize: WebSocketMaxFrameSize = WebSocketMaxFrameSize.default
3031
) {
3132
self.tlsConfiguration = tlsConfiguration
32-
self.maxFrameSize = maxFrameSize
33+
self.inboundMaxFrameSize = maxFrameSize
34+
self.outboundMaxFrameSize = maxFrameSize
35+
}
36+
37+
public init(
38+
tlsConfiguration: TLSConfiguration? = nil,
39+
inboundMaxFrameSize: WebSocketMaxFrameSize = WebSocketMaxFrameSize.default,
40+
outboundMaxFrameSize: WebSocketMaxFrameSize = WebSocketMaxFrameSize.default
41+
) {
42+
self.tlsConfiguration = tlsConfiguration
43+
self.inboundMaxFrameSize = inboundMaxFrameSize
44+
self.outboundMaxFrameSize = outboundMaxFrameSize
3345
}
3446
}
3547

@@ -75,10 +87,14 @@ public final class WebSocketClient {
7587
}
7688
let websocketUpgrader = NIOWebSocketClientUpgrader(
7789
requestKey: Data(key).base64EncodedString(),
78-
maxFrameSize: self.configuration.maxFrameSize,
90+
maxFrameSize: self.configuration.inboundMaxFrameSize.value,
7991
automaticErrorHandling: true,
8092
upgradePipelineHandler: { channel, req in
81-
return WebSocket.client(on: channel, onUpgrade: onUpgrade)
93+
return WebSocket.client(
94+
on: channel,
95+
outboundMaxFrameSize: self.configuration.outboundMaxFrameSize,
96+
onUpgrade: onUpgrade
97+
)
8298
}
8399
)
84100

Sources/WebSocketKit/WebSocketHandler.swift

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,42 @@ import NIOWebSocket
44
extension WebSocket {
55
public static func client(
66
on channel: Channel,
7+
outboundMaxFrameSize: WebSocketMaxFrameSize = .default,
78
onUpgrade: @escaping (WebSocket) -> ()
89
) -> EventLoopFuture<Void> {
9-
return self.handle(on: channel, as: .client, onUpgrade: onUpgrade)
10+
return self.handle(
11+
on: channel,
12+
as: .client,
13+
outboundMaxFrameSize: outboundMaxFrameSize,
14+
onUpgrade: onUpgrade
15+
)
1016
}
1117

1218
public static func server(
1319
on channel: Channel,
20+
outboundMaxFrameSize: WebSocketMaxFrameSize = .default,
1421
onUpgrade: @escaping (WebSocket) -> ()
1522
) -> EventLoopFuture<Void> {
16-
return self.handle(on: channel, as: .server, onUpgrade: onUpgrade)
23+
return self.handle(
24+
on: channel,
25+
as: .server,
26+
outboundMaxFrameSize: outboundMaxFrameSize,
27+
onUpgrade: onUpgrade
28+
)
1729
}
1830

1931
private static func handle(
2032
on channel: Channel,
2133
as type: PeerType,
34+
outboundMaxFrameSize: WebSocketMaxFrameSize = .default,
2235
onUpgrade: @escaping (WebSocket) -> ()
2336
) -> EventLoopFuture<Void> {
24-
let webSocket = WebSocket(channel: channel, type: type)
37+
let webSocket = WebSocket(
38+
channel: channel,
39+
type: type,
40+
outboundMaxFrameSize: outboundMaxFrameSize
41+
)
42+
2543
return channel.pipeline.addHandler(WebSocketHandler(webSocket: webSocket)).map { _ in
2644
onUpgrade(webSocket)
2745
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
public struct WebSocketMaxFrameSize: ExpressibleByIntegerLiteral {
2+
public let value: Int
3+
4+
public init(_ value: Int) {
5+
precondition(value <= UInt32.max, "invalid overlarge max frame size")
6+
self.value = value
7+
}
8+
9+
public init(integerLiteral value: Int) {
10+
precondition(value <= UInt32.max, "invalid overlarge max frame size")
11+
self.value = value
12+
}
13+
14+
public static var `default`: Self {
15+
self.init(integerLiteral: 1 << 14)
16+
}
17+
}

Tests/WebSocketKitTests/WebSocketKitTests.swift

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,33 @@ final class WebSocketKitTests: XCTestCase {
3434
func testBadHost() throws {
3535
XCTAssertThrowsError(try WebSocket.connect(host: "asdf", on: elg) { _ in }.wait())
3636
}
37+
38+
39+
func testLargeTextFrame() throws {
40+
let port = Int.random(in: 8000..<9000)
41+
42+
let sendPromise = self.elg.next().makePromise(of: Void.self)
43+
let serverClose = self.elg.next().makePromise(of: Void.self)
44+
let clientClose = self.elg.next().makePromise(of: Void.self)
45+
let server = try ServerBootstrap.webSocket(on: self.elg) { req, ws in
46+
ws.onText { ws, text in
47+
if text == "close" {
48+
ws.close(promise: serverClose)
49+
}
50+
}
51+
}.bind(host: "localhost", port: port).wait()
52+
let config = WebSocketClient.Configuration(tlsConfiguration: nil, maxFrameSize: 2)
53+
WebSocket.connect(to: "ws://localhost:\(port)", configuration: config, on: self.elg) { ws in
54+
ws.send("close", promise: sendPromise)
55+
ws.onClose.cascade(to: clientClose)
56+
}.cascadeFailure(to: sendPromise)
57+
58+
XCTAssertNoThrow(try sendPromise.futureResult.wait())
59+
XCTAssertNoThrow(try serverClose.futureResult.wait())
60+
XCTAssertNoThrow(try clientClose.futureResult.wait())
61+
try server.close(mode: .all).wait()
62+
}
63+
3764

3865
func testServerClose() throws {
3966
let port = Int.random(in: 8000..<9000)

0 commit comments

Comments
 (0)