Skip to content

Commit 3ed85c7

Browse files
committed
Adjustments.
1 parent 9c02a96 commit 3ed85c7

File tree

3 files changed

+18
-11
lines changed

3 files changed

+18
-11
lines changed

src/common/ipc/IpcMessage.h

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ class IpcMessageObjectImpl final : public IpcObject
116116
{
117117
int32_t ownerPid;
118118
int32_t ownerId;
119+
std::atomic_uint8_t alive;
119120
#ifdef IPC_MESSAGE_USE_SHARED_SIGNAL
120121
IpcSharedSignal receiverSignal;
121122
IpcSharedSignal senderSignal;
@@ -140,6 +141,7 @@ class IpcMessageObjectImpl final : public IpcObject
140141
{
141142
header->ownerPid = (int) getpid();
142143
header->ownerId = ++IPC_MESSAGE_COUNTER;
144+
header->alive = 1;
143145

144146
#ifdef IPC_MESSAGE_USE_SHARED_SIGNAL
145147
new (&header->receiverSignal) IpcSharedSignal();
@@ -320,6 +322,9 @@ inline void IpcMessageReceiver<Message>::disconnect()
320322
{
321323
disconnected = true;
322324
std::lock_guard mutexLock(mutex);
325+
326+
const auto header = ipc.sharedMemory.getHeader();
327+
header->alive = 0;
323328
}
324329
}
325330

@@ -347,6 +352,7 @@ inline std::optional<Message> IpcMessageReceiver<Message>::receive(std::function
347352
}
348353

349354
ipc.receiverSignal->reset();
355+
header->receiverFlag.store(0, std::memory_order_release);
350356

351357
std::optional<Message> messageOpt;
352358

@@ -375,10 +381,7 @@ inline std::optional<Message> IpcMessageReceiver<Message>::receive(std::function
375381
memcpy(span.data(), header->messageBuffer, span.size());
376382
}
377383

378-
header->receiverFlag.store(0, std::memory_order_release);
379-
380384
ipc.senderSignal->signal();
381-
382385
header->senderFlag.store(1, std::memory_order_release);
383386

384387
return messageOpt;
@@ -430,6 +433,9 @@ inline bool IpcMessageSender<Message>::send(const Message& message, std::functio
430433

431434
while (!guard.tryLock(IPC_MESSAGE_TIMEOUT))
432435
{
436+
if (!header->alive.load(std::memory_order_relaxed))
437+
disconnected = true;
438+
433439
if (disconnected)
434440
return false;
435441

@@ -458,14 +464,16 @@ inline bool IpcMessageSender<Message>::send(const Message& message, std::functio
458464
memcpy(header->messageBuffer, span.data(), span.size());
459465
}
460466

461-
header->receiverFlag.store(1, std::memory_order_release);
462-
463467
ipc.receiverSignal->signal();
468+
header->receiverFlag.store(1, std::memory_order_release);
464469

465470
while (header->senderFlag.load(std::memory_order_acquire) == 0)
466471
{
467472
if (!ipc.senderSignal->wait(IPC_MESSAGE_TIMEOUT))
468473
{
474+
if (!header->alive.load(std::memory_order_relaxed))
475+
disconnected = true;
476+
469477
if (disconnected)
470478
return false;
471479

@@ -475,7 +483,6 @@ inline bool IpcMessageSender<Message>::send(const Message& message, std::functio
475483
}
476484

477485
ipc.senderSignal->reset();
478-
479486
header->senderFlag.store(0, std::memory_order_release);
480487

481488
return true;

src/common/ipc/tests/IpcChatTest.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,11 @@ BOOST_AUTO_TEST_CASE(IpcChatTest)
8080
IpcChatServer<RequestMessage, ResponseMessage> server(parameters);
8181
IpcChatClient<RequestMessage, ResponseMessage> client(parameters);
8282

83-
constexpr unsigned numMessages = 4'000;
83+
constexpr unsigned NUM_MESSAGES = 4'000;
8484
unsigned readCount = 0;
8585

8686
std::thread consumerThread([&]() {
87-
for (readCount = 0; readCount < numMessages; ++readCount)
87+
for (readCount = 0; readCount < NUM_MESSAGES; ++readCount)
8888
{
8989
const auto requestMessageOpt = server.receive();
9090

@@ -104,7 +104,7 @@ BOOST_AUTO_TEST_CASE(IpcChatTest)
104104
}
105105
});
106106

107-
for (unsigned writeCount = 0; writeCount < numMessages; ++writeCount)
107+
for (unsigned writeCount = 0; writeCount < NUM_MESSAGES; ++writeCount)
108108
{
109109
BOOST_CHECK(client.send(Request{ .clientAddress = client.getAddress(), .n = writeCount }));
110110
const auto responseMessageOpt = client.receive();
@@ -116,7 +116,7 @@ BOOST_AUTO_TEST_CASE(IpcChatTest)
116116

117117
consumerThread.join();
118118

119-
BOOST_CHECK_EQUAL(readCount, numMessages);
119+
BOOST_CHECK_EQUAL(readCount, NUM_MESSAGES);
120120
}
121121

122122

src/common/ipc/tests/IpcMessageTest.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ BOOST_AUTO_TEST_CASE(ServerDisconnectMessageTest)
240240
std::thread producerThread([&]() {
241241
try
242242
{
243-
while (!server.isDisconnected())
243+
while (!client.isDisconnected())
244244
{
245245
if (client.send(Message{0}))
246246
++produced;

0 commit comments

Comments
 (0)