@@ -23,7 +23,7 @@ static void blog(const char* format, ...) {
23
23
va_start (args, format);
24
24
std::string text = varlog (format, args);
25
25
va_end (args);
26
-
26
+
27
27
auto timeSinceStart = (std::chrono::high_resolution_clock::now () - tp);
28
28
auto hours = std::chrono::duration_cast<std::chrono::hours>(timeSinceStart);
29
29
timeSinceStart -= hours;
@@ -36,7 +36,7 @@ static void blog(const char* format, ...) {
36
36
auto microseconds = std::chrono::duration_cast<std::chrono::microseconds>(timeSinceStart);
37
37
timeSinceStart -= microseconds;
38
38
auto nanoseconds = std::chrono::duration_cast<std::chrono::nanoseconds>(timeSinceStart);
39
-
39
+
40
40
std::vector<char > timebuf (65535 , ' \0 ' );
41
41
std::string timeformat = " %.2d:%.2d:%.2d.%.3d.%.3d.%.3d: %*s\n " ;// "%*s";
42
42
sprintf_s (
@@ -177,22 +177,30 @@ static int server(int argc, char* argv[]);
177
177
static int client (int argc, char * argv[]);
178
178
179
179
int main (int argc, char * argv[]) {
180
- if ((argc = = 2 ) || (strcmp (argv[0 ], " client" ) == 0 )) {
180
+ if ((argc > = 2 ) || (strcmp (argv[0 ], " client" ) == 0 )) {
181
181
client (argc, argv);
182
182
} else {
183
183
server (argc, argv);
184
184
}
185
185
}
186
186
187
187
int serverInstanceThread (std::shared_ptr<os::named_socket_connection> ptr) {
188
+ size_t msgCount = 0 ;
189
+ std::vector<char > buf;
190
+ size_t avail;
188
191
while (ptr->good ()) {
189
- size_t msg = ptr->read_avail ();
190
- if (msg > 0 ) {
191
- ptr->write (ptr->read ());
192
- // blog("Reflected message with side %" PRIu64 ".", msg);
192
+ while ((avail = ptr->read_avail ()) > 0 ) {
193
+ buf.resize (avail);
194
+ size_t msg = ptr->read (buf.data (), avail);
195
+ if (ptr->write (buf) != msg) {
196
+ blog (" %llu: Send Buffer full at %llu messages." , ptr->get_client_id (), msgCount);
197
+ break ;
198
+ };
199
+ msgCount++;
193
200
}
194
201
if (ptr->read_avail () == 0 )
195
202
std::this_thread::sleep_for (std::chrono::milliseconds (1 ));
203
+ // blog("%llu: Total messages: %llu", ptr->get_client_id(), msgCount);
196
204
}
197
205
return 0 ;
198
206
}
@@ -209,7 +217,7 @@ int server(int argc, char* argv[]) {
209
217
210
218
blog (" Spawning %lld clients." , (int64_t )CLIENTCOUNT);
211
219
for (size_t idx = 0 ; idx < CLIENTCOUNT; idx++) {
212
- spawn (argv[0 ], std::string (argv[0 ]) + " client" , get_working_directory ());
220
+ spawn (std::string ( argv[0 ]), ' " ' + std::string (argv[0 ]) + ' " ' + " client" , get_working_directory ());
213
221
}
214
222
215
223
blog (" Waiting for data..." );
@@ -245,32 +253,42 @@ int client(int argc, char* argv[]) {
245
253
std::unique_ptr<os::named_socket> socket = os::named_socket::create ();
246
254
if (!socket->connect (CONN)) {
247
255
blog (" Failed starting client." );
248
- std::cin.get ();
249
256
return -1 ;
250
257
}
251
258
252
259
uint64_t inbox = 0 ;
253
260
uint64_t outbox = 0 ;
254
- uint64_t total = 10000 ;
261
+ uint64_t total = 100000 ;
262
+ auto tpstart = std::chrono::high_resolution_clock::now ();
263
+ std::vector<char > buf;
255
264
while (socket->get_connection ()->good ()) {
256
265
// std::cout << inbox << ", " << outbox << ", " << total << "." << std::endl;
257
266
if (outbox < total) {
258
- if (socket->get_connection ()->write (" Hello World\0 " , 11 ) == 11 ) {
259
- outbox++;
260
- if (outbox % 100 == 0 ) {
261
- blog (" Sent %lld messages so far." , outbox, 0 , 0 , 0 , 0 , 0 );
267
+ for (size_t idx = 0 ; idx < 1000 ; idx++) {
268
+ if (socket->get_connection ()->write (" Hello World\0 " , 11 ) == 11 ) {
269
+ outbox++;
270
+ } else {
271
+ break ;
262
272
}
273
+
274
+ if (outbox >= total) {
275
+ break ;
276
+ }
277
+ // if (outbox % 100 == 0) {
278
+ // blog("Sent %lld messages so far.", outbox, 0, 0, 0, 0, 0);
279
+ // }
263
280
}
264
281
}
265
282
266
283
if (inbox < total) {
267
284
size_t msg = socket->get_connection ()->read_avail ();
268
285
while (msg > 0 ) {
269
- auto buf = socket->get_connection ()->read ();
286
+ buf.resize (msg);
287
+ auto rbytes = socket->get_connection ()->read (buf.data (), buf.size ());
270
288
inbox++;
271
- if (inbox % 100 == 0 ) {
272
- blog (" Received %lld messages so far." , inbox, 0 , 0 , 0 , 0 , 0 );
273
- }
289
+ // if (inbox % 100 == 0) {
290
+ // blog("Received %lld messages so far.", inbox, 0, 0, 0, 0, 0);
291
+ // }
274
292
msg = socket->get_connection ()->read_avail ();
275
293
}
276
294
}
@@ -281,9 +299,20 @@ int client(int argc, char* argv[]) {
281
299
}
282
300
}
283
301
302
+ // std::this_thread::sleep_for(std::chrono::milliseconds(1));
303
+ }
304
+ auto tpend = std::chrono::high_resolution_clock::now ();
305
+
306
+ auto tpdurns = std::chrono::duration_cast<std::chrono::nanoseconds>(tpend - tpstart);
307
+ auto tpdurms = std::chrono::duration_cast<std::chrono::milliseconds>(tpend - tpstart);
308
+
309
+ blog (" Sent & Received %llu messages in %llu milliseconds." , total, tpdurms.count ());
310
+ blog (" Average %llu ns per message." , tpdurns.count () / total);
311
+
312
+ while (socket->get_connection ()->good ()) {
284
313
std::this_thread::sleep_for (std::chrono::milliseconds (1 ));
285
314
}
286
- socket->close ();
287
315
316
+ socket->close ();
288
317
return 0 ;
289
318
}
0 commit comments