@@ -93,6 +93,9 @@ public void ReadShouldLogDisconnectAndRecover()
93
93
using ( var socket = new KafkaTcpSocket ( mockLog . Object , _kafkaEndpoint ) )
94
94
using ( var conn = new KafkaConnection ( socket , log : mockLog . Object ) )
95
95
{
96
+ var disconnected = false ;
97
+ socket . OnServerDisconnected += ( ) => disconnected = true ;
98
+
96
99
TaskTest . WaitFor ( ( ) => server . ConnectionEventcount > 0 ) ;
97
100
Assert . That ( server . ConnectionEventcount , Is . EqualTo ( 1 ) ) ;
98
101
@@ -101,7 +104,8 @@ public void ReadShouldLogDisconnectAndRecover()
101
104
Assert . That ( server . DisconnectionEventCount , Is . EqualTo ( 1 ) ) ;
102
105
103
106
//Wait a while for the client to notice the disconnect and log
104
- Thread . Sleep ( 15 ) ;
107
+ TaskTest . WaitFor ( ( ) => disconnected ) ;
108
+
105
109
106
110
//should log an exception and keep going
107
111
mockLog . Verify ( x => x . ErrorFormat ( It . IsAny < string > ( ) , It . IsAny < Exception > ( ) ) ) ;
@@ -122,14 +126,17 @@ public void ReadShouldIgnoreMessageWithUnknownCorrelationId()
122
126
using ( var socket = new KafkaTcpSocket ( mockLog . Object , _kafkaEndpoint ) )
123
127
using ( var conn = new KafkaConnection ( socket , log : mockLog . Object ) )
124
128
{
129
+ var receivedData = false ;
130
+ socket . OnBytesReceived += i => receivedData = true ;
131
+
125
132
//send correlation message
126
133
server . SendDataAsync ( CreateCorrelationMessage ( correlationId ) ) . Wait ( TimeSpan . FromSeconds ( 5 ) ) ;
127
134
128
135
//wait for connection
129
136
TaskTest . WaitFor ( ( ) => server . ConnectionEventcount > 0 ) ;
130
137
Assert . That ( server . ConnectionEventcount , Is . EqualTo ( 1 ) ) ;
131
138
132
- Thread . Sleep ( 10 ) ;
139
+ TaskTest . WaitFor ( ( ) => receivedData ) ;
133
140
134
141
//should log a warning and keep going
135
142
mockLog . Verify ( x => x . WarnFormat ( It . IsAny < string > ( ) , It . Is < int > ( o => o == correlationId ) ) ) ;
@@ -198,7 +205,7 @@ public void SendAsyncShouldTimeoutMultipleMessagesAtATime()
198
205
using ( var socket = new KafkaTcpSocket ( _log , _kafkaEndpoint ) )
199
206
using ( var conn = new KafkaConnection ( socket , TimeSpan . FromMilliseconds ( 100 ) , log : _log ) )
200
207
{
201
- TaskTest . WaitFor ( ( ) => server . ConnectionEventcount > 0 ) ;
208
+ server . HasClientConnected . Wait ( TimeSpan . FromSeconds ( 3 ) ) ;
202
209
Assert . That ( server . ConnectionEventcount , Is . EqualTo ( 1 ) ) ;
203
210
204
211
var tasks = new [ ]
@@ -213,8 +220,8 @@ public void SendAsyncShouldTimeoutMultipleMessagesAtATime()
213
220
TaskTest . WaitFor ( ( ) => tasks . Any ( t => t . IsFaulted ) ) ;
214
221
foreach ( var task in tasks )
215
222
{
216
- Assert . That ( task . IsFaulted , Is . True ) ;
217
- Assert . That ( task . Exception . InnerException , Is . TypeOf < ResponseTimeoutException > ( ) ) ;
223
+ Assert . That ( task . IsFaulted , Is . True , "Task should have faulted." ) ;
224
+ Assert . That ( task . Exception . InnerException , Is . TypeOf < ResponseTimeoutException > ( ) , "Task fault should be of type ResponseTimeoutException." ) ;
218
225
}
219
226
}
220
227
}
0 commit comments