Skip to content

Commit b477ddb

Browse files
committed
Start using readonly memory
1 parent d74d834 commit b477ddb

15 files changed

+120
-70
lines changed

Source/MQTTnet.Tests/Server/Load_Tests.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#if DEBUG
22

3+
using System.Globalization;
34
using MQTTnet.Internal;
45
using MQTTnet.Packets;
56
using MQTTnet.Protocol;
@@ -52,7 +53,7 @@ await client.SendAsync(
5253

5354
for (var j = 0; j < 1000; j++)
5455
{
55-
publishPacket.Topic = j.ToString();
56+
publishPacket.Topic = j.ToString(CultureInfo.InvariantCulture);
5657

5758
await client.SendAsync(publishPacket, CancellationToken.None);
5859
}
@@ -139,7 +140,7 @@ public async Task Handle_100_000_Messages_In_Server()
139140

140141
for (var j = 0; j < 1000; j++)
141142
{
142-
var message = applicationMessageBuilder.WithTopic(j.ToString()).Build();
143+
var message = applicationMessageBuilder.WithTopic(j.ToString(CultureInfo.InvariantCulture)).Build();
143144

144145
await client.PublishAsync(message);
145146
}

Source/MQTTnet/Connecting/MqttClientConnectResultFactory.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ static MqttClientConnectResult CreateForMqtt500(MqttConnAckPacket connAckPacket)
6363
RetainAvailable = connAckPacket.RetainAvailable,
6464
AssignedClientIdentifier = connAckPacket.AssignedClientIdentifier,
6565
AuthenticationMethod = connAckPacket.AuthenticationMethod,
66-
AuthenticationData = connAckPacket.AuthenticationData,
66+
AuthenticationData = connAckPacket.AuthenticationData.ToArray(),
6767
MaximumPacketSize = connAckPacket.MaximumPacketSize,
6868
ReasonString = connAckPacket.ReasonString,
6969
ReceiveMaximum = connAckPacket.ReceiveMaximum,

Source/MQTTnet/Formatter/MqttApplicationMessageFactory.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ public static MqttApplicationMessage Create(MqttPublishPacket publishPacket)
2121
Dup = publishPacket.Dup,
2222
ResponseTopic = publishPacket.ResponseTopic,
2323
ContentType = publishPacket.ContentType,
24-
CorrelationData = publishPacket.CorrelationData,
24+
CorrelationData = publishPacket.CorrelationData.ToArray(),
2525
MessageExpiryInterval = publishPacket.MessageExpiryInterval,
2626
SubscriptionIdentifiers = publishPacket.SubscriptionIdentifiers,
2727
TopicAlias = publishPacket.TopicAlias,

Source/MQTTnet/Formatter/MqttBufferReader.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public uint ReadFourByteInteger()
5858
return value;
5959
}
6060

61-
public byte[] ReadRemainingData()
61+
public ReadOnlyMemory<byte> ReadRemainingData()
6262
{
6363
var bufferLength = BytesLeft;
6464
if (bufferLength == 0)

Source/MQTTnet/Formatter/MqttBufferWriter.cs

Lines changed: 82 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
// The .NET Foundation licenses this file to you under the MIT license.
33
// See the LICENSE file in the project root for more information.
44

5+
using System.Buffers;
6+
using System.Buffers.Binary;
57
using System.Runtime.CompilerServices;
68
using System.Text;
79
using MQTTnet.Exceptions;
@@ -108,29 +110,45 @@ public void Write(MqttBufferWriter propertyWriter)
108110
WriteBinary(propertyWriter._buffer, 0, propertyWriter.Length);
109111
}
110112

111-
public void WriteBinary(byte[] value)
113+
public void WriteBinary(ReadOnlySequence<byte> value)
112114
{
113-
if (value == null || value.Length == 0)
115+
if (value.IsEmpty)
114116
{
115-
EnsureAdditionalCapacity(2);
117+
WriteEmptyBinary();
118+
return;
119+
}
116120

117-
_buffer[_position] = 0;
118-
_buffer[_position + 1] = 0;
121+
EnsureAdditionalCapacity(value.Length + 2);
122+
123+
_buffer[_position] = (byte)(value.Length >> 8);
124+
_buffer[_position + 1] = (byte)value.Length;
119125

120-
IncreasePosition(2);
126+
Advance(2);
127+
128+
foreach (var segment in value)
129+
{
130+
MqttMemoryHelper.Copy(segment, 0, _buffer, _position + 2, segment.Length);
131+
Advance(segment.Length);
121132
}
122-
else
133+
}
134+
135+
public void WriteBinary(ReadOnlyMemory<byte> value)
136+
{
137+
if (value.Length == 0)
123138
{
124-
var valueLength = value.Length;
139+
WriteEmptyBinary();
140+
return;
141+
}
125142

126-
EnsureAdditionalCapacity(valueLength + 2);
143+
EnsureAdditionalCapacity(value.Length + 2);
127144

128-
_buffer[_position] = (byte)(valueLength >> 8);
129-
_buffer[_position + 1] = (byte)valueLength;
145+
_buffer[_position] = (byte)(value.Length >> 8);
146+
_buffer[_position + 1] = (byte)value.Length;
130147

131-
MqttMemoryHelper.Copy(value, 0, _buffer, _position + 2, valueLength);
132-
IncreasePosition(valueLength + 2);
133-
}
148+
Advance(2);
149+
150+
MqttMemoryHelper.Copy(value, 0, _buffer, _position + 2, (int)value.Length);
151+
Advance(value.Length);
134152
}
135153

136154
public void WriteBinary(byte[] buffer, int offset, int count)
@@ -145,15 +163,7 @@ public void WriteBinary(byte[] buffer, int offset, int count)
145163
EnsureAdditionalCapacity(count);
146164

147165
MqttMemoryHelper.Copy(buffer, offset, _buffer, _position, count);
148-
IncreasePosition(count);
149-
}
150-
151-
public void WriteByte(byte @byte)
152-
{
153-
EnsureAdditionalCapacity(1);
154-
155-
_buffer[_position] = @byte;
156-
IncreasePosition(1);
166+
Advance(count);
157167
}
158168

159169
public void WriteString(string value)
@@ -165,7 +175,7 @@ public void WriteString(string value)
165175
_buffer[_position] = 0;
166176
_buffer[_position + 1] = 0;
167177

168-
IncreasePosition(2);
178+
Advance(2);
169179
}
170180
else
171181
{
@@ -189,34 +199,60 @@ public void WriteString(string value)
189199
_buffer[_position] = (byte)(writtenBytes >> 8);
190200
_buffer[_position + 1] = (byte)writtenBytes;
191201

192-
IncreasePosition(writtenBytes + 2);
202+
Advance(writtenBytes + 2);
193203
}
194204
}
195205

206+
public void WriteByte(byte @byte)
207+
{
208+
const int size = sizeof(byte);
209+
var span = GetSpan(size);
210+
211+
span[0] = @byte;
212+
Advance(size);
213+
}
214+
196215
public void WriteTwoByteInteger(ushort value)
197216
{
198-
EnsureAdditionalCapacity(2);
217+
const int size = sizeof(ushort);
218+
var span = GetSpan(size);
219+
220+
BinaryPrimitives.WriteUInt16BigEndian(span, value);
221+
222+
Advance(size);
223+
}
224+
225+
[MethodImpl(MethodImplOptions.AggressiveInlining)]
226+
Span<byte> GetSpan(int size)
227+
{
228+
var freeSpace = _buffer.Length - _position;
229+
if (freeSpace < size)
230+
{
231+
EnsureCapacity(_buffer.Length + size - freeSpace);
232+
}
199233

200-
_buffer[_position] = (byte)(value >> 8);
201-
IncreasePosition(1);
202-
_buffer[_position] = (byte)value;
203-
IncreasePosition(1);
234+
return _buffer.AsSpan(_position, size);
204235
}
205236

237+
238+
239+
240+
241+
206242
public void WriteVariableByteInteger(uint value)
207243
{
208244
if (value == 0)
209245
{
210246
_buffer[_position] = 0;
211-
IncreasePosition(1);
247+
Advance(1);
212248

213249
return;
214250
}
215251

216252
if (value <= 127)
217253
{
218254
_buffer[_position] = (byte)value;
219-
IncreasePosition(1);
255+
Advance(1);
220256

221257
return;
222258
}
@@ -238,11 +274,11 @@ public void WriteVariableByteInteger(uint value)
238274
size++;
239275
} while (x > 0);
240276

241-
IncreasePosition(size);
277+
Advance(size);
242278
}
243279

244280
[MethodImpl(MethodImplOptions.AggressiveInlining)]
245-
void EnsureAdditionalCapacity(int additionalCapacity)
281+
void EnsureAdditionalCapacity(long additionalCapacity)
246282
{
247283
var bufferLength = _buffer.Length;
248284

@@ -256,7 +292,7 @@ void EnsureAdditionalCapacity(int additionalCapacity)
256292
}
257293

258294
[MethodImpl(MethodImplOptions.AggressiveInlining)]
259-
void EnsureCapacity(int capacity)
295+
void EnsureCapacity(long capacity)
260296
{
261297
var newBufferLength = _buffer.Length;
262298

@@ -275,7 +311,7 @@ void EnsureCapacity(int capacity)
275311
}
276312

277313
[MethodImpl(MethodImplOptions.AggressiveInlining)]
278-
void IncreasePosition(int length)
314+
void Advance(int length)
279315
{
280316
_position += length;
281317

@@ -286,4 +322,14 @@ void IncreasePosition(int length)
286322
Length = _position;
287323
}
288324
}
325+
326+
void WriteEmptyBinary()
327+
{
328+
EnsureAdditionalCapacity(2);
329+
330+
_buffer[_position] = 0;
331+
_buffer[_position + 1] = 0;
332+
333+
Advance(2);
334+
}
289335
}

Source/MQTTnet/Formatter/MqttConnectPacketFactory.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// The .NET Foundation licenses this file to you under the MIT license.
33
// See the LICENSE file in the project root for more information.
44

5+
using System.Buffers;
56
using MQTTnet.Packets;
67

78
namespace MQTTnet.Formatter;
@@ -40,7 +41,7 @@ public static MqttConnectPacket Create(MqttClientOptions clientOptions)
4041
connectPacket.WillFlag = true;
4142
connectPacket.WillTopic = clientOptions.WillTopic;
4243
connectPacket.WillQoS = clientOptions.WillQualityOfServiceLevel;
43-
connectPacket.WillMessage = clientOptions.WillPayload;
44+
connectPacket.WillMessage = new ReadOnlySequence<byte>(clientOptions.WillPayload);
4445
connectPacket.WillRetain = clientOptions.WillRetain;
4546
connectPacket.WillDelayInterval = clientOptions.WillDelayInterval;
4647
connectPacket.WillContentType = clientOptions.WillContentType;

Source/MQTTnet/Formatter/V3/MqttV3PacketFormatter.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ MqttConnectPacket DecodeConnectPacket(ArraySegment<byte> body)
201201
packet.WillRetain = willRetain;
202202

203203
packet.WillTopic = _bufferReader.ReadString();
204-
packet.WillMessage = _bufferReader.ReadBinaryData();
204+
packet.WillMessage = new ReadOnlySequence<byte>(_bufferReader.ReadBinaryData());
205205
}
206206

207207
if (usernameFlag)
@@ -271,7 +271,7 @@ MqttPublishPacket DecodePublishPacket(ReceivedMqttPacket receivedMqttPacket)
271271

272272
if (!_bufferReader.EndOfStream)
273273
{
274-
packet.PayloadSegment = new ArraySegment<byte>(_bufferReader.ReadRemainingData());
274+
packet.Payload = new ReadOnlySequence<byte>(_bufferReader.ReadRemainingData());
275275
}
276276

277277
return packet;
@@ -430,12 +430,12 @@ static byte EncodeConnectPacket(MqttConnectPacket packet, MqttBufferWriter buffe
430430
}
431431
}
432432

433-
if (packet.Password != null && packet.Username == null)
433+
if (packet.Password.Length > 0 && packet.Username == null)
434434
{
435435
throw new MqttProtocolViolationException("If the User Name Flag is set to 0, the Password Flag MUST be set to 0 [MQTT-3.1.2-22].");
436436
}
437437

438-
if (packet.Password != null)
438+
if (packet.Password.Length > 0)
439439
{
440440
connectFlags |= 0x40;
441441
}
@@ -460,7 +460,7 @@ static byte EncodeConnectPacket(MqttConnectPacket packet, MqttBufferWriter buffe
460460
bufferWriter.WriteString(packet.Username);
461461
}
462462

463-
if (packet.Password != null)
463+
if (packet.Password.Length > 0)
464464
{
465465
bufferWriter.WriteBinary(packet.Password);
466466
}
@@ -501,12 +501,12 @@ static byte EncodeConnectPacketV311(MqttConnectPacket packet, MqttBufferWriter b
501501
}
502502
}
503503

504-
if (packet.Password != null && packet.Username == null)
504+
if (packet.Password.Length > 0 && packet.Username == null)
505505
{
506506
throw new MqttProtocolViolationException("If the User Name Flag is set to 0, the Password Flag MUST be set to 0 [MQTT-3.1.2-22].");
507507
}
508508

509-
if (packet.Password != null)
509+
if (packet.Password.Length > 0)
510510
{
511511
connectFlags |= 0x40;
512512
}
@@ -531,7 +531,7 @@ static byte EncodeConnectPacketV311(MqttConnectPacket packet, MqttBufferWriter b
531531
bufferWriter.WriteString(packet.Username);
532532
}
533533

534-
if (packet.Password != null)
534+
if (packet.Password.Length > 0)
535535
{
536536
bufferWriter.WriteBinary(packet.Password);
537537
}

Source/MQTTnet/Formatter/V5/MqttV5PacketDecoder.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// The .NET Foundation licenses this file to you under the MIT license.
33
// See the LICENSE file in the project root for more information.
44

5+
using System.Buffers;
56
using MQTTnet.Adapter;
67
using MQTTnet.Exceptions;
78
using MQTTnet.Packets;
@@ -314,7 +315,7 @@ MqttConnectPacket DecodeConnectPacket(ArraySegment<byte> body)
314315
}
315316

316317
packet.WillTopic = _bufferReader.ReadString();
317-
packet.WillMessage = _bufferReader.ReadBinaryData();
318+
packet.WillMessage = new ReadOnlySequence<byte>(_bufferReader.ReadBinaryData());
318319
packet.WillUserProperties = willPropertiesReader.CollectedUserProperties;
319320
}
320321

@@ -521,7 +522,7 @@ MqttPublishPacket DecodePublishPacket(byte header, ArraySegment<byte> body)
521522

522523
if (!_bufferReader.EndOfStream)
523524
{
524-
packet.PayloadSegment = new ArraySegment<byte>(_bufferReader.ReadRemainingData());
525+
packet.Payload = new ReadOnlySequence<byte>(_bufferReader.ReadBinaryData());
525526
}
526527

527528
return packet;

Source/MQTTnet/Formatter/V5/MqttV5PacketEncoder.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,12 +138,12 @@ byte EncodeConnectPacket(MqttConnectPacket packet)
138138
}
139139
}
140140

141-
if (packet.Password != null && packet.Username == null)
141+
if (packet.Password.Length > 0 && packet.Username == null)
142142
{
143143
throw new MqttProtocolViolationException("If the User Name Flag is set to 0, the Password Flag MUST be set to 0 [MQTT-3.1.2-22].");
144144
}
145145

146-
if (packet.Password != null)
146+
if (packet.Password.Length > 0)
147147
{
148148
connectFlags |= 0x40;
149149
}
@@ -193,7 +193,7 @@ byte EncodeConnectPacket(MqttConnectPacket packet)
193193
_bufferWriter.WriteString(packet.Username);
194194
}
195195

196-
if (packet.Password != null)
196+
if (packet.Password.Length > 0)
197197
{
198198
_bufferWriter.WriteBinary(packet.Password);
199199
}

0 commit comments

Comments
 (0)