diff --git a/builds/posix/make.android.arm64 b/builds/posix/make.android.arm64 index 20aa9587ab2..999d4159919 100644 --- a/builds/posix/make.android.arm64 +++ b/builds/posix/make.android.arm64 @@ -33,6 +33,9 @@ LIB_PLATFORM_RPATH=-Wl,-rpath,\$$ORIGIN LibraryFullName=$(LibraryBaseName) LibrarySoName=$(LibraryBaseName) +# Global c++ flags: firebird needs no RTTI, choose build standard and c++ specific warnings level +PLUSPLUS_FLAGS:= -fno-rtti -std=c++20 -Werror=delete-incomplete -Werror=return-type + COMMON_FLAGS=-ggdb -DFB_SEND_FLAGS=MSG_NOSIGNAL -DLINUX -DANDROID -DARM64 -pipe -MMD -fPIC -fmessage-length=0 \ -I$(ROOT)/extern/libtommath -I$(ROOT)/extern/libtomcrypt/src/headers \ $(CROSS_FLAGS) -fsigned-char \ diff --git a/builds/posix/make.android.arme b/builds/posix/make.android.arme index 283492d5554..dc9c2005735 100644 --- a/builds/posix/make.android.arme +++ b/builds/posix/make.android.arme @@ -33,6 +33,9 @@ LIB_PLATFORM_RPATH=-Wl,-rpath,\$$ORIGIN LibraryFullName=$(LibraryBaseName) LibrarySoName=$(LibraryBaseName) +# Global c++ flags: firebird needs no RTTI, choose build standard and c++ specific warnings level +PLUSPLUS_FLAGS:= -fno-rtti -std=c++20 -Werror=delete-incomplete -Werror=return-type + COMMON_FLAGS=-ggdb -DFB_SEND_FLAGS=MSG_NOSIGNAL -DLINUX -DANDROID -DARM -pipe -MMD -fPIC -fmessage-length=0 \ -I$(ROOT)/extern/libtommath -I$(ROOT)/extern/libtomcrypt/src/headers \ $(CROSS_FLAGS) -fsigned-char \ diff --git a/builds/posix/make.android.x86 b/builds/posix/make.android.x86 index 42906e00604..22dbc0717de 100644 --- a/builds/posix/make.android.x86 +++ b/builds/posix/make.android.x86 @@ -33,6 +33,9 @@ LIB_PLATFORM_RPATH=-Wl,-rpath,\$$ORIGIN LibraryFullName=$(LibraryBaseName) LibrarySoName=$(LibraryBaseName) +# Global c++ flags: firebird needs no RTTI, choose build standard and c++ specific warnings level +PLUSPLUS_FLAGS:= -fno-rtti -std=c++20 -Werror=delete-incomplete -Werror=return-type + COMMON_FLAGS=-ggdb -DFB_SEND_FLAGS=MSG_NOSIGNAL -DLINUX -DANDROID -pipe -MMD -fPIC -fmessage-length=0 \ -I$(ROOT)/extern/libtommath -I$(ROOT)/extern/libtomcrypt/src/headers \ $(CROSS_FLAGS) \ diff --git a/builds/posix/make.android.x86_64 b/builds/posix/make.android.x86_64 index 3602f4e2f68..c2b67a64f43 100644 --- a/builds/posix/make.android.x86_64 +++ b/builds/posix/make.android.x86_64 @@ -33,6 +33,9 @@ LIB_PLATFORM_RPATH=-Wl,-rpath,\$$ORIGIN LibraryFullName=$(LibraryBaseName) LibrarySoName=$(LibraryBaseName) +# Global c++ flags: firebird needs no RTTI, choose build standard and c++ specific warnings level +PLUSPLUS_FLAGS:= -fno-rtti -std=c++20 -Werror=delete-incomplete -Werror=return-type + COMMON_FLAGS=-ggdb -DFB_SEND_FLAGS=MSG_NOSIGNAL -DLINUX -DANDROID -DAMD64 -pipe -MMD -fPIC -fmessage-length=0 \ -I$(ROOT)/extern/libtommath -I$(ROOT)/extern/libtomcrypt/src/headers \ $(CROSS_FLAGS) \ diff --git a/builds/posix/make.shared.variables b/builds/posix/make.shared.variables index 5074e5cf68b..16fce089b02 100644 --- a/builds/posix/make.shared.variables +++ b/builds/posix/make.shared.variables @@ -39,7 +39,8 @@ AllObjects += $(Common_Objects) # Common test files COT1:= $(call dirObjects,common/tests) COT2:= $(call dirObjects,common/classes/tests) -Common_Test_Objects:= $(COT1) $(COT2) $(call makeObjects,yvalve,gds.cpp) +COT3:= $(call dirObjects,common/ipc/tests) +Common_Test_Objects:= $(COT1) $(COT2) $(COT3) $(call makeObjects,yvalve,gds.cpp) AllObjects += $(Common_Test_Objects) diff --git a/builds/win32/msvc15/common.vcxproj b/builds/win32/msvc15/common.vcxproj index ff1fd6032c6..2c8448d35db 100644 --- a/builds/win32/msvc15/common.vcxproj +++ b/builds/win32/msvc15/common.vcxproj @@ -197,6 +197,9 @@ + + + @@ -423,4 +426,4 @@ - \ No newline at end of file + diff --git a/builds/win32/msvc15/common.vcxproj.filters b/builds/win32/msvc15/common.vcxproj.filters index 28803f10167..0145c958fce 100644 --- a/builds/win32/msvc15/common.vcxproj.filters +++ b/builds/win32/msvc15/common.vcxproj.filters @@ -626,5 +626,14 @@ headers + + headers + + + headers + + + headers + - \ No newline at end of file + diff --git a/builds/win32/msvc15/common_test.vcxproj b/builds/win32/msvc15/common_test.vcxproj index 6f8db374594..507da6124ea 100644 --- a/builds/win32/msvc15/common_test.vcxproj +++ b/builds/win32/msvc15/common_test.vcxproj @@ -249,6 +249,8 @@ + + diff --git a/builds/win32/msvc15/common_test.vcxproj.filters b/builds/win32/msvc15/common_test.vcxproj.filters index cf423d69fd9..711a69b6d04 100644 --- a/builds/win32/msvc15/common_test.vcxproj.filters +++ b/builds/win32/msvc15/common_test.vcxproj.filters @@ -48,5 +48,11 @@ source + + source + + + source + diff --git a/configure.ac b/configure.ac index 65c6ae1d65c..ed992bf547d 100644 --- a/configure.ac +++ b/configure.ac @@ -1175,6 +1175,7 @@ dnl Checks for pthread functions AC_CHECK_FUNCS(pthread_mutexattr_setprotocol) AC_CHECK_FUNCS(pthread_mutexattr_setrobust) AC_CHECK_FUNCS(pthread_mutex_consistent) +AC_CHECK_FUNCS(pthread_mutex_timedlock) AC_CHECK_FUNCS(pthread_rwlockattr_setkind) AC_CHECK_FUNCS(pthread_cancel) AC_CHECK_FUNCS(pthread_atfork) diff --git a/src/common/StdHelper.h b/src/common/StdHelper.h index c055a1257e4..11449cb8481 100644 --- a/src/common/StdHelper.h +++ b/src/common/StdHelper.h @@ -24,8 +24,17 @@ #ifndef FB_COMMON_STD_HELPER_H #define FB_COMMON_STD_HELPER_H +#include +#include +#include +#include +#include +#include +#include "boost/type_traits/copy_cv.hpp" + namespace Firebird { + // To be used with std::visit template @@ -37,6 +46,116 @@ struct StdVisitOverloads : Ts... template StdVisitOverloads(Ts...) -> StdVisitOverloads; + +// Variant helpers + +template +struct IsVariant : std::false_type {}; + +template +struct IsVariant> : std::true_type {}; + +template +struct IsVariant> : std::true_type {}; + +template +concept Variant = IsVariant::value; + +template +struct VariantIndex; + +template +struct VariantIndex> +{ + static constexpr std::size_t value = []() constexpr + { + std::size_t index = 0; + const bool found = ((std::is_same_v ? true : (++index, false)) || ...); + return found ? index : std::variant_npos; + }(); +}; + +template +struct VariantIndex> +{ + static constexpr std::size_t value = VariantIndex>::value; +}; + +template +constexpr std::size_t VariantIndexValue = VariantIndex::value; + +template +concept VariantContains = Variant && requires +{ + requires VariantIndexValue != std::variant_npos; +}; + +template + requires VariantContains +constexpr std::size_t getVariantIndex() +{ + return VariantIndexValue; +} + + +template +constexpr std::size_t maxVariantSizeImpl(std::index_sequence) +{ + return std::max({sizeof(std::variant_alternative_t)...}); +} + +template +constexpr std::size_t maxVariantSize() +{ + return maxVariantSizeImpl(std::make_index_sequence>{}); +} + + +template +constexpr V createVariantByIndexImpl(std::size_t index, std::index_sequence) +{ + using FactoryFunc = V(*)(void); + constexpr FactoryFunc factories[] = { + +[] { return V{std::in_place_index}; }... + }; + + if (index < sizeof...(Is)) + return factories[index](); + else + throw std::out_of_range("Invalid variant index: " + std::to_string(index)); +} + +template +constexpr V createVariantByIndex(std::size_t index) +{ + return createVariantByIndexImpl( + index, + std::make_index_sequence>{} + ); +} + + +template +constexpr auto getVariantIndexAndSpan(V& message) +{ + return std::visit( + [](auto& arg) -> auto + { + using T = std::decay_t; + + return std::pair{ + getVariantIndex(), + std::span>{ + reinterpret_cast>>(&arg), + sizeof(T) + } + }; + }, + message + ); +} + + } // namespace Firebird #endif // FB_COMMON_STD_HELPER_H diff --git a/src/common/file_params.h b/src/common/file_params.h index bfde15c7e2d..c7db9e03c8e 100644 --- a/src/common/file_params.h +++ b/src/common/file_params.h @@ -44,6 +44,7 @@ static inline constexpr const char* TPC_HDR_FILE = "fb_tpc_%s"; static inline constexpr const char* TPC_BLOCK_FILE = "fb_tpc_%s_%" UQUADFORMAT; static inline constexpr const char* SNAPSHOTS_FILE = "fb_snap_%s"; static inline constexpr const char* PROFILER_FILE = "fb_profiler_%s_%" UQUADFORMAT; +static inline constexpr const char* IPC_CHAT_CLIENT_FILE = "fb_ipc_chat_%" UQUADFORMAT "_%" UQUADFORMAT; // Global usage static inline constexpr const char* TRACE_FILE = "fb" COMMON_FILE_PREFIX "_trace"; diff --git a/src/common/ipc/IpcChat.h b/src/common/ipc/IpcChat.h new file mode 100644 index 00000000000..50fb3273511 --- /dev/null +++ b/src/common/ipc/IpcChat.h @@ -0,0 +1,239 @@ +/* + * The contents of this file are subject to the Initial + * Developer's Public License Version 1.0 (the "License"); + * you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * http://www.ibphoenix.com/main.nfs?a=ibphoenix&page=ibp_idpl. + * + * Software distributed under the License is distributed AS IS, + * WITHOUT WARRANTY OF ANY KIND, either express or implied. + * See the License for the specific language governing rights + * and limitations under the License. + * + * The Original Code was created by Adriano dos Santos Fernandes + * for the Firebird Open Source RDBMS project. + * + * Copyright (c) 2025 Adriano dos Santos Fernandes + * and all contributors signed below. + * + * All Rights Reserved. + * Contributor(s): ______________________________________. + */ + +#ifndef COMMON_IPC_CHAT_H +#define COMMON_IPC_CHAT_H + +#include "firebird.h" +#include "../ipc/IpcMessage.h" +#include "../classes/auto.h" +#include "../classes/fb_string.h" +#include "../file_params.h" +#include +#include +#include +#include +#include + +#ifdef WIN_NT +#include +#endif + +namespace Firebird { + + +struct IpcChatClientAddress +{ + uint64_t pid; + uint64_t uid; +}; + + +inline std::atomic_uint64_t nextIpcChatClientAddressUid = 0; + + +template +class IpcChatServer final +{ +public: + explicit IpcChatServer(const IpcMessageParameters& parameters); + + IpcChatServer(const IpcChatServer&) = delete; + IpcChatServer& operator=(const IpcChatServer&) = delete; + + ~IpcChatServer() + { + disconnect(); + } + +public: + // May be called while receive or sendTo is being called in another thread + void disconnect(); + + std::optional> receive( + std::function idleFunc = nullptr); + + bool sendTo(const IpcChatClientAddress& clientAddress, const VarResponseMessage& message, + std::function idleFunc = nullptr); + + const auto& getParameters() const + { + return receiver.getParameters(); + } + + bool isDisconnected() const + { + return receiver.isDisconnected(); + } + +private: + IpcMessageReceiver> receiver; + const USHORT version; +}; + + +template +class IpcChatClient final +{ +public: + explicit IpcChatClient(const IpcMessageParameters& parameters); + + IpcChatClient(const IpcChatClient&) = delete; + IpcChatClient& operator=(const IpcChatClient&) = delete; + + ~IpcChatClient() + { + disconnect(); + } + +public: + // May be called while receive or send is being called in another thread + void disconnect(); + + std::optional receive(std::function idleFunc = nullptr); + + bool send(const VarRequestMessage& message, std::function idleFunc = nullptr); + + std::optional sendAndReceive(const VarRequestMessage& message, + std::function idleFunc = nullptr); + + const auto& getAddress() const + { + return address; + } + + const auto& getParameters() const + { + return receiver.getParameters(); + } + + bool isDisconnected() const + { + return receiver.isDisconnected() || sender.isDisconnected(); + } + +private: + static IpcMessageParameters buildReceiverParameters(const IpcChatClientAddress& address, USHORT version) + { + PathName fileName; + fileName.printf(IPC_CHAT_CLIENT_FILE, address.pid, address.uid); + + return { + .physicalName = fileName.c_str(), + .logicalName = "IpcChatClient", + .type = static_cast(SharedMemoryBase::SRAM_CHAT_CLIENT), + .version = version, + }; + } + +private: + const IpcChatClientAddress address; + IpcMessageSender> sender; + IpcMessageReceiver> receiver; +}; + + +template +inline IpcChatServer::IpcChatServer(const IpcMessageParameters& parameters) + : receiver(parameters), + version(parameters.version) +{ +} + +template +inline void IpcChatServer::disconnect() +{ + receiver.disconnect(); +} + +template +inline std::optional> +IpcChatServer::receive(std::function idleFunc) +{ + return receiver.receive(idleFunc); +} + +template +inline bool IpcChatServer::sendTo( + const IpcChatClientAddress& clientAddress, const VarResponseMessage& message, std::function idleFunc) +{ + PathName fileName; + fileName.printf(IPC_CHAT_CLIENT_FILE, clientAddress.pid, clientAddress.uid); + + IpcMessageSender> sender({ + .physicalName = fileName.c_str(), + .logicalName = "IpcChatClient", + .type = static_cast(SharedMemoryBase::SRAM_CHAT_CLIENT), + .version = version, + }); + + return sender.send(std::make_pair(message, clientAddress), idleFunc); +} + + +template +inline IpcChatClient::IpcChatClient(const IpcMessageParameters& parameters) + : address{ + .pid = static_cast(getpid()), + .uid = nextIpcChatClientAddressUid++ + }, + sender(parameters), + receiver(buildReceiverParameters(address, parameters.version)) +{ +} + +template +inline void IpcChatClient::disconnect() +{ + sender.disconnect(); + receiver.disconnect(); +} + +template +inline std::optional IpcChatClient::receive( + std::function idleFunc) +{ + const auto received = receiver.receive(idleFunc); + return received.has_value() ? std::make_optional(received->first) : std::nullopt; +} + +template +inline bool IpcChatClient::send(const VarRequestMessage& message, + std::function idleFunc) +{ + return sender.send(std::make_pair(message, address), idleFunc); +} + +template +inline std::optional IpcChatClient::sendAndReceive( + const VarRequestMessage& message, std::function idleFunc) +{ + if (!send(message, idleFunc)) + return std::nullopt; + + return receive(idleFunc); +} + + +} // namespace Firebird + +#endif // COMMON_IPC_CHAT_H diff --git a/src/common/ipc/IpcMessage.h b/src/common/ipc/IpcMessage.h new file mode 100644 index 00000000000..8c558824bd1 --- /dev/null +++ b/src/common/ipc/IpcMessage.h @@ -0,0 +1,494 @@ +/* + * The contents of this file are subject to the Initial + * Developer's Public License Version 1.0 (the "License"); + * you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * http://www.ibphoenix.com/main.nfs?a=ibphoenix&page=ibp_idpl. + * + * Software distributed under the License is distributed AS IS, + * WITHOUT WARRANTY OF ANY KIND, either express or implied. + * See the License for the specific language governing rights + * and limitations under the License. + * + * The Original Code was created by Adriano dos Santos Fernandes + * for the Firebird Open Source RDBMS project. + * + * Copyright (c) 2025 Adriano dos Santos Fernandes + * and all contributors signed below. + * + * All Rights Reserved. + * Contributor(s): ______________________________________. + */ + +#ifndef COMMON_IPC_MESSAGE_H +#define COMMON_IPC_MESSAGE_H + +#include "firebird.h" +#include "../StdHelper.h" +#include "../classes/fb_string.h" +#include "../StatusArg.h" +#include "../isc_s_proto.h" +#include "../isc_proto.h" +#include "../utils_proto.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#ifdef WIN_NT +#include +#else +#include +#endif + +#ifndef WIN_NT +#define IPC_MESSAGE_USE_SHARED_SIGNAL +#endif + +#ifdef IPC_MESSAGE_USE_SHARED_SIGNAL +#include "../ipc/IpcSharedSignal.h" +#else +#include "../ipc/IpcNamedSignal.h" +#endif + +namespace Firebird { + + +inline constexpr auto IPC_MESSAGE_TIMEOUT = std::chrono::milliseconds(500); // 0.5s +inline constexpr auto IPC_MESSAGE_SIGNAL_FORMAT = "ipc_message_%d_%d"; +inline std::atomic_uint32_t IPC_MESSAGE_COUNTER = 0; + + +struct IpcMessageParameters final +{ + const std::string physicalName; + const std::string logicalName; + const USHORT type = 0; + const USHORT version = 0; +}; + + +/* + * MessageConcept can be a std::variant or a std::pair. + * std::variant is a variant message of PODs that can be exchanged between client and server. + * The optional FixedMessage is a fixed POD type that also goes together with the variant message. + */ +template +concept MessageConcept = + Variant || + (requires { + typename T::first_type; + typename T::second_type; + } && Variant); + + +template +class IpcMessageObjectImpl final : public IpcObject +{ +private: + template + struct IsPair : std::false_type {}; + + template + struct IsPair> : std::true_type {}; + +public: + static constexpr bool isMessagePair = IsPair::value; + +private: + static constexpr size_t getMaxSize() + { + if constexpr (isMessagePair) + return maxVariantSize() + sizeof(typename Message::second_type); + else + return maxVariantSize(); + } + + static_assert(getMaxSize() <= std::numeric_limits::max()); + +public: + struct Header : public MemoryHeader + { + int32_t ownerPid; + int32_t ownerId; + std::atomic_uint8_t alive; +#ifdef IPC_MESSAGE_USE_SHARED_SIGNAL + IpcSharedSignal receiverSignal; + IpcSharedSignal senderSignal; +#endif + std::atomic_uint8_t receiverFlag; + std::atomic_uint8_t senderFlag; + uint16_t messageLen; + uint8_t messageIndex; + uint8_t messageBuffer[getMaxSize()]; + }; + +public: + explicit IpcMessageObjectImpl(const IpcMessageParameters& aParameters, bool aIsOwner) + : parameters(aParameters), + sharedMemory(parameters.physicalName.c_str(), sizeof(Header), this), + isOwner(aIsOwner) + { + const auto header = sharedMemory.getHeader(); + checkHeader(header); + + if (isOwner) + { + header->ownerPid = (int) getpid(); + header->ownerId = ++IPC_MESSAGE_COUNTER; + header->alive = 1; + +#ifdef IPC_MESSAGE_USE_SHARED_SIGNAL + new (&header->receiverSignal) IpcSharedSignal(); + new (&header->senderSignal) IpcSharedSignal(); +#endif + } + + string signalPrefix; + signalPrefix.printf(IPC_MESSAGE_SIGNAL_FORMAT, (int) header->ownerPid, (int) header->ownerId); + +#ifdef IPC_MESSAGE_USE_SHARED_SIGNAL + receiverSignal = &header->receiverSignal; + senderSignal = &header->senderSignal; +#else + receiverSignal.emplace(signalPrefix + "_r"); + senderSignal.emplace(signalPrefix + "_s"); +#endif + } + + ~IpcMessageObjectImpl() + { + if (isOwner) + sharedMemory.removeMapFile(); + } + + IpcMessageObjectImpl(const IpcMessageObjectImpl&) = delete; + IpcMessageObjectImpl& operator=(const IpcMessageObjectImpl&) = delete; + +public: + bool initialize(SharedMemoryBase* sm, bool init) override + { + if (init) + { + const auto header = reinterpret_cast(sm->sh_mem_header); + + // Initialize the shared data header. + initHeader(header); + } + + return true; + } + + void mutexBug(int osErrorCode, const char* text) override + { + iscLogStatus(("Error when working with " + parameters.logicalName).c_str(), + (Arg::Gds(isc_sys_request) << text << Arg::OsError(osErrorCode)).value()); + } + + USHORT getType() const override + { + return parameters.type; + } + + USHORT getVersion() const override + { + return parameters.version; + } + + const char* getName() const override + { + return parameters.logicalName.c_str(); + } + +public: + IpcMessageParameters parameters; + SharedMemory
sharedMemory; +#ifdef IPC_MESSAGE_USE_SHARED_SIGNAL + IpcSharedSignal* receiverSignal = nullptr; + IpcSharedSignal* senderSignal = nullptr; +#else + std::optional receiverSignal; + std::optional senderSignal; +#endif + const bool isOwner; +}; + + +template +class IpcMessageReceiver final +{ +public: + explicit IpcMessageReceiver(const IpcMessageParameters& parameters); + + IpcMessageReceiver(const IpcMessageReceiver&) = delete; + IpcMessageReceiver& operator=(const IpcMessageReceiver&) = delete; + + ~IpcMessageReceiver(); + +public: + // May be called while receive is being called in another thread + void disconnect(); + + std::optional receive(std::function idleFunc = nullptr); + + const auto& getParameters() const + { + return ipc.parameters; + } + + bool isDisconnected() const + { + return disconnected; + } + +private: + IpcMessageObjectImpl ipc; + std::atomic_bool disconnected = false; + std::mutex mutex; +}; + +template +class IpcMessageSender final +{ +public: + explicit IpcMessageSender(const IpcMessageParameters& parameters); + + IpcMessageSender(const IpcMessageSender&) = delete; + IpcMessageSender& operator=(const IpcMessageSender&) = delete; + + ~IpcMessageSender(); + +public: + static bool sendTo(const IpcMessageParameters& parameters, const Message& message, + std::function idleFunc = nullptr); + +public: + // May be called while send is being called in another thread + void disconnect(); + + bool send(const Message& message, std::function idleFunc = nullptr); + + const auto& getParameters() const + { + return ipc.parameters; + } + + bool isDisconnected() const + { + return disconnected; + } + +private: + IpcMessageObjectImpl ipc; + std::atomic_bool disconnected = false; + std::mutex mutex; +}; + + +template +inline IpcMessageReceiver::IpcMessageReceiver(const IpcMessageParameters& parameters) + : ipc(parameters, true) +{ +} + +template +inline IpcMessageReceiver::~IpcMessageReceiver() +{ + const auto header = ipc.sharedMemory.getHeader(); + + disconnect(); + +#ifdef IPC_MESSAGE_USE_SHARED_SIGNAL + header->receiverSignal.~IpcSharedSignal(); + header->senderSignal.~IpcSharedSignal(); +#elif !defined(WIN_NT) + string signalPrefix; + signalPrefix.printf(IPC_MESSAGE_SIGNAL_FORMAT, (int) header->ownerPid, (int) header->ownerId); + + IpcNamedSignal::remove(signalPrefix + "_r"); + IpcNamedSignal::remove(signalPrefix + "_s"); +#endif +} + +template +inline void IpcMessageReceiver::disconnect() +{ + if (!disconnected) + { + disconnected = true; + std::lock_guard mutexLock(mutex); + + const auto header = ipc.sharedMemory.getHeader(); + header->alive = 0; + } +} + +template +inline std::optional IpcMessageReceiver::receive(std::function idleFunc) +{ + std::lock_guard mutexLock(mutex); + + if (disconnected) + return std::nullopt; + + const auto sharedMemory = &ipc.sharedMemory; + const auto header = sharedMemory->getHeader(); + + while (header->receiverFlag.load(std::memory_order_acquire) == 0) + { + if (!ipc.receiverSignal->wait(IPC_MESSAGE_TIMEOUT)) + { + if (disconnected) + return std::nullopt; + + if (idleFunc) + idleFunc(); + } + } + + ipc.receiverSignal->reset(); + header->receiverFlag.store(0, std::memory_order_release); + + std::optional messageOpt; + + if constexpr (IpcMessageObjectImpl::isMessagePair) + { + messageOpt.emplace( + createVariantByIndex(header->messageIndex), + typename Message::second_type{}); + auto& varMessage = messageOpt->first; + auto& fixedMessage = messageOpt->second; + + const auto span = getVariantIndexAndSpan(varMessage).second; + fb_assert(span.size() == header->messageLen); + + memcpy(&fixedMessage, header->messageBuffer, sizeof(fixedMessage)); + memcpy(span.data(), header->messageBuffer + sizeof(fixedMessage), span.size()); + } + else + { + messageOpt.emplace(createVariantByIndex(header->messageIndex)); + auto& varMessage = messageOpt.value(); + + const auto span = getVariantIndexAndSpan(varMessage).second; + fb_assert(span.size() == header->messageLen); + + memcpy(span.data(), header->messageBuffer, span.size()); + } + + ipc.senderSignal->signal(); + header->senderFlag.store(1, std::memory_order_release); + + return messageOpt; +} + + +template +inline IpcMessageSender::IpcMessageSender(const IpcMessageParameters& parameters) + : ipc(parameters, false) +{ +} + +template +inline IpcMessageSender::~IpcMessageSender() +{ + disconnect(); +} + +template +inline bool IpcMessageSender::sendTo(const IpcMessageParameters& parameters, const Message& message, + std::function idleFunc) +{ + IpcMessageSender sender(parameters); + return sender.send(message, idleFunc); +} + +template +inline void IpcMessageSender::disconnect() +{ + if (!disconnected) + { + disconnected = true; + std::lock_guard mutexLock(mutex); + } +} + +template +inline bool IpcMessageSender::send(const Message& message, std::function idleFunc) +{ + std::lock_guard mutexLock(mutex); + + if (disconnected) + return false; + + const auto sharedMemory = &ipc.sharedMemory; + const auto header = sharedMemory->getHeader(); + + SharedMutexGuard guard(sharedMemory, false); + + while (!guard.tryLock(IPC_MESSAGE_TIMEOUT)) + { + if (!header->alive.load(std::memory_order_relaxed)) + disconnected = true; + + if (disconnected) + return false; + + if (idleFunc) + idleFunc(); + } + + if constexpr (IpcMessageObjectImpl::isMessagePair) + { + const auto& varMessage = message.first; + const auto& fixedMessage = message.second; + + const auto [index, span] = getVariantIndexAndSpan(varMessage); + + header->messageIndex = index; + header->messageLen = static_cast(span.size()); + memcpy(header->messageBuffer, &fixedMessage, sizeof(fixedMessage)); + memcpy(header->messageBuffer + sizeof(fixedMessage), span.data(), span.size()); + } + else + { + const auto [index, span] = getVariantIndexAndSpan(message); + + header->messageIndex = index; + header->messageLen = static_cast(span.size()); + memcpy(header->messageBuffer, span.data(), span.size()); + } + + ipc.receiverSignal->signal(); + header->receiverFlag.store(1, std::memory_order_release); + + while (header->senderFlag.load(std::memory_order_acquire) == 0) + { + if (!ipc.senderSignal->wait(IPC_MESSAGE_TIMEOUT)) + { + if (!header->alive.load(std::memory_order_relaxed)) + disconnected = true; + + if (disconnected) + return false; + + if (idleFunc) + idleFunc(); + } + } + + ipc.senderSignal->reset(); + header->senderFlag.store(0, std::memory_order_release); + + return true; +} + + +} // namespace Firebird + +#endif // COMMON_IPC_MESSAGE_H diff --git a/src/common/ipc/IpcNamedSignal.h b/src/common/ipc/IpcNamedSignal.h new file mode 100644 index 00000000000..667e4f51095 --- /dev/null +++ b/src/common/ipc/IpcNamedSignal.h @@ -0,0 +1,220 @@ +/* + * The contents of this file are subject to the Initial + * Developer's Public License Version 1.0 (the "License"); + * you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * http://www.ibphoenix.com/main.nfs?a=ibphoenix&page=ibp_idpl. + * + * Software distributed under the License is distributed AS IS, + * WITHOUT WARRANTY OF ANY KIND, either express or implied. + * See the License for the specific language governing rights + * and limitations under the License. + * + * The Original Code was created by Adriano dos Santos Fernandes + * for the Firebird Open Source RDBMS project. + * + * Copyright (c) 2025 Adriano dos Santos Fernandes + * and all contributors signed below. + * + * All Rights Reserved. + * Contributor(s): ______________________________________. + */ + +#ifndef COMMON_IPC_NAMED_SIGNAL_H +#define COMMON_IPC_NAMED_SIGNAL_H + +#include "firebird.h" +#include "../classes/fb_string.h" +#include "../StatusArg.h" +#include "../utils_proto.h" +#include +#ifdef WIN_NT +#include +#else +#include "fb_pthread.h" +#include +#include +#include +#endif + +#ifdef ANDROID +#error This is not supported in Android. +#endif + +namespace Firebird { + + +// An event in Windows, a semaphore in POSIX. +class IpcNamedSignal final +{ +public: + explicit IpcNamedSignal(const string& name); + ~IpcNamedSignal(); + + IpcNamedSignal(const IpcNamedSignal&) = delete; + IpcNamedSignal& operator=(const IpcNamedSignal&) = delete; + +private: + static string fixName(const string& name); + +#ifndef WIN_NT +public: + static void remove(const string& name) + { + if (sem_unlink(fixName(name).c_str()) != 0) + fb_assert(false); + } +#endif + +public: + void reset(); + void signal(); + bool wait(std::chrono::microseconds timeout); + +private: +#ifdef WIN_NT + HANDLE handle = INVALID_HANDLE_VALUE; +#else + sem_t* handle = nullptr; +#endif +}; + + +#ifdef WIN_NT + + +inline IpcNamedSignal::IpcNamedSignal(const string& name) +{ + TEXT objectName[BUFFER_TINY]; + strncpy(objectName, name.c_str(), sizeof(objectName)); + + if (!fb_utils::private_kernel_object_name(objectName, sizeof(objectName))) + (Arg::Gds(isc_random) << "private_kernel_object_name failed").raise(); + + handle = CreateEvent(nullptr, TRUE, FALSE, objectName); + + if (!handle) + system_error::raise("CreateEvent"); + + SetHandleInformation(handle, HANDLE_FLAG_INHERIT, 0); +} + +inline IpcNamedSignal::~IpcNamedSignal() +{ + if (handle) + CloseHandle(handle); +} + +inline string IpcNamedSignal::fixName(const string& name) +{ + return name; +} + +inline void IpcNamedSignal::reset() +{ + ResetEvent(handle); +} + +inline void IpcNamedSignal::signal() +{ + if (!SetEvent(handle)) + system_error::raise("SetEvent"); +} + +inline bool IpcNamedSignal::wait(std::chrono::microseconds timeout) +{ + const auto wait = WaitForSingleObject(handle, timeout.count() / 1'000); + + if (wait == WAIT_OBJECT_0) + return true; + else if (wait == WAIT_TIMEOUT) + return false; + else + system_error::raise("WaitForSingleObject"); +} + + +#else + + +inline IpcNamedSignal::IpcNamedSignal(const string& name) +{ + handle = sem_open(fixName(name).c_str(), O_CREAT, S_IRUSR | S_IWUSR, 0); + + if (handle == SEM_FAILED) + system_error::raise("sem_open"); +} + +inline IpcNamedSignal::~IpcNamedSignal() +{ + if (handle) + sem_close(handle); +} + +inline string IpcNamedSignal::fixName(const string& name) +{ + if (name[0] == '/') + return name; + + return "/" + name; +} + +inline void IpcNamedSignal::reset() +{ + int semVal; + + while (sem_getvalue(handle, &semVal) == 0 && semVal > 0) + sem_trywait(handle); +} + +inline void IpcNamedSignal::signal() +{ + if (sem_post(handle) != 0) + system_error::raise("sem_post"); +} + +inline bool IpcNamedSignal::wait(std::chrono::microseconds timeout) +{ + timespec timer; + +#if defined(HAVE_CLOCK_GETTIME) + clock_gettime(CLOCK_REALTIME, &timer); +#elif defined(HAVE_GETTIMEOFDAY) + struct timeval tp; + GETTIMEOFDAY(&tp); + timer.tv_sec = tp.tv_sec; + timer.tv_nsec = tp.tv_usec * 1'000; +#else + struct timeb time_buffer; + ftime(&time_buffer); + timer.tv_sec = time_buffer.time; + timer.tv_nsec = time_buffer.millitm * 1'000'000; +#endif + + constexpr SINT64 BILLION = 1'000'000'000; + const SINT64 nanos = + std::chrono::seconds(timer.tv_sec).count() + timer.tv_nsec + std::chrono::nanoseconds(timeout).count(); + timer.tv_sec = nanos / BILLION; + timer.tv_nsec = nanos % BILLION; + +#ifdef HAVE_SEM_TIMEDWAIT + const auto wait = sem_timedwait(handle, &timer); +#else + const auto wait = sem_timedwait_fallback(handle, &timer); +#endif + + if (wait == 0) + return true; + else if (errno == ETIMEDOUT) + return false; + else + system_error::raise("sem_timedwait"); +} + + +#endif + + +} // namespace Firebird + +#endif // COMMON_IPC_NAMED_SIGNAL_H diff --git a/src/common/ipc/IpcSharedSignal.h b/src/common/ipc/IpcSharedSignal.h new file mode 100644 index 00000000000..b86ee1d8c1b --- /dev/null +++ b/src/common/ipc/IpcSharedSignal.h @@ -0,0 +1,182 @@ +/* + * The contents of this file are subject to the Initial + * Developer's Public License Version 1.0 (the "License"); + * you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * http://www.ibphoenix.com/main.nfs?a=ibphoenix&page=ibp_idpl. + * + * Software distributed under the License is distributed AS IS, + * WITHOUT WARRANTY OF ANY KIND, either express or implied. + * See the License for the specific language governing rights + * and limitations under the License. + * + * The Original Code was created by Adriano dos Santos Fernandes + * for the Firebird Open Source RDBMS project. + * + * Copyright (c) 2025 Adriano dos Santos Fernandes + * and all contributors signed below. + * + * All Rights Reserved. + * Contributor(s): ______________________________________. + */ + +#ifndef COMMON_IPC_SHARED_SIGNAL_H +#define COMMON_IPC_SHARED_SIGNAL_H + +#include "firebird.h" + +#ifdef WIN_NT +#error This is not supported in Windows. +#endif + +#ifndef PTHREAD_PROCESS_SHARED +#error Your system must support PTHREAD_PROCESS_SHARED to use firebird. +#endif + +#include "../StatusArg.h" +#include "../common/os/mac_utils.h" +#include "../utils_proto.h" +#include +#include "fb_pthread.h" + +namespace Firebird { + + +class IpcSharedSignal final +{ +public: + explicit IpcSharedSignal(); + ~IpcSharedSignal(); + + IpcSharedSignal(const IpcSharedSignal&) = delete; + IpcSharedSignal& operator=(const IpcSharedSignal&) = delete; + +public: + void reset(); + void signal(); + bool wait(std::chrono::microseconds timeout); + +private: + pthread_mutex_t mutex[1]; + pthread_cond_t cond[1]; + std::atomic_uint8_t flag = 0; +}; + + +inline IpcSharedSignal::IpcSharedSignal() +{ + pthread_mutexattr_t mattr; + pthread_condattr_t cattr; + + if (pthread_mutexattr_init(&mattr) != 0) + system_error::raise("pthread_mutexattr_init"); + + if (pthread_condattr_init(&cattr) != 0) + system_error::raise("pthread_condattr_init"); + + if (!isSandboxed()) + { + if (pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED) != 0) + system_error::raise("pthread_mutexattr_setpshared"); + + if (pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED) != 0) + system_error::raise("pthread_condattr_setpshared"); + } + + if (pthread_mutex_init(mutex, &mattr) != 0) + system_error::raise("pthread_mutex_init"); + + if (pthread_cond_init(cond, &cattr) != 0) + system_error::raise("pthread_cond_init"); + + if (pthread_mutexattr_destroy(&mattr) != 0) + system_error::raise("pthread_mutexattr_destroy"); + + if (pthread_condattr_destroy(&cattr) != 0) + system_error::raise("pthread_condattr_destroy"); +} + +inline IpcSharedSignal::~IpcSharedSignal() +{ + pthread_mutex_destroy(mutex); + pthread_cond_destroy(cond); +} + +inline void IpcSharedSignal::reset() +{ + flag.store(0, std::memory_order_release); +} + +inline void IpcSharedSignal::signal() +{ + if (pthread_mutex_lock(mutex) != 0) + system_error::raise("pthread_mutex_lock"); + + flag.store(1, std::memory_order_release); + + const auto ret = pthread_cond_broadcast(cond); + + if (pthread_mutex_unlock(mutex) != 0) + system_error::raise("pthread_mutex_unlock"); + + if (ret != 0) + system_error::raise("pthread_cond_broadcast"); +} + +inline bool IpcSharedSignal::wait(std::chrono::microseconds timeout) +{ + if (flag.load(std::memory_order_acquire) == 1) + return true; + + timespec timer; + +#if defined(HAVE_CLOCK_GETTIME) + clock_gettime(CLOCK_REALTIME, &timer); +#elif defined(HAVE_GETTIMEOFDAY) + struct timeval tp; + GETTIMEOFDAY(&tp); + timer.tv_sec = tp.tv_sec; + timer.tv_nsec = tp.tv_usec * 1'000; +#else + struct timeb time_buffer; + ftime(&time_buffer); + timer.tv_sec = time_buffer.time; + timer.tv_nsec = time_buffer.millitm * 1'000'000; +#endif + + bool ret = true; + + if (pthread_mutex_lock(mutex) != 0) + system_error::raise("pthread_mutex_lock"); + + do + { + if (flag.load(std::memory_order_acquire) == 1) + { + ret = true; + break; + } + + // The Posix pthread_cond_wait & pthread_cond_timedwait calls + // atomically release the mutex and start a wait. + // The mutex is reacquired before the call returns. + const auto wait = pthread_cond_timedwait(cond, mutex, &timer); + + if (wait == ETIMEDOUT) + { + // The timer expired - see if the event occurred and return + ret = flag.load(std::memory_order_acquire) == 1; + break; + } + } while (true); + + if (pthread_mutex_unlock(mutex) != 0) + system_error::raise("pthread_mutex_unlock"); + + return ret; +} + + +} // namespace Firebird + +#endif // COMMON_IPC_SHARED_SIGNAL_H diff --git a/src/common/ipc/tests/IpcChatTest.cpp b/src/common/ipc/tests/IpcChatTest.cpp new file mode 100644 index 00000000000..1d42508a163 --- /dev/null +++ b/src/common/ipc/tests/IpcChatTest.cpp @@ -0,0 +1,124 @@ +/* + * The contents of this file are subject to the Initial + * Developer's Public License Version 1.0 (the "License"); + * you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * http://www.ibphoenix.com/main.nfs?a=ibphoenix&page=ibp_idpl. + * + * Software distributed under the License is distributed AS IS, + * WITHOUT WARRANTY OF ANY KIND, either express or implied. + * See the License for the specific language governing rights + * and limitations under the License. + * + * The Original Code was created by Adriano dos Santos Fernandes + * for the Firebird Open Source RDBMS project. + * + * Copyright (c) 2025 Adriano dos Santos Fernandes + * and all contributors signed below. + * + * All Rights Reserved. + * Contributor(s): ______________________________________. + */ + +#include "firebird.h" +#include "boost/test/unit_test.hpp" +#include "../common/classes/auto.h" +#include "../common/classes/fb_string.h" +#include "../common/ipc/IpcChat.h" +#include +#include +#include +#include +#include + +using namespace Firebird; +using namespace std::chrono_literals; + + +static std::string getTempPath() +{ + static std::atomic counter{0}; + + const auto now = std::chrono::system_clock::now(); + const auto nowNs = std::chrono::duration_cast(now.time_since_epoch()).count(); + + return "chat_test_" + + std::to_string(nowNs) + "_" + + std::to_string(counter.fetch_add(1)); +} + + +BOOST_AUTO_TEST_SUITE(CommonSuite) +BOOST_AUTO_TEST_SUITE(IpcChatSuite) + + +BOOST_AUTO_TEST_CASE(IpcChatTest) +{ + struct Request + { + IpcChatClientAddress clientAddress; + unsigned n; + }; + + struct Response + { + unsigned n; + }; + + using RequestMessage = std::variant; + using ResponseMessage = std::variant; + + const auto testPath = getTempPath(); + + const IpcMessageParameters parameters{ + .physicalName = testPath, + .logicalName = "IpcChatClientTest", + .type = 1, + .version = 1 + }; + + IpcChatServer server(parameters); + IpcChatClient client(parameters); + + constexpr unsigned NUM_MESSAGES = 4'000; + unsigned readCount = 0; + + std::thread consumerThread([&]() { + for (readCount = 0; readCount < NUM_MESSAGES; ++readCount) + { + const auto requestMessageOpt = server.receive(); + + if (!requestMessageOpt.has_value()) + continue; + + const auto& [requestMessage, clientAddress] = requestMessageOpt.value(); + + if (clientAddress.pid != client.getAddress().pid || clientAddress.uid != client.getAddress().uid) + throw std::domain_error("Invalid address"); // cannot use boost.test here in another thread + + if (std::holds_alternative(requestMessage)) + { + const auto& request = std::get(requestMessage); + server.sendTo(request.clientAddress, Response{ .n = request.n * 2 }); + } + } + }); + + for (unsigned writeCount = 0; writeCount < NUM_MESSAGES; ++writeCount) + { + BOOST_CHECK(client.send(Request{ .clientAddress = client.getAddress(), .n = writeCount })); + const auto responseMessageOpt = client.receive(); + const auto& responseMessage = responseMessageOpt.value(); + + BOOST_CHECK(std::holds_alternative(responseMessage)); + BOOST_CHECK_EQUAL(std::get(responseMessage).n, writeCount * 2); + } + + consumerThread.join(); + + BOOST_CHECK_EQUAL(readCount, NUM_MESSAGES); +} + + +BOOST_AUTO_TEST_SUITE_END() // IpcChatSuite +BOOST_AUTO_TEST_SUITE_END() // CommonSuite diff --git a/src/common/ipc/tests/IpcMessageTest.cpp b/src/common/ipc/tests/IpcMessageTest.cpp new file mode 100644 index 00000000000..be07889a88b --- /dev/null +++ b/src/common/ipc/tests/IpcMessageTest.cpp @@ -0,0 +1,283 @@ +/* + * The contents of this file are subject to the Initial + * Developer's Public License Version 1.0 (the "License"); + * you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * http://www.ibphoenix.com/main.nfs?a=ibphoenix&page=ibp_idpl. + * + * Software distributed under the License is distributed AS IS, + * WITHOUT WARRANTY OF ANY KIND, either express or implied. + * See the License for the specific language governing rights + * and limitations under the License. + * + * The Original Code was created by Adriano dos Santos Fernandes + * for the Firebird Open Source RDBMS project. + * + * Copyright (c) 2025 Adriano dos Santos Fernandes + * and all contributors signed below. + * + * All Rights Reserved. + * Contributor(s): ______________________________________. + */ + +#include "firebird.h" +#include "boost/test/unit_test.hpp" +#include "../common/classes/auto.h" +#include "../common/classes/fb_string.h" +#include "../common/ipc/IpcMessage.h" +#include +#include +#include +#include +#include +#include +#include + +using namespace Firebird; +using namespace std::chrono_literals; + + +static std::string getTempPath() +{ + static std::atomic counter{0}; + + const auto now = std::chrono::system_clock::now(); + const auto nowNs = std::chrono::duration_cast(now.time_since_epoch()).count(); + + return "message_test_" + + std::to_string(nowNs) + "_" + + std::to_string(counter.fetch_add(1)); +} + + +BOOST_AUTO_TEST_SUITE(CommonSuite) +BOOST_AUTO_TEST_SUITE(IpcMessageSuite) + + +BOOST_AUTO_TEST_CASE(ProducerConsumerMessageTest) +{ + struct Small + { + unsigned n; + }; + + struct Big + { + Big(unsigned aN) + : n(aN) + { + memset(s, n % 256, sizeof(s)); + } + + Big() + {} + + unsigned n; + char s[32000]{}; + }; + + struct Stop {}; + + using TestMessage = std::variant; + + constexpr unsigned MESSAGE_COUNT = 8'000; + constexpr unsigned THREAD_COUNT = 2; + + constexpr auto ENV_NAME = "FB_PRODUCER_CONSUMER_MESSAGE_TEST_NAME"; + constexpr auto ENV_RECEIVER = "FB_PRODUCER_CONSUMER_MESSAGE_TEST_RECEIVER"; + constexpr auto ENV_PRODUCER_PROCESSES = "FB_PRODUCER_CONSUMER_MESSAGE_TEST_PRODUCER_PROCESSES"; + + const char* const envName = std::getenv(ENV_NAME); + const char* const envReceiver = std::getenv(ENV_RECEIVER); + const auto envProducerProcesses = std::getenv(ENV_PRODUCER_PROCESSES); + + const bool multiProcess = envName != nullptr; + const bool multiProcessIsReceiver = multiProcess && envReceiver != nullptr; + const unsigned processCount = multiProcess ? (unsigned) std::stoi(envProducerProcesses) : 1u; + const auto testPath = envName ? std::string(envName) : getTempPath(); + + std::optional> receiver; + + if (multiProcessIsReceiver || !multiProcess) + { + receiver.emplace(IpcMessageParameters{ + .physicalName = testPath, + .logicalName = "IpcMessageTest", + .type = 1, + .version = 1 + }); + } + + std::vector>> senders; + + for (unsigned i = 0u; i < (multiProcessIsReceiver ? 0u : THREAD_COUNT); ++i) + { + senders.emplace_back(std::make_unique>(IpcMessageParameters{ + .physicalName = testPath, + .logicalName = "IpcMessageTest", + .type = 1, + .version = 1 + })); + } + + std::vector writeNum(THREAD_COUNT, 0); + unsigned readCount = 0; + unsigned stopReads = 0; + unsigned smallReads = 0; + unsigned bigReads = 0; + std::atomic_uint problems = 0; + std::vector threads; + + if (!multiProcess || !multiProcessIsReceiver) + { + const auto senderFunc = [&](unsigned i) { + for (writeNum[i] = MESSAGE_COUNT * i; writeNum[i] - MESSAGE_COUNT * i < MESSAGE_COUNT; ++writeNum[i]) + { + if (writeNum[i] % 2u == 0) + { + if (!senders[i]->send(Small{ writeNum[i] })) + ++problems; + } + else + { + if (!senders[i]->send(Big{ writeNum[i] })) + ++problems; + } + } + + if (!senders[i]->send(Stop{})) + ++problems; + }; + + for (unsigned i = 0u; i < THREAD_COUNT; ++i) + threads.emplace_back(senderFunc, i); + } + + if (!multiProcess || multiProcessIsReceiver) + { + threads.emplace_back([&]() { + for (readCount = 0u; readCount < (MESSAGE_COUNT + 1u) * processCount * THREAD_COUNT;) + { + const auto message = receiver->receive(); + + if (!message.has_value()) + continue; + + if (std::holds_alternative(message.value())) + ++stopReads; + else if (std::holds_alternative(message.value())) + ++smallReads; + else + { + if (std::holds_alternative(message.value())) + { + const auto& big = std::get(message.value()); + + char s[sizeof(big.s)]; + memset(s, big.n % 256, sizeof(s)); + if (memcmp(s, big.s, sizeof(s)) != 0) + ++problems; + + ++bigReads; + } + else + ++problems; + } + + ++readCount; + } + }); + } + + for (auto& thread : threads) + thread.join(); + + BOOST_CHECK_EQUAL(problems, 0u); + + if (!multiProcess || !multiProcessIsReceiver) + { + for (unsigned i = 0u; i < THREAD_COUNT; ++i) + BOOST_CHECK_EQUAL(writeNum[i], MESSAGE_COUNT * (i + 1u)); + } + + if (!multiProcess || multiProcessIsReceiver) + { + BOOST_CHECK_EQUAL(readCount, (MESSAGE_COUNT + 1u) * processCount * THREAD_COUNT); + BOOST_CHECK_EQUAL(stopReads, processCount * THREAD_COUNT); + BOOST_CHECK_EQUAL(smallReads, processCount * MESSAGE_COUNT * THREAD_COUNT / 2u); + BOOST_CHECK_EQUAL(bigReads, processCount * MESSAGE_COUNT * THREAD_COUNT / 2u); + } +} + + +BOOST_AUTO_TEST_CASE(ServerDisconnectMessageTest) +{ + struct Message + { + unsigned n; + }; + + using TestMessage = std::variant; + + const auto testPath = getTempPath(); + + IpcMessageReceiver server({ + .physicalName = testPath, + .logicalName = "IpcMessageTest", + .type = 1, + .version = 1 + }); + IpcMessageSender client({ + .physicalName = testPath, + .logicalName = "IpcMessageTest", + .type = 1, + .version = 1 + }); + + unsigned produced = 0; + unsigned consumed = 0; + + std::thread producerThread([&]() { + try + { + while (!client.isDisconnected()) + { + if (client.send(Message{0})) + ++produced; + } + } + catch (...) + { + } + }); + + std::thread consumerThread([&]() { + try + { + while (!server.isDisconnected()) + { + const auto message = server.receive(); + + if (message.has_value()) + ++consumed; + } + } + catch (...) + { + } + }); + + std::this_thread::sleep_for(1s); + server.disconnect(); + + producerThread.join(); + consumerThread.join(); + + BOOST_CHECK_GT(produced, 0u); + BOOST_CHECK_GT(consumed, 0u); + BOOST_CHECK(produced == consumed || produced - 1u == consumed); +} + + +BOOST_AUTO_TEST_SUITE_END() // IpcMessageSuite +BOOST_AUTO_TEST_SUITE_END() // CommonSuite diff --git a/src/common/isc_s_proto.h b/src/common/isc_s_proto.h index fd69f275399..f5889159a20 100644 --- a/src/common/isc_s_proto.h +++ b/src/common/isc_s_proto.h @@ -33,6 +33,8 @@ #include "../common/classes/RefCounted.h" #include "../common/classes/fb_string.h" #include "../common/classes/timestamp.h" +#include +#include // Firebird platform-specific synchronization data structures @@ -267,8 +269,8 @@ class SharedMemoryBase static void unlinkFile(const TEXT* expanded_filename) noexcept; Firebird::PathName getMapFileName(); - void mutexLock(); - bool mutexLockCond(); + bool mutexLock(std::optional timeout = std::nullopt); + bool mutexTryLock(); void mutexUnlock(); int eventInit(event_t* event); @@ -329,7 +331,8 @@ class SharedMemoryBase SRAM_TPC_SNAPSHOTS = 0xF7, SRAM_CHANGELOG_STATE = 0xF6, SRAM_TRACE_AUDIT_MTX = 0xF5, - SRAM_PROFILER = 0XF4 + SRAM_PROFILER = 0XF4, + SRAM_CHAT_CLIENT = 0XF3 }; protected: @@ -389,8 +392,8 @@ class SharedMutexGuard SharedMutexGuard(const SharedMutexGuard&) = delete; SharedMutexGuard& operator=(const SharedMutexGuard&) = delete; - bool tryLock() { - m_locked = m_shmem->mutexLockCond(); + bool tryLock(std::optional timeout) { + m_locked = m_shmem->mutexLock(timeout); return m_locked; } diff --git a/src/common/isc_sync.cpp b/src/common/isc_sync.cpp index abd5e881291..364be0d149d 100644 --- a/src/common/isc_sync.cpp +++ b/src/common/isc_sync.cpp @@ -38,6 +38,7 @@ */ #include "firebird.h" +#include #include #include #include @@ -2421,7 +2422,7 @@ void ISC_mutex_fini(struct mtx *mutex) } -int ISC_mutex_lock(struct mtx* mutex) +int ISC_mutex_lock(struct mtx* mutex, std::optional timeout) { /************************************** * @@ -2434,29 +2435,10 @@ int ISC_mutex_lock(struct mtx* mutex) * **************************************/ + const DWORD dwTimeout = timeout.has_value() ? static_cast(timeout->count()) : INFINITE; const DWORD status = (mutex->mtx_fast.lpSharedInfo) ? - enterFastMutex(&mutex->mtx_fast, INFINITE) : - WaitForSingleObject(mutex->mtx_fast.hEvent, INFINITE); - - return (status == WAIT_OBJECT_0 || status == WAIT_ABANDONED) ? FB_SUCCESS : FB_FAILURE; -} - - -int ISC_mutex_lock_cond(struct mtx* mutex) -{ -/************************************** - * - * I S C _ m u t e x _ l o c k _ c o n d ( W I N _ N T ) - * - ************************************** - * - * Functional description - * Conditionally seize a mutex. - * - **************************************/ - - const DWORD status = (mutex->mtx_fast.lpSharedInfo) ? - enterFastMutex(&mutex->mtx_fast, 0) : WaitForSingleObject(mutex->mtx_fast.hEvent, 0L); + enterFastMutex(&mutex->mtx_fast, dwTimeout) : + WaitForSingleObject(mutex->mtx_fast.hEvent, dwTimeout); return (status == WAIT_OBJECT_0 || status == WAIT_ABANDONED) ? FB_SUCCESS : FB_FAILURE; } @@ -2743,15 +2725,43 @@ static bool make_object_name(TEXT* buffer, size_t bufsize, #endif // WIN_NT -void SharedMemoryBase::mutexLock() +bool SharedMemoryBase::mutexLock(std::optional timeout) { #if defined(WIN_NT) - int state = ISC_mutex_lock(sh_mem_mutex); + int state = ISC_mutex_lock(sh_mem_mutex, timeout); #else // POSIX SHARED MUTEX - int state = pthread_mutex_lock(sh_mem_mutex->mtx_mutex); + int state; + + if (timeout.has_value()) + { + if (timeout.value().count() == 0) + state = pthread_mutex_trylock(sh_mem_mutex->mtx_mutex); + else + { + const auto now = std::chrono::system_clock::now(); + const auto deadline = now + timeout.value(); + const auto deadlineDuration = deadline.time_since_epoch(); + const auto seconds = std::chrono::duration_cast(deadlineDuration); + const auto nanoseconds = std::chrono::duration_cast(deadlineDuration - seconds); + + const timespec ts{ + .tv_sec = (time_t) seconds.count(), + .tv_nsec = (long) nanoseconds.count() + }; + +#ifdef HAVE_PTHREAD_MUTEX_TIMEDLOCK + state = pthread_mutex_timedlock(sh_mem_mutex->mtx_mutex, &ts); +#else + state = pthread_mutex_timedlock_fallback(sh_mem_mutex->mtx_mutex, &ts); +#endif + } + } + else + state = pthread_mutex_lock(sh_mem_mutex->mtx_mutex); + #ifdef USE_ROBUST_MUTEX if (state == EOWNERDEAD) { @@ -2764,35 +2774,16 @@ void SharedMemoryBase::mutexLock() #endif // os-dependent choice - if (state != 0) - { + if (!timeout.has_value() && state != 0) sh_mem_callback->mutexBug(state, "mutexLock"); - } -} - - -bool SharedMemoryBase::mutexLockCond() -{ -#if defined(WIN_NT) - - return ISC_mutex_lock_cond(sh_mem_mutex) == 0; - -#else // POSIX SHARED MUTEX - int state = pthread_mutex_trylock(sh_mem_mutex->mtx_mutex); -#ifdef USE_ROBUST_MUTEX - if (state == EOWNERDEAD) - { - // We always perform check for dead process - // Therefore may safely mark mutex as recovered - LOG_PTHREAD_ERROR(pthread_mutex_consistent(sh_mem_mutex->mtx_mutex)); - state = 0; - } -#endif return state == 0; +} -#endif // os-dependent choice +bool SharedMemoryBase::mutexTryLock() +{ + return mutexLock(std::chrono::milliseconds(0)); } diff --git a/src/common/tests/CommonTest.cpp b/src/common/tests/CommonTest.cpp index 45026c2d758..aae1729c3b6 100644 --- a/src/common/tests/CommonTest.cpp +++ b/src/common/tests/CommonTest.cpp @@ -1,5 +1,8 @@ #include "firebird.h" #include "firebird/Interface.h" +#include "../common/classes/init.h" +#include "../common/classes/TempFile.h" +#include "../common/config/config.h" #include "../common/gdsassert.h" #define BOOST_TEST_MODULE CommonTest @@ -14,6 +17,19 @@ int ISC_EXPORT fb_shutdown(unsigned int, const int) namespace Firebird { + namespace + { + struct Init + { + Init(auto&&) + { + Config::setRootDirectoryFromCommandLine(TempFile::getTempPath()); + } + }; + + GlobalPtr init; + } + IMaster* API_ROUTINE fb_get_master_interface() { fb_assert(false); diff --git a/src/include/fb_pthread.h b/src/include/fb_pthread.h index 16dc5b3be24..5d52196b112 100644 --- a/src/include/fb_pthread.h +++ b/src/include/fb_pthread.h @@ -28,10 +28,84 @@ #ifndef INCLUDE_FB_PTHREAD_H #define INCLUDE_FB_PTHREAD_H +#include "firebird.h" + #if defined(LINUX) && (!defined(__USE_GNU)) #define __USE_GNU 1 // required on this OS to have required for us stuff declared #endif // LINUX // should be defined before include - AP 2009 #include +#ifndef HAVE_PTHREAD_MUTEX_TIMEDLOCK + +#include +#include + +inline int pthread_mutex_timedlock_fallback(pthread_mutex_t* mutex, const timespec* timeout) +{ + timespec current_time, sleep_time; + sleep_time.tv_sec = 0; + sleep_time.tv_nsec = 10'000'000; // 10ms sleep between attempts + + do + { + int result = pthread_mutex_trylock(mutex); + + if (result == 0) + return 0; // Successfully acquired lock + + if (result != EBUSY && result != EINTR) + return result; // Some other error + + clock_gettime(CLOCK_REALTIME, ¤t_time); + + if (current_time.tv_sec > timeout->tv_sec || + (current_time.tv_sec == timeout->tv_sec && + current_time.tv_nsec >= timeout->tv_nsec)) + { + return ETIMEDOUT; + } + + nanosleep(&sleep_time, nullptr); + } while(true); +} + +#endif // !HAVE_PTHREAD_MUTEX_TIMEDLOCK + +#ifndef HAVE_SEM_TIMEDWAIT + +#include + +inline int sem_timedwait_fallback(sem_t* sem, const timespec* timeout) +{ + timespec current_time, sleep_time; + sleep_time.tv_sec = 0; + sleep_time.tv_nsec = 10'000'000; // 10ms sleep between attempts + + do + { + int result = sem_trywait(sem); + + if (result == 0) + return 0; // Successfully acquired lock + + if (result != EAGAIN && result != EINTR) + return result; // Some other error + + clock_gettime(CLOCK_REALTIME, ¤t_time); + + if (current_time.tv_sec > timeout->tv_sec || + (current_time.tv_sec == timeout->tv_sec && + current_time.tv_nsec >= timeout->tv_nsec)) + { + errno = ETIMEDOUT; + return -1; + } + + nanosleep(&sleep_time, nullptr); + } while(true); +} + +#endif // !HAVE_SEM_TIMEDWAIT + #endif // INCLUDE_FB_PTHREAD_H diff --git a/src/jrd/ProfilerManager.cpp b/src/jrd/ProfilerManager.cpp index ddf8d55e7cf..1c45579bd9b 100644 --- a/src/jrd/ProfilerManager.cpp +++ b/src/jrd/ProfilerManager.cpp @@ -22,6 +22,8 @@ #include "firebird.h" #include "../jrd/ProfilerManager.h" +#include "../common/ipc/IpcChat.h" +#include "../common/ipc/IpcMessage.h" #include "../jrd/Record.h" #include "../jrd/ini.h" #include "../jrd/tra.h" @@ -33,14 +35,7 @@ #include "../jrd/met_proto.h" #include "../jrd/pag_proto.h" #include "../jrd/tra_proto.h" -#include "../common/classes/Spinlock.h" - -#include -#include - -#ifdef WIN_NT -#include -#endif +#include using namespace Jrd; using namespace Firebird; @@ -51,95 +46,134 @@ using namespace Firebird; namespace { - class ProfilerIpc final : public IpcObject + struct CheckUserRequest { - public: - enum class Tag : UCHAR - { - NOP = 0, - - SERVER_STARTED, - SERVER_EXITED, - - RESPONSE, - EXCEPTION, - - FIRST_CLIENT_OP, - CANCEL_SESSION = FIRST_CLIENT_OP, - DISCARD, - FINISH_SESSION, - FLUSH, - PAUSE_SESSION, - RESUME_SESSION, - SET_FLUSH_INTERVAL, - START_SESSION - }; + char userName[USERNAME_LENGTH + 1]; + }; - struct Header : public MemoryHeader - { - event_t serverEvent; - event_t clientEvent; - USHORT bufferSize; - std::atomic tag; - unsigned seq; - SpinLock bufferMutex; - char userName[USERNAME_LENGTH + 1]; // \0 if has PROFILE_ANY_ATTACHMENT - alignas(FB_ALIGNMENT) UCHAR buffer[4096]; - }; + struct Nothing {}; + + struct ExceptionResponse + { + char text[4096]; + }; - static const USHORT VERSION = 3; + using IpcRequestMessage = std::variant< + CheckUserRequest, + ProfilerPackage::DiscardInput::Type, + ProfilerPackage::FlushInput::Type, + ProfilerPackage::CancelSessionInput::Type, + ProfilerPackage::PauseSessionInput::Type, + ProfilerPackage::ResumeSessionInput::Type, + ProfilerPackage::FinishSessionInput::Type, + ProfilerPackage::SetFlushIntervalInput::Type, + ProfilerPackage::StartSessionInput::Type + >; - public: - ProfilerIpc(thread_db* tdbb, MemoryPool& pool, AttNumber aAttachmentId, bool server = false); - ~ProfilerIpc(); + using IpcResponseMessage = std::variant< + Nothing, + ExceptionResponse, + ProfilerPackage::StartSessionOutput::Type + >; - ProfilerIpc(const ProfilerIpc&) = delete; - ProfilerIpc& operator=(const ProfilerIpc&) = delete; + IpcMessageParameters buildParameters(thread_db* tdbb, AttNumber attachmentId) + { + static_assert(std::is_same::value); - public: - bool initialize(SharedMemoryBase* sm, bool init) override; - void mutexBug(int osErrorCode, const char* text) override; + static constexpr USHORT VERSION = 3; - USHORT getType() const override - { - return SharedMemoryBase::SRAM_PROFILER; - } + const auto database = tdbb->getDatabase(); - USHORT getVersion() const override - { - return VERSION; - } + PathName fileName; + fileName.printf(PROFILER_FILE, database->getUniqueFileId().c_str(), attachmentId); + + return { + .physicalName = fileName.c_str(), + .logicalName = "ProfilerManager", + .type = static_cast(SharedMemoryBase::SRAM_PROFILER), + .version = VERSION, + }; + } + + void startRemoteProfiler(thread_db* tdbb, AttNumber attachmentId) + { + ThreadStatusGuard tempStatus(tdbb); + + Lock tempLock(tdbb, sizeof(SINT64), LCK_attachment); + tempLock.setKey(attachmentId); - const char* getName() const override + // Check if attachment is alive. + if (LCK_lock(tdbb, &tempLock, LCK_EX, LCK_NO_WAIT)) { - return "ProfilerManager"; + LCK_release(tdbb, &tempLock); + (Arg::Gds(isc_random) << "Cannot start remote profile session - attachment is not active").raise(); } - public: - template - void sendAndReceive(thread_db* tdbb, Tag tag, const Input* in, Output* out) + // Ask remote attachment to initialize the profile listener. + + tempLock.lck_type = LCK_profiler_listener; + + if (LCK_lock(tdbb, &tempLock, LCK_SR, LCK_WAIT)) + LCK_release(tdbb, &tempLock); + } + + const auto& checkResponseIsNotException(const std::optional& responseMessageOpt) + { + if (responseMessageOpt.has_value()) { - static_assert(sizeof(*in) <= sizeof(std::declval
().buffer), "Buffer size too small"); - internalSendAndReceive(tdbb, tag, in, sizeof(*in), out, sizeof(*out)); + if (const auto exceptionResponse = std::get_if(&responseMessageOpt.value())) + (Arg::Gds(isc_random) << exceptionResponse->text).raise(); } - template - void send(thread_db* tdbb, Tag tag, const Input* in) + return responseMessageOpt; + } + + const auto& checkResponseIsPresent(const std::optional& responseMessageOpt) + { + if (!responseMessageOpt.has_value()) + (Arg::Gds(isc_random) << "Profiler client disconnected from server").raise();; + + return responseMessageOpt; + } + + void checkResponseIsNothing(const std::optional& responseMessageOpt) + { + checkResponseIsPresent(responseMessageOpt); + + if (!std::holds_alternative(responseMessageOpt.value())) + (Arg::Gds(isc_random) << "Invalid profiler's remote response").raise(); + } + + template + std::optional clientSendAndReceiveMessage(thread_db* tdbb, AttNumber attachmentId, const T& in) + { + const auto attachment = tdbb->getAttachment(); + std::optional userName; + + if (!attachment->locksmith(tdbb, PROFILE_ANY_ATTACHMENT)) + userName = attachment->getUserName(); + + startRemoteProfiler(tdbb, attachmentId); + + EngineCheckout cout(tdbb, FB_FUNCTION); + + IpcChatClient chatClient(buildParameters(tdbb, attachmentId)); + + const auto udleFunc = [&] { + Attachment::SyncGuard attGuard(attachment, FB_FUNCTION); + JRD_reschedule(tdbb, true); + }; + + if (userName.has_value()) { - static_assert(sizeof(*in) <= sizeof(std::declval
().buffer), "Buffer size too small"); - internalSendAndReceive(tdbb, tag, in, sizeof(*in), nullptr, 0); + CheckUserRequest checkUserRequest; + strcpy(checkUserRequest.userName, userName->c_str()); + checkResponseIsNotException(chatClient.sendAndReceive(checkUserRequest, udleFunc)); } - private: - void internalSendAndReceive(thread_db* tdbb, Tag tag, const void* in, unsigned inSize, void* out, unsigned outSize); - void initClient(); - - public: - AutoPtr> sharedMemory; - AttNumber attachmentId; - const bool isServer; - }; -} // anonymous namespace + return checkResponseIsNotException(chatClient.sendAndReceive(in, udleFunc)); + } +} class Jrd::ProfilerListener final @@ -162,14 +196,12 @@ class Jrd::ProfilerListener final listener->watcherThread(); } - void processCommand(thread_db* tdbb, ProfilerIpc::Tag tag, UCharBuffer& buffer); + IpcResponseMessage processCommand(thread_db* tdbb, const IpcRequestMessage& requestMessage); private: Attachment* const attachment; - Firebird::Semaphore startupSemaphore; + IpcChatServer chatServer; ThreadFinishSync cleanupSync; - Firebird::AutoPtr ipc; - bool exiting = false; }; @@ -184,8 +216,7 @@ IExternalResultSet* ProfilerPackage::discardProcedure(ThrowStatusExceptionWrappe if (!in->attachmentIdNull && AttNumber(in->attachmentId) != attachment->att_attachment_id) { - ProfilerIpc ipc(tdbb, *getDefaultMemoryPool(), in->attachmentId); - ipc.send(tdbb, ProfilerIpc::Tag::DISCARD, in); + checkResponseIsNothing(clientSendAndReceiveMessage(tdbb, AttNumber(in->attachmentId), *in)); return nullptr; } @@ -204,8 +235,7 @@ IExternalResultSet* ProfilerPackage::flushProcedure(ThrowStatusExceptionWrapper* if (!in->attachmentIdNull && AttNumber(in->attachmentId) != attachment->att_attachment_id) { - ProfilerIpc ipc(tdbb, *getDefaultMemoryPool(), in->attachmentId); - ipc.send(tdbb, ProfilerIpc::Tag::FLUSH, in); + checkResponseIsNothing(clientSendAndReceiveMessage(tdbb, AttNumber(in->attachmentId), *in)); return nullptr; } @@ -224,8 +254,7 @@ IExternalResultSet* ProfilerPackage::cancelSessionProcedure(ThrowStatusException if (!in->attachmentIdNull && AttNumber(in->attachmentId) != attachment->att_attachment_id) { - ProfilerIpc ipc(tdbb, *getDefaultMemoryPool(), in->attachmentId); - ipc.send(tdbb, ProfilerIpc::Tag::CANCEL_SESSION, in); + checkResponseIsNothing(clientSendAndReceiveMessage(tdbb, AttNumber(in->attachmentId), *in)); return nullptr; } @@ -245,8 +274,7 @@ IExternalResultSet* ProfilerPackage::finishSessionProcedure(ThrowStatusException if (!in->attachmentIdNull && AttNumber(in->attachmentId) != attachment->att_attachment_id) { - ProfilerIpc ipc(tdbb, *getDefaultMemoryPool(), in->attachmentId); - ipc.send(tdbb, ProfilerIpc::Tag::FINISH_SESSION, in); + checkResponseIsNothing(clientSendAndReceiveMessage(tdbb, AttNumber(in->attachmentId), *in)); return nullptr; } @@ -265,8 +293,7 @@ IExternalResultSet* ProfilerPackage::pauseSessionProcedure(ThrowStatusExceptionW if (!in->attachmentIdNull && AttNumber(in->attachmentId) != attachment->att_attachment_id) { - ProfilerIpc ipc(tdbb, *getDefaultMemoryPool(), in->attachmentId); - ipc.send(tdbb, ProfilerIpc::Tag::PAUSE_SESSION, in); + checkResponseIsNothing(clientSendAndReceiveMessage(tdbb, AttNumber(in->attachmentId), *in)); return nullptr; } @@ -285,8 +312,7 @@ IExternalResultSet* ProfilerPackage::resumeSessionProcedure(ThrowStatusException if (!in->attachmentIdNull && AttNumber(in->attachmentId) != attachment->att_attachment_id) { - ProfilerIpc ipc(tdbb, *getDefaultMemoryPool(), in->attachmentId); - ipc.send(tdbb, ProfilerIpc::Tag::RESUME_SESSION, in); + checkResponseIsNothing(clientSendAndReceiveMessage(tdbb, AttNumber(in->attachmentId), *in)); return nullptr; } @@ -305,8 +331,7 @@ IExternalResultSet* ProfilerPackage::setFlushIntervalProcedure(ThrowStatusExcept if (!in->attachmentIdNull && AttNumber(in->attachmentId) != attachment->att_attachment_id) { - ProfilerIpc ipc(tdbb, *getDefaultMemoryPool(), in->attachmentId); - ipc.send(tdbb, ProfilerIpc::Tag::SET_FLUSH_INTERVAL, in); + checkResponseIsNothing(clientSendAndReceiveMessage(tdbb, AttNumber(in->attachmentId), *in)); return nullptr; } @@ -325,8 +350,17 @@ void ProfilerPackage::startSessionFunction(ThrowStatusExceptionWrapper* /*status if (!in->attachmentIdNull && AttNumber(in->attachmentId) != attachment->att_attachment_id) { - ProfilerIpc ipc(tdbb, *getDefaultMemoryPool(), in->attachmentId); - ipc.sendAndReceive(tdbb, ProfilerIpc::Tag::START_SESSION, in, out); + const auto responseMessageOpt = checkResponseIsPresent( + clientSendAndReceiveMessage(tdbb, AttNumber(in->attachmentId), *in)); + + if (std::holds_alternative(responseMessageOpt.value())) + *out = std::get(responseMessageOpt.value()); + else + { + fb_assert(false); + out->sessionIdNull = FB_TRUE; + } + return; } @@ -694,261 +728,20 @@ ProfilerManager::Statement* ProfilerManager::getStatement(Request* request) //-------------------------------------- -ProfilerIpc::ProfilerIpc(thread_db* tdbb, MemoryPool& pool, AttNumber aAttachmentId, bool server) - : attachmentId(aAttachmentId), - isServer(server) -{ - const auto database = tdbb->getDatabase(); - - string fileName; - static_assert(std::is_same::value); - fileName.printf(PROFILER_FILE, database->getUniqueFileId().c_str(), attachmentId); - - try - { - sharedMemory = FB_NEW_POOL(pool) SharedMemory
(fileName.c_str(), sizeof(Header), this); - } - catch (const Exception& ex) - { - iscLogException("ProfilerManager: cannot initialize the shared memory region", ex); - throw; - } - - const auto header = sharedMemory->getHeader(); - checkHeader(header); - - if (isServer) - { - SharedMutexGuard guard(sharedMemory); - - if (sharedMemory->eventInit(&header->serverEvent) != FB_SUCCESS) - (Arg::Gds(isc_random) << "ProfilerIpc eventInit(serverEvent) failed").raise(); - } -} - -ProfilerIpc::~ProfilerIpc() -{ - SharedMutexGuard guard(sharedMemory); - - const auto header = sharedMemory->getHeader(); - - event_t* evnt = this->isServer ? &header->serverEvent : &header->clientEvent; - if (evnt->event_pid) - { - sharedMemory->eventFini(evnt); - evnt->event_pid = 0; - } - - if (header->serverEvent.event_pid == 0 && header->clientEvent.event_pid == 0) - sharedMemory->removeMapFile(); -} - -bool ProfilerIpc::initialize(SharedMemoryBase* sm, bool init) -{ - if (init) - { - const auto header = reinterpret_cast(sm->sh_mem_header); - - // Initialize the shared data header. - initHeader(header); - } - - return true; -} - -void ProfilerIpc::mutexBug(int osErrorCode, const char* text) -{ - iscLogStatus("Error when working with profiler shared memory", - (Arg::Gds(isc_sys_request) << text << Arg::OsError(osErrorCode)).value()); -} - -void ProfilerIpc::internalSendAndReceive(thread_db* tdbb, Tag tag, - const void* in, unsigned inSize, void* out, unsigned outSize) -{ - const auto* attachment = tdbb->getAttachment(); - - { // scope - ThreadStatusGuard tempStatus(tdbb); - - Lock tempLock(tdbb, sizeof(SINT64), LCK_attachment); - tempLock.setKey(attachmentId); - - // Check if attachment is alive. - if (LCK_lock(tdbb, &tempLock, LCK_EX, LCK_NO_WAIT)) - { - LCK_release(tdbb, &tempLock); - (Arg::Gds(isc_random) << "Cannot start remote profile session - attachment is not active").raise(); - } - - // Ask remote attachment to initialize the profile listener. - - tempLock.lck_type = LCK_profiler_listener; - - if (LCK_lock(tdbb, &tempLock, LCK_SR, LCK_WAIT)) - LCK_release(tdbb, &tempLock); - } - - SharedMutexGuard guard(sharedMemory); - - const auto header = sharedMemory->getHeader(); - - initClient(); - - Cleanup finiClient([&] { - if (header->clientEvent.event_pid) - { - sharedMemory->eventFini(&header->clientEvent); - header->clientEvent.event_pid = 0; - } - }); - - const SLONG clientEventCounter = sharedMemory->eventClear(&header->clientEvent); - - std::unique_lock bufferMutexLock(header->bufferMutex); - - switch (header->tag) - { - case Tag::NOP: - (Arg::Gds(isc_random) << "Remote attachment failed to start listener thread").raise(); - break; - - case Tag::SERVER_EXITED: - (Arg::Gds(isc_random) << "Cannot start remote profile session - attachment exited").raise(); - break; - - default: - break; - }; - - if (attachment->locksmith(tdbb, PROFILE_ANY_ATTACHMENT)) - header->userName[0] = '\0'; - else - strcpy(header->userName, attachment->getUserName().c_str()); - - header->bufferSize = inSize; - - fb_assert(inSize <= sizeof(header->buffer)); - memcpy(header->buffer, in, inSize); - - header->tag = tag; - ++header->seq; - - bufferMutexLock.unlock(); - - if (sharedMemory->eventPost(&header->serverEvent) != FB_SUCCESS) - (Arg::Gds(isc_random) << "Cannot start remote profile session - attachment exited").raise(); - - constexpr SLONG TIMEOUT = 500 * 1000; // 0.5 sec - const int serverPID = header->serverEvent.event_pid; - - while (true) - { - { // scope - EngineCheckout cout(tdbb, FB_FUNCTION); - - if (sharedMemory->eventWait(&header->clientEvent, clientEventCounter, TIMEOUT) == FB_SUCCESS) - break; - - if (serverPID != getpid() && !ISC_check_process_existence(serverPID)) - { - // Server process was died or exited - fb_assert((header->tag == tag) || header->tag == Tag::SERVER_EXITED); - - if (header->tag == tag) - { - header->tag = Tag::SERVER_EXITED; - if (header->serverEvent.event_pid) - { - sharedMemory->eventFini(&header->serverEvent); - header->serverEvent.event_pid = 0; - } - } - - break; - } - } - - JRD_reschedule(tdbb, true); - } - - bufferMutexLock.lock(); - - switch (header->tag) - { - case Tag::SERVER_EXITED: - (Arg::Gds(isc_random) << "Cannot start remote profile session - attachment exited").raise(); - break; - - case Tag::RESPONSE: - fb_assert(outSize == header->bufferSize); - memcpy(out, header->buffer, header->bufferSize); - break; - - case Tag::EXCEPTION: - (Arg::Gds(isc_random) << (char*) header->buffer).raise(); - break; - - default: - fb_assert(false); - } -} - -void ProfilerIpc::initClient() -{ - // Shared memory mutex must be locked by caller - - fb_assert(isServer == false); - - const auto header = sharedMemory->getHeader(); - - // Here should not be event created by another alive client - - if (header->clientEvent.event_pid) - { - fb_assert(header->clientEvent.event_pid != getpid()); - - if (header->clientEvent.event_pid != getpid()) - { - if (ISC_check_process_existence(header->clientEvent.event_pid)) - (Arg::Gds(isc_random) << "ProfilerIpc eventInit(clientEvent) failed").raise(); - } - - sharedMemory->eventFini(&header->clientEvent); - } - - if (sharedMemory->eventInit(&header->clientEvent) != FB_SUCCESS) - (Arg::Gds(isc_random) << "ProfilerIpc eventInit(clientEvent) failed").raise(); -} - - -//-------------------------------------- - - ProfilerListener::ProfilerListener(thread_db* tdbb) : attachment(tdbb->getAttachment()), + chatServer(buildParameters(tdbb, attachment->att_attachment_id)), cleanupSync(*attachment->att_pool, watcherThread, THREAD_medium) { - auto& pool = *attachment->att_pool; - - ipc = FB_NEW_POOL(pool) ProfilerIpc(tdbb, pool, attachment->att_attachment_id, true); - cleanupSync.run(this); - startupSemaphore.enter(); } ProfilerListener::~ProfilerListener() { - exiting = true; + chatServer.disconnect(); // Terminate the watcher thread. - - if (ipc) - { - auto& sharedMemory = ipc->sharedMemory; - sharedMemory->eventPost(&sharedMemory->getHeader()->serverEvent); - - cleanupSync.waitForCompletion(); - } + cleanupSync.waitForCompletion(); } void ProfilerListener::exceptionHandler(const Exception& ex, ThreadFinishSync::ThreadRoutine*) @@ -958,223 +751,132 @@ void ProfilerListener::exceptionHandler(const Exception& ex, ThreadFinishSyncsharedMemory; - const auto header = sharedMemory->getHeader(); - - fb_assert(header->tag == ProfilerIpc::Tag::NOP); - header->tag = ProfilerIpc::Tag::SERVER_STARTED; - try { - while (!exiting) + while (!chatServer.isDisconnected()) { - const SLONG serverEventCounter = sharedMemory->eventClear(&header->serverEvent); - - if (startup) - { - startup = false; - startupSemaphore.release(); - } - else - { - ProfilerIpc::Tag tag; - unsigned seq; - UCharBuffer buffer; + const auto requestMessageOpt = chatServer.receive(); + if (!requestMessageOpt.has_value()) + continue; - fb_assert(header->tag >= ProfilerIpc::Tag::FIRST_CLIENT_OP); + const auto& [requestMessage, clientAddress] = requestMessageOpt.value(); + IpcResponseMessage responseMessage; - try - { + try + { + { // scope FbLocalStatus statusVector; EngineContextHolder tdbb(&statusVector, attachment->getInterface(), FB_FUNCTION); - { // scope - std::unique_lock bufferMutexLock(header->bufferMutex); - - if (header->userName[0] && attachment->getUserName() != header->userName) - status_exception::raise(Arg::Gds(isc_miss_prvlg) << "PROFILE_ANY_ATTACHMENT"); - - fb_assert(header->tag >= ProfilerIpc::Tag::FIRST_CLIENT_OP); - - tag = header->tag; - seq = header->seq; - memcpy(buffer.getBuffer(header->bufferSize, false), header->buffer, header->bufferSize); - } - - processCommand(tdbb, tag, buffer); - - tag = ProfilerIpc::Tag::RESPONSE; + responseMessage = processCommand(tdbb, requestMessage); } - catch (const status_exception& e) - { - tag = ProfilerIpc::Tag::EXCEPTION; - - //// TODO: Serialize status vector instead of formated message. - const ISC_STATUS* status = e.value(); - string errorMsg; - TEXT temp[BUFFER_LARGE]; + chatServer.sendTo(clientAddress, responseMessage); + } + catch (const status_exception& e) + { + //// TODO: Serialize status vector instead of formated message. - while (fb_interpret(temp, sizeof(temp), &status)) - { - if (errorMsg.hasData()) - errorMsg += "\n\t"; + const ISC_STATUS* status = e.value(); + string errorMsg; + TEXT temp[BUFFER_LARGE]; - errorMsg += temp; - } + while (fb_interpret(temp, sizeof(temp), &status)) + { + if (errorMsg.hasData()) + errorMsg += "\n\t"; - buffer.getBuffer(MIN(errorMsg.length(), sizeof(header->buffer)), false); - memcpy(buffer.begin(), errorMsg.c_str(), buffer.getCount()); + errorMsg += temp; } - fb_assert(buffer.getCount() <= sizeof(header->buffer)); - - { // scope - std::unique_lock bufferMutexLock(header->bufferMutex, std::try_to_lock); - - // Otherwise a client lost interest in the response. - if (bufferMutexLock.owns_lock() && header->seq == seq) - { - if (header->seq == seq) - { - header->tag = tag; - header->bufferSize = buffer.getCount(); - memcpy(header->buffer, buffer.begin(), buffer.getCount()); - - sharedMemory->eventPost(&header->clientEvent); - } - } - } - } + ExceptionResponse exceptionResponse; + const auto errorLen = MIN(errorMsg.length(), sizeof(exceptionResponse.text) - 1); - if (exiting) - break; + memcpy(exceptionResponse.text, errorMsg.c_str(), errorLen); + exceptionResponse.text[errorLen] = '\0'; - sharedMemory->eventWait(&header->serverEvent, serverEventCounter, 0); + chatServer.sendTo(clientAddress, exceptionResponse); + } } } catch (const Exception& ex) { iscLogException("Error in profiler watcher thread\n", ex); } - - { // scope - std::unique_lock bufferMutexLock(header->bufferMutex); - - if (header->tag >= ProfilerIpc::Tag::FIRST_CLIENT_OP) - { - fb_assert(header->clientEvent.event_pid); - sharedMemory->eventPost(&header->clientEvent); - } - - header->tag = ProfilerIpc::Tag::SERVER_EXITED; - } - - try - { - if (startup) - startupSemaphore.release(); - } - catch (const Exception& ex) - { - exceptionHandler(ex, nullptr); - } } -void ProfilerListener::processCommand(thread_db* tdbb, ProfilerIpc::Tag tag, UCharBuffer& buffer) +IpcResponseMessage ProfilerListener::processCommand(thread_db* tdbb, const IpcRequestMessage& requestMessage) { const auto profilerManager = attachment->getProfilerManager(tdbb); - using Tag = ProfilerIpc::Tag; + return std::visit(StdVisitOverloads{ + [&](const CheckUserRequest& checkUser) -> IpcResponseMessage + { + if (attachment->getUserName() != checkUser.userName) + status_exception::raise(Arg::Gds(isc_miss_prvlg) << "PROFILE_ANY_ATTACHMENT"); + return Nothing{}; + }, - switch (tag) - { - case Tag::CANCEL_SESSION: - fb_assert(buffer.isEmpty()); + [&](const ProfilerPackage::CancelSessionInput::Type&) -> IpcResponseMessage + { profilerManager->cancelSession(); - buffer.resize(0); - break; + return Nothing{}; + }, - case Tag::DISCARD: - fb_assert(buffer.isEmpty()); + [&](const ProfilerPackage::DiscardInput::Type&) -> IpcResponseMessage + { profilerManager->discard(); - buffer.resize(0); - break; + return Nothing{}; + }, - case Tag::FINISH_SESSION: + [&](const ProfilerPackage::FinishSessionInput::Type& message) -> IpcResponseMessage { - const auto in = reinterpret_cast(buffer.begin()); - fb_assert(sizeof(*in) == buffer.getCount()); - - profilerManager->finishSession(tdbb, in->flush); - - buffer.resize(0); - break; - } + profilerManager->finishSession(tdbb, message.flush); + return Nothing{}; + }, - case Tag::FLUSH: - fb_assert(buffer.isEmpty()); + [&](const ProfilerPackage::FlushInput::Type&) -> IpcResponseMessage + { profilerManager->flush(); - buffer.resize(0); - break; + return Nothing{}; + }, - case Tag::PAUSE_SESSION: + [&](const ProfilerPackage::PauseSessionInput::Type& message) -> IpcResponseMessage { - const auto in = reinterpret_cast(buffer.begin()); - fb_assert(sizeof(*in) == buffer.getCount()); - - profilerManager->pauseSession(in->flush); - - buffer.resize(0); - break; - } + profilerManager->pauseSession(message.flush); + return Nothing{}; + }, - case Tag::RESUME_SESSION: - fb_assert(buffer.isEmpty()); + [&](const ProfilerPackage::ResumeSessionInput::Type&) -> IpcResponseMessage + { profilerManager->resumeSession(); - buffer.resize(0); - break; + return Nothing{}; + }, - case Tag::SET_FLUSH_INTERVAL: + [&](const ProfilerPackage::SetFlushIntervalInput::Type& message) -> IpcResponseMessage { - const auto in = reinterpret_cast(buffer.begin()); - fb_assert(sizeof(*in) == buffer.getCount()); - - profilerManager->setFlushInterval(in->flushInterval); - - buffer.resize(0); - break; - } + profilerManager->setFlushInterval(message.flushInterval); + return Nothing{}; + }, - case Tag::START_SESSION: + [&](const ProfilerPackage::StartSessionInput::Type& message) -> IpcResponseMessage { - const auto in = reinterpret_cast(buffer.begin()); - fb_assert(sizeof(*in) == buffer.getCount()); - - const string description(in->description.str, - in->descriptionNull ? 0 : in->description.length); - const std::optional flushInterval(in->flushIntervalNull ? - std::nullopt : std::optional{in->flushInterval}); - const PathName pluginName(in->pluginName.str, - in->pluginNameNull ? 0 : in->pluginName.length); - const string pluginOptions(in->pluginOptions.str, - in->pluginOptionsNull ? 0 : in->pluginOptions.length); - - ProfilerPackage::StartSessionOutput::Type* out; - out = reinterpret_cast(buffer.getBuffer(sizeof(*out), false)); - - out->sessionIdNull = FB_FALSE; - out->sessionId = profilerManager->startSession(tdbb, flushInterval, - pluginName, description, pluginOptions); - break; - } - - default: - fb_assert(false); - (Arg::Gds(isc_random) << "Invalid profiler's remote command").raise(); - break; - } + const string description(message.description.str, + message.descriptionNull ? 0 : message.description.length); + const std::optional flushInterval(message.flushIntervalNull ? + std::nullopt : std::optional{message.flushInterval}); + const PathName pluginName(message.pluginName.str, + message.pluginNameNull ? 0 : message.pluginName.length); + const string pluginOptions(message.pluginOptions.str, + message.pluginOptionsNull ? 0 : message.pluginOptions.length); + + return ProfilerPackage::StartSessionOutput::Type{ + .sessionId = profilerManager->startSession(tdbb, flushInterval, + pluginName, description, pluginOptions), + .sessionIdNull = FB_FALSE, + }; + }, + }, requestMessage); } diff --git a/src/jrd/ProfilerManager.h b/src/jrd/ProfilerManager.h index d1344a69fdf..d5d972d7ba5 100644 --- a/src/jrd/ProfilerManager.h +++ b/src/jrd/ProfilerManager.h @@ -360,28 +360,28 @@ class ProfilerPackage final : public SystemPackage ProfilerPackage(const ProfilerPackage&) = delete; ProfilerPackage& operator=(const ProfilerPackage&) = delete; -private: - FB_MESSAGE(AttachmentIdMessage, Firebird::ThrowStatusExceptionWrapper, +public: + FB_MESSAGE(DiscardInput, Firebird::ThrowStatusExceptionWrapper, (FB_BIGINT, attachmentId) ); - //---------- - - using DiscardInput = AttachmentIdMessage; - static Firebird::IExternalResultSet* discardProcedure(Firebird::ThrowStatusExceptionWrapper* status, Firebird::IExternalContext* context, const DiscardInput::Type* in, void* out); //---------- - using FlushInput = AttachmentIdMessage; + FB_MESSAGE(FlushInput, Firebird::ThrowStatusExceptionWrapper, + (FB_BIGINT, attachmentId) + ); static Firebird::IExternalResultSet* flushProcedure(Firebird::ThrowStatusExceptionWrapper* status, Firebird::IExternalContext* context, const FlushInput::Type* in, void* out); //---------- - using CancelSessionInput = AttachmentIdMessage; + FB_MESSAGE(CancelSessionInput, Firebird::ThrowStatusExceptionWrapper, + (FB_BIGINT, attachmentId) + ); static Firebird::IExternalResultSet* cancelSessionProcedure(Firebird::ThrowStatusExceptionWrapper* status, Firebird::IExternalContext* context, const CancelSessionInput::Type* in, void* out); @@ -408,7 +408,9 @@ class ProfilerPackage final : public SystemPackage //---------- - using ResumeSessionInput = AttachmentIdMessage; + FB_MESSAGE(ResumeSessionInput, Firebird::ThrowStatusExceptionWrapper, + (FB_BIGINT, attachmentId) + ); static Firebird::IExternalResultSet* resumeSessionProcedure(Firebird::ThrowStatusExceptionWrapper* status, Firebird::IExternalContext* context, const ResumeSessionInput::Type* in, void* out); diff --git a/src/lock/lock.cpp b/src/lock/lock.cpp index 0f376ad781c..b04a7dee847 100644 --- a/src/lock/lock.cpp +++ b/src/lock/lock.cpp @@ -1048,7 +1048,7 @@ void LockManager::acquire_shmem(SRQ_PTR owner_offset) ULONG spins = 0; while (spins++ < spins_to_try) { - if (m_sharedMemory->mutexLockCond()) + if (m_sharedMemory->mutexTryLock()) { locked = true; break;