Skip to content

Commit d8a861a

Browse files
committed
Chunk Data/Text frames by outboundMaxFrameSize.
This change fixes a bug when sending text/data that was large WebsocketKit did not chunk these text/data into mutliple frames.
1 parent bc3c30d commit d8a861a

File tree

5 files changed

+149
-12
lines changed

5 files changed

+149
-12
lines changed

Sources/WebSocketKit/WebSocket.swift

Lines changed: 63 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,13 @@ public final class WebSocket {
3434
private var waitingForPong: Bool
3535
private var waitingForClose: Bool
3636
private var scheduledTimeoutTask: Scheduled<Void>?
37-
38-
init(channel: Channel, type: PeerType) {
37+
private var outboundMaxFrameSize: WebSocketMaxFrameSize
38+
39+
init(
40+
channel: Channel,
41+
type: PeerType,
42+
outboundMaxFrameSize: WebSocketMaxFrameSize = .default
43+
) {
3944
self.channel = channel
4045
self.type = type
4146
self.onTextCallback = { _, _ in }
@@ -45,6 +50,7 @@ public final class WebSocket {
4550
self.waitingForPong = false
4651
self.waitingForClose = false
4752
self.scheduledTimeoutTask = nil
53+
self.outboundMaxFrameSize = outboundMaxFrameSize
4854
}
4955

5056
public func onText(_ callback: @escaping (WebSocket, String) -> ()) {
@@ -88,12 +94,65 @@ public final class WebSocket {
8894
let string = String(text)
8995
var buffer = channel.allocator.buffer(capacity: text.count)
9096
buffer.writeString(string)
91-
self.send(raw: buffer.readableBytesView, opcode: .text, fin: true, promise: promise)
9297

98+
self.send(buffer: buffer, opcode: .text, promise: promise)
9399
}
94100

95101
public func send(_ binary: [UInt8], promise: EventLoopPromise<Void>? = nil) {
96-
self.send(raw: binary, opcode: .binary, fin: true, promise: promise)
102+
var buffer = channel.allocator.buffer(capacity: binary.count)
103+
buffer.writeBytes(binary)
104+
self.send(buffer: buffer, opcode: .binary, promise: promise)
105+
}
106+
107+
public func send(
108+
buffer: NIO.ByteBuffer,
109+
opcode: WebSocketOpcode,
110+
promise: EventLoopPromise<Void>? = nil
111+
) {
112+
guard buffer.readableBytes > outboundMaxFrameSize.value else {
113+
let frame = WebSocketFrame(
114+
fin: true,
115+
opcode: opcode,
116+
maskKey: self.makeMaskKey(),
117+
data: buffer
118+
)
119+
self.channel.writeAndFlush(frame, promise: promise)
120+
return
121+
}
122+
123+
var buffer = buffer
124+
125+
var framesToSend: [WebSocketFrame] = []
126+
127+
while let frameBuffer = buffer.readSlice(length: outboundMaxFrameSize.value) {
128+
let frame = WebSocketFrame(
129+
fin: buffer.readableBytes == 0,
130+
opcode: opcode,
131+
maskKey: self.makeMaskKey(),
132+
data: frameBuffer
133+
)
134+
framesToSend.append(frame)
135+
}
136+
137+
if buffer.readableBytes > 0 {
138+
let frame = WebSocketFrame(
139+
fin: true,
140+
opcode: opcode,
141+
maskKey: self.makeMaskKey(),
142+
data: buffer
143+
)
144+
framesToSend.append(frame)
145+
}
146+
147+
let startingOut: EventLoopFuture<Void> = self.channel.eventLoop.makeSucceededFuture(Void())
148+
149+
let future: EventLoopFuture<Void> = framesToSend.reduce(startingOut) { future, frame in
150+
return future.flatMap { _ in
151+
self.channel.writeAndFlush(frame)
152+
}
153+
}
154+
155+
promise?.completeWith(future)
97156
}
98157

99158
public func sendPing(promise: EventLoopPromise<Void>? = nil) {

Sources/WebSocketKit/WebSocketClient.swift

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,26 @@ public final class WebSocketClient {
2222

2323
public struct Configuration {
2424
public var tlsConfiguration: TLSConfiguration?
25-
public var maxFrameSize: Int
25+
public var inboundMaxFrameSize: WebSocketMaxFrameSize
26+
public var outboundMaxFrameSize: WebSocketMaxFrameSize
2627

2728
public init(
2829
tlsConfiguration: TLSConfiguration? = nil,
29-
maxFrameSize: Int = 1 << 14
30+
maxFrameSize: WebSocketMaxFrameSize = WebSocketMaxFrameSize.default
3031
) {
3132
self.tlsConfiguration = tlsConfiguration
32-
self.maxFrameSize = maxFrameSize
33+
self.inboundMaxFrameSize = maxFrameSize
34+
self.outboundMaxFrameSize = maxFrameSize
35+
}
36+
37+
public init(
38+
tlsConfiguration: TLSConfiguration? = nil,
39+
inboundMaxFrameSize: WebSocketMaxFrameSize = WebSocketMaxFrameSize.default,
40+
outboundMaxFrameSize: WebSocketMaxFrameSize = WebSocketMaxFrameSize.default
41+
) {
42+
self.tlsConfiguration = tlsConfiguration
43+
self.inboundMaxFrameSize = inboundMaxFrameSize
44+
self.outboundMaxFrameSize = outboundMaxFrameSize
3345
}
3446
}
3547

@@ -75,10 +87,14 @@ public final class WebSocketClient {
7587
}
7688
let websocketUpgrader = NIOWebSocketClientUpgrader(
7789
requestKey: Data(key).base64EncodedString(),
78-
maxFrameSize: self.configuration.maxFrameSize,
90+
maxFrameSize: self.configuration.inboundMaxFrameSize.value,
7991
automaticErrorHandling: true,
8092
upgradePipelineHandler: { channel, req in
81-
return WebSocket.client(on: channel, onUpgrade: onUpgrade)
93+
return WebSocket.client(
94+
on: channel,
95+
outboundMaxFrameSize: self.configuration.outboundMaxFrameSize,
96+
onUpgrade: onUpgrade
97+
)
8298
}
8399
)
84100

Sources/WebSocketKit/WebSocketHandler.swift

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,24 +4,42 @@ import NIOWebSocket
44
extension WebSocket {
55
public static func client(
66
on channel: Channel,
7+
outboundMaxFrameSize: WebSocketMaxFrameSize = .default,
78
onUpgrade: @escaping (WebSocket) -> ()
89
) -> EventLoopFuture<Void> {
9-
return self.handle(on: channel, as: .client, onUpgrade: onUpgrade)
10+
return self.handle(
11+
on: channel,
12+
as: .client,
13+
outboundMaxFrameSize: outboundMaxFrameSize,
14+
onUpgrade: onUpgrade
15+
)
1016
}
1117

1218
public static func server(
1319
on channel: Channel,
20+
outboundMaxFrameSize: WebSocketMaxFrameSize = .default,
1421
onUpgrade: @escaping (WebSocket) -> ()
1522
) -> EventLoopFuture<Void> {
16-
return self.handle(on: channel, as: .server, onUpgrade: onUpgrade)
23+
return self.handle(
24+
on: channel,
25+
as: .server,
26+
outboundMaxFrameSize: outboundMaxFrameSize,
27+
onUpgrade: onUpgrade
28+
)
1729
}
1830

1931
private static func handle(
2032
on channel: Channel,
2133
as type: PeerType,
34+
outboundMaxFrameSize: WebSocketMaxFrameSize = .default,
2235
onUpgrade: @escaping (WebSocket) -> ()
2336
) -> EventLoopFuture<Void> {
24-
let webSocket = WebSocket(channel: channel, type: type)
37+
let webSocket = WebSocket(
38+
channel: channel,
39+
type: type,
40+
outboundMaxFrameSize: outboundMaxFrameSize
41+
)
42+
2543
return channel.pipeline.addHandler(WebSocketHandler(webSocket: webSocket)).map { _ in
2644
onUpgrade(webSocket)
2745
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
public struct WebSocketMaxFrameSize: ExpressibleByIntegerLiteral {
2+
public let value: Int
3+
4+
public init(_ value: Int) {
5+
precondition(value <= UInt32.max, "invalid overlarge max frame size")
6+
self.value = value
7+
}
8+
9+
public init(integerLiteral value: Int) {
10+
precondition(value <= UInt32.max, "invalid overlarge max frame size")
11+
self.value = value
12+
}
13+
14+
public static var `default`: Self {
15+
self.init(integerLiteral: 1 << 14)
16+
}
17+
}

Tests/WebSocketKitTests/WebSocketKitTests.swift

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,33 @@ final class WebSocketKitTests: XCTestCase {
3434
func testBadHost() throws {
3535
XCTAssertThrowsError(try WebSocket.connect(host: "asdf", on: elg) { _ in }.wait())
3636
}
37+
38+
39+
func testLargeTextFrame() throws {
40+
let port = Int.random(in: 8000..<9000)
41+
42+
let sendPromise = self.elg.next().makePromise(of: Void.self)
43+
let serverClose = self.elg.next().makePromise(of: Void.self)
44+
let clientClose = self.elg.next().makePromise(of: Void.self)
45+
let server = try ServerBootstrap.webSocket(on: self.elg) { req, ws in
46+
ws.onText { ws, text in
47+
if text == "close" {
48+
ws.close(promise: serverClose)
49+
}
50+
}
51+
}.bind(host: "localhost", port: port).wait()
52+
let config = WebSocketClient.Configuration(tlsConfiguration: nil, maxFrameSize: 2)
53+
WebSocket.connect(to: "ws://localhost:\(port)", configuration: config, on: self.elg) { ws in
54+
ws.send("close", promise: sendPromise)
55+
ws.onClose.cascade(to: clientClose)
56+
}.cascadeFailure(to: sendPromise)
57+
58+
XCTAssertNoThrow(try sendPromise.futureResult.wait())
59+
XCTAssertNoThrow(try serverClose.futureResult.wait())
60+
XCTAssertNoThrow(try clientClose.futureResult.wait())
61+
try server.close(mode: .all).wait()
62+
}
63+
3764

3865
func testServerClose() throws {
3966
let port = Int.random(in: 8000..<9000)

0 commit comments

Comments
 (0)