Skip to content

Commit dd0d364

Browse files
WebSocketFrameFactory: support for efficient bulk encoding of outbound binary frames (#3)
1 parent 9e7fdf7 commit dd0d364

File tree

7 files changed

+257
-10
lines changed

7 files changed

+257
-10
lines changed

netty-websocket-http1-perftest/build.gradle

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,23 +36,23 @@ dependencies {
3636

3737
task runServer(type: JavaExec) {
3838
classpath = sourceSets.main.runtimeClasspath
39-
mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.server.Main"
39+
mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.encoder.server.Main"
4040
}
4141

4242
task runClient(type: JavaExec) {
4343
classpath = sourceSets.main.runtimeClasspath
44-
mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.client.Main"
44+
mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.encoder.client.Main"
4545
}
4646

4747
task serverScripts(type: CreateStartScripts) {
48-
mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.server.Main"
48+
mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.encoder.server.Main"
4949
applicationName = "${project.name}-server"
5050
classpath = startScripts.classpath
5151
outputDir = startScripts.outputDir
5252
}
5353

5454
task clientScripts(type: CreateStartScripts) {
55-
mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.client.Main"
55+
mainClass = "com.jauntsdn.netty.handler.codec.http.websocketx.perftest.encoder.client.Main"
5656
applicationName = "${project.name}-client"
5757
classpath = startScripts.classpath
5858
outputDir = startScripts.outputDir
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package com.jauntsdn.netty.handler.codec.http.websocketx.perftest.client;
17+
package com.jauntsdn.netty.handler.codec.http.websocketx.perftest.encoder.client;
1818

1919
import com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketCallbacksHandler;
2020
import com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler;
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* limitations under the License.
1515
*/
1616

17-
package com.jauntsdn.netty.handler.codec.http.websocketx.perftest.server;
17+
package com.jauntsdn.netty.handler.codec.http.websocketx.perftest.encoder.server;
1818

1919
import com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketCallbacksHandler;
2020
import com.jauntsdn.netty.handler.codec.http.websocketx.WebSocketFrameFactory;

netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,25 @@ void binaryFramesEncoder(boolean mask) throws Exception {
9898
client.close();
9999
}
100100

101+
@Timeout(300)
102+
@ValueSource(booleans = {true, false})
103+
@ParameterizedTest
104+
void binaryFramesBulkEncoder(boolean mask) throws Exception {
105+
int maxFrameSize = 1000;
106+
Channel s = server = nettyServer(new BinaryFramesTestServerHandler(), mask, false);
107+
BinaryFramesEncoderClientBulkHandler clientHandler =
108+
new BinaryFramesEncoderClientBulkHandler(maxFrameSize);
109+
Channel client =
110+
webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler);
111+
112+
WebSocketFrameFactory.BulkEncoder encoder = clientHandler.onHandshakeCompleted().join();
113+
Assertions.assertThat(encoder).isNotNull();
114+
115+
CompletableFuture<Void> onComplete = clientHandler.startFramesExchange();
116+
onComplete.join();
117+
client.close();
118+
}
119+
101120
@Timeout(300)
102121
@MethodSource("maskingArgs")
103122
@ParameterizedTest
@@ -444,6 +463,162 @@ protected void initChannel(SocketChannel ch) {
444463
}
445464
}
446465

466+
static class BinaryFramesEncoderClientBulkHandler
467+
implements WebSocketCallbacksHandler, WebSocketFrameListener {
468+
private final CompletableFuture<WebSocketFrameFactory.BulkEncoder> onHandshakeComplete =
469+
new CompletableFuture<>();
470+
private final CompletableFuture<Void> onFrameExchangeComplete = new CompletableFuture<>();
471+
private WebSocketFrameFactory.BulkEncoder binaryFrameEncoder;
472+
private final int framesCount;
473+
private int receivedFrames;
474+
private int sentFrames;
475+
private ByteBuf outBuffer;
476+
private volatile ChannelHandlerContext ctx;
477+
478+
BinaryFramesEncoderClientBulkHandler(int maxFrameSize) {
479+
this.framesCount = maxFrameSize;
480+
}
481+
482+
@Override
483+
public WebSocketFrameListener exchange(
484+
ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) {
485+
this.binaryFrameEncoder = webSocketFrameFactory.bulkEncoder();
486+
return this;
487+
}
488+
489+
@Override
490+
public void onChannelRead(
491+
ChannelHandlerContext ctx, boolean finalFragment, int rsv, int opcode, ByteBuf payload) {
492+
if (!finalFragment) {
493+
onFrameExchangeComplete.completeExceptionally(
494+
new AssertionError("received non-final frame: " + finalFragment));
495+
payload.release();
496+
return;
497+
}
498+
if (rsv != 0) {
499+
onFrameExchangeComplete.completeExceptionally(
500+
new AssertionError("received frame with non-zero rsv: " + rsv));
501+
payload.release();
502+
return;
503+
}
504+
if (opcode != WebSocketProtocol.OPCODE_BINARY) {
505+
onFrameExchangeComplete.completeExceptionally(
506+
new AssertionError("received non-binary frame: " + Long.toHexString(opcode)));
507+
payload.release();
508+
return;
509+
}
510+
511+
int readableBytes = payload.readableBytes();
512+
513+
int expectedSize = receivedFrames;
514+
if (expectedSize != readableBytes) {
515+
onFrameExchangeComplete.completeExceptionally(
516+
new AssertionError(
517+
"received frame of unexpected size: "
518+
+ expectedSize
519+
+ ", actual: "
520+
+ readableBytes));
521+
payload.release();
522+
return;
523+
}
524+
525+
for (int i = 0; i < readableBytes; i++) {
526+
byte b = payload.readByte();
527+
if (b != (byte) 0xFE) {
528+
onFrameExchangeComplete.completeExceptionally(
529+
new AssertionError("received frame with unexpected content: " + Long.toHexString(b)));
530+
payload.release();
531+
return;
532+
}
533+
}
534+
payload.release();
535+
if (++receivedFrames == framesCount) {
536+
onFrameExchangeComplete.complete(null);
537+
}
538+
}
539+
540+
@Override
541+
public void onChannelWritabilityChanged(ChannelHandlerContext ctx) {
542+
boolean writable = ctx.channel().isWritable();
543+
if (sentFrames > 0 && writable) {
544+
int toSend = framesCount - sentFrames;
545+
if (toSend > 0) {
546+
sendFrames(ctx, toSend);
547+
}
548+
}
549+
}
550+
551+
@Override
552+
public void onOpen(ChannelHandlerContext ctx) {
553+
this.ctx = ctx;
554+
int bufferSize = 4 * framesCount;
555+
this.outBuffer = ctx.alloc().buffer(bufferSize, bufferSize);
556+
onHandshakeComplete.complete(binaryFrameEncoder);
557+
}
558+
559+
@Override
560+
public void onClose(ChannelHandlerContext ctx) {
561+
ByteBuf out = outBuffer;
562+
if (out != null) {
563+
outBuffer = null;
564+
out.release();
565+
}
566+
if (!onFrameExchangeComplete.isDone()) {
567+
onFrameExchangeComplete.completeExceptionally(new ClosedChannelException());
568+
}
569+
}
570+
571+
@Override
572+
public void onExceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
573+
if (!onFrameExchangeComplete.isDone()) {
574+
onFrameExchangeComplete.completeExceptionally(cause);
575+
}
576+
}
577+
578+
CompletableFuture<WebSocketFrameFactory.BulkEncoder> onHandshakeCompleted() {
579+
return onHandshakeComplete;
580+
}
581+
582+
CompletableFuture<Void> startFramesExchange() {
583+
ChannelHandlerContext c = ctx;
584+
c.executor().execute(() -> sendFrames(c, framesCount - sentFrames));
585+
return onFrameExchangeComplete;
586+
}
587+
588+
private void sendFrames(ChannelHandlerContext c, int toSend) {
589+
WebSocketFrameFactory.BulkEncoder frameEncoder = binaryFrameEncoder;
590+
for (int frameIdx = 0; frameIdx < toSend; frameIdx++) {
591+
if (!c.channel().isOpen()) {
592+
return;
593+
}
594+
int payloadSize = sentFrames;
595+
int frameSize = frameEncoder.sizeofBinaryFrame(payloadSize);
596+
ByteBuf out = outBuffer;
597+
if (frameSize > out.capacity() - out.writerIndex()) {
598+
int readableBytes = out.readableBytes();
599+
int bufferSize = 4 * framesCount;
600+
outBuffer = c.alloc().buffer(bufferSize, bufferSize);
601+
if (c.channel().bytesBeforeUnwritable() < readableBytes) {
602+
c.writeAndFlush(out, c.voidPromise());
603+
if (!c.channel().isWritable()) {
604+
return;
605+
}
606+
} else {
607+
c.write(out, c.voidPromise());
608+
}
609+
out = outBuffer;
610+
}
611+
int mask = frameEncoder.encodeBinaryFramePrefix(out, payloadSize);
612+
for (int payloadIdx = 0; payloadIdx < payloadSize; payloadIdx++) {
613+
out.writeByte(0xFE);
614+
}
615+
frameEncoder.maskBinaryFrame(out, mask, payloadSize);
616+
sentFrames++;
617+
}
618+
c.flush();
619+
}
620+
}
621+
447622
static class BinaryFramesEncoderClientHandler
448623
implements WebSocketCallbacksHandler, WebSocketFrameListener {
449624
private final CompletableFuture<WebSocketFrameFactory.Encoder> onHandshakeComplete =

netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/MaskingWebSocketEncoder.java

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,10 @@ public WebSocketFrameFactory frameFactory(ChannelHandlerContext ctx) {
4848
return FrameFactory.INSTANCE;
4949
}
5050

51-
static class FrameFactory implements WebSocketFrameFactory, WebSocketFrameFactory.Encoder {
51+
static class FrameFactory
52+
implements WebSocketFrameFactory,
53+
WebSocketFrameFactory.Encoder,
54+
WebSocketFrameFactory.BulkEncoder {
5255
static final int PREFIX_SIZE_SMALL = 6;
5356
static final int BINARY_FRAME_SMALL =
5457
OPCODE_BINARY << 8 | /*FIN*/ (byte) 1 << 15 | /*MASK*/ (byte) 1 << 7;
@@ -145,6 +148,11 @@ public Encoder encoder() {
145148
return this;
146149
}
147150

151+
@Override
152+
public BulkEncoder bulkEncoder() {
153+
return this;
154+
}
155+
148156
@Override
149157
public ByteBuf encodeBinaryFrame(ByteBuf binaryFrame) {
150158
int frameSize = binaryFrame.readableBytes();
@@ -168,6 +176,30 @@ public ByteBuf encodeBinaryFrame(ByteBuf binaryFrame) {
168176
throw new IllegalArgumentException(payloadSizeLimit(payloadSize, 65_535));
169177
}
170178

179+
@Override
180+
public int encodeBinaryFramePrefix(ByteBuf byteBuf, int payloadSize) {
181+
if (payloadSize <= 125) {
182+
byteBuf.writeShort(BINARY_FRAME_SMALL | payloadSize);
183+
int mask = mask();
184+
byteBuf.writeInt(mask);
185+
return mask;
186+
}
187+
188+
if (payloadSize <= 65_535) {
189+
int mask = mask();
190+
byteBuf.writeLong(((BINARY_FRAME_MEDIUM | (long) payloadSize) << 32) | mask);
191+
return mask;
192+
}
193+
throw new IllegalArgumentException(payloadSizeLimit(payloadSize, 65_535));
194+
}
195+
196+
@Override
197+
public ByteBuf maskBinaryFrame(ByteBuf byteBuf, int mask, int payloadSize) {
198+
int end = byteBuf.writerIndex();
199+
int start = end - payloadSize;
200+
return mask(mask, byteBuf, start, end);
201+
}
202+
171203
@Override
172204
public int sizeofBinaryFrame(int payloadSize) {
173205
if (payloadSize <= 125) {

netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/NonMaskingWebSocketEncoder.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,10 @@ public WebSocketFrameFactory frameFactory(ChannelHandlerContext ctx) {
4747
return FrameFactory.INSTANCE;
4848
}
4949

50-
static class FrameFactory implements WebSocketFrameFactory, WebSocketFrameFactory.Encoder {
50+
static class FrameFactory
51+
implements WebSocketFrameFactory,
52+
WebSocketFrameFactory.Encoder,
53+
WebSocketFrameFactory.BulkEncoder {
5154
static final int PREFIX_SIZE_SMALL = 2;
5255
static final int BINARY_FRAME_SMALL = OPCODE_BINARY << 8 | /*FIN*/ (byte) 1 << 15;
5356

@@ -126,6 +129,11 @@ public Encoder encoder() {
126129
return this;
127130
}
128131

132+
@Override
133+
public BulkEncoder bulkEncoder() {
134+
return this;
135+
}
136+
129137
@Override
130138
public ByteBuf encodeBinaryFrame(ByteBuf binaryFrame) {
131139
int frameSize = binaryFrame.readableBytes();
@@ -144,6 +152,23 @@ public ByteBuf encodeBinaryFrame(ByteBuf binaryFrame) {
144152
throw new IllegalArgumentException(payloadSizeLimit(payloadSize, 65_535));
145153
}
146154

155+
@Override
156+
public int encodeBinaryFramePrefix(ByteBuf byteBuf, int payloadSize) {
157+
if (payloadSize <= 125) {
158+
byteBuf.writeShort(BINARY_FRAME_SMALL | payloadSize);
159+
} else if (payloadSize <= 65_535) {
160+
byteBuf.writeInt(BINARY_FRAME_MEDIUM | payloadSize);
161+
} else {
162+
throw new IllegalArgumentException(payloadSizeLimit(payloadSize, 65_535));
163+
}
164+
return -1;
165+
}
166+
167+
@Override
168+
public ByteBuf maskBinaryFrame(ByteBuf byteBuf, int mask, int payloadSize) {
169+
return byteBuf;
170+
}
171+
147172
@Override
148173
public int sizeofBinaryFrame(int payloadSize) {
149174
if (payloadSize <= 125) {

netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameFactory.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,26 @@ public interface WebSocketFrameFactory {
3737

3838
Encoder encoder();
3939

40+
default BulkEncoder bulkEncoder() {
41+
throw new UnsupportedOperationException("WebSocketFrameFactory.bulkEncoder() not implemented");
42+
}
43+
44+
/** Encodes prefix of single binary websocket frame into provided bytebuffer. */
4045
interface Encoder {
41-
/*write prefix/mask, apply mask if needed*/
46+
4247
ByteBuf encodeBinaryFrame(ByteBuf binaryFrame);
4348

44-
/*size with prefix/mask*/
49+
int sizeofBinaryFrame(int payloadSize);
50+
}
51+
52+
/** Encodes prefixes of multiple binary websocket frames into provided bytebuffer. */
53+
interface BulkEncoder {
54+
55+
/** @return frame mask, or -1 if masking not applicable */
56+
int encodeBinaryFramePrefix(ByteBuf byteBuf, int payloadSize);
57+
58+
ByteBuf maskBinaryFrame(ByteBuf byteBuf, int mask, int payloadSize);
59+
4560
int sizeofBinaryFrame(int payloadSize);
4661
}
4762
}

0 commit comments

Comments
 (0)