From c98cab8c0850870d309e6a0b9d4a89bd9e43011a Mon Sep 17 00:00:00 2001
From: frost-intel
Date: Mon, 9 Jun 2025 21:40:03 +0000
Subject: [PATCH 01/15] Add initial XPU support for FlightRecorder

---
 src/BuildOnLinux.cmake          |   2 +
 src/xccl/FlightRecorderXCCL.cpp |  21 ++++++
 src/xccl/ProcessGroupXCCL.cpp   | 129 +++++++++++++++++++++++++++++++-
 src/xccl/ProcessGroupXCCL.hpp   |  29 ++++++-
 4 files changed, 179 insertions(+), 2 deletions(-)
 create mode 100644 src/xccl/FlightRecorderXCCL.cpp

diff --git a/src/BuildOnLinux.cmake b/src/BuildOnLinux.cmake
index aee7118f01..87b0fe3454 100644
--- a/src/BuildOnLinux.cmake
+++ b/src/BuildOnLinux.cmake
@@ -16,6 +16,7 @@ macro(setup_common_libraries)
   if(USE_C10D_XCCL)
     target_compile_definitions(torch_xpu_ops PRIVATE USE_C10D_XCCL)
     target_link_libraries(torch_xpu_ops PUBLIC torch::xccl)
+    target_link_libraries(torch_xpu_ops PUBLIC fmt::fmt-header-only)
   endif()
   list(APPEND TORCH_XPU_OPS_LIBRARIES torch_xpu_ops)
 endmacro()
@@ -125,6 +126,7 @@ else()
     if(USE_C10D_XCCL)
       target_compile_definitions(torch_xpu_ops PRIVATE USE_C10D_XCCL)
       target_link_libraries(torch_xpu_ops PUBLIC torch::xccl)
+      target_link_libraries(torch_xpu_ops PUBLIC fmt::fmt-header-only)
     endif()
 
     install(TARGETS torch_xpu_ops DESTINATION "${TORCH_INSTALL_LIB_DIR}")
diff --git a/src/xccl/FlightRecorderXCCL.cpp b/src/xccl/FlightRecorderXCCL.cpp
new file mode 100644
index 0000000000..29fccd9907
--- /dev/null
+++ b/src/xccl/FlightRecorderXCCL.cpp
@@ -0,0 +1,21 @@
+#ifdef USE_C10D_XCCL
+
+#include
+#include
+#include
+
+namespace c10d {
+
+template <>
+float getDurationFromEvent<at::xpu::XPUEvent>(
+    at::xpu::XPUEvent& xcclStartEvent,
+    at::xpu::XPUEvent& xcclEndEvent) {
+  TORCH_CHECK(
+      xcclEndEvent.query(),
+      "getDuration can only be called after work has succeeded.")
+  return xcclStartEvent.elapsed_time(xcclEndEvent);
+}
+
+template struct FlightRecorder<at::xpu::XPUEvent>;
+} // namespace c10d
+#endif // USE_C10D_XCCL
diff --git a/src/xccl/ProcessGroupXCCL.cpp b/src/xccl/ProcessGroupXCCL.cpp
index c778bf5968..8fd4902404 100644
--- a/src/xccl/ProcessGroupXCCL.cpp
+++ b/src/xccl/ProcessGroupXCCL.cpp
@@ -1,10 +1,13 @@
 #ifdef USE_C10D_XCCL
 
 #include
+#include
 #include
 
 namespace c10d {
 
+using FlightRecorderXCCL = FlightRecorder<at::xpu::XPUEvent>;
+
 namespace {
 
 #if defined(CCL_MAJOR_VERSION) && \
@@ -243,6 +246,7 @@ ProcessGroupXCCL::WorkXCCL::WorkXCCL(
       workStartTime_(std::chrono::steady_clock::now()),
       seq_(seq),
       isP2P_(isP2P) {
+  xcclStartEvent_ = std::make_shared<at::xpu::XPUEvent>();
   xcclEndEvent_ = std::make_shared<at::xpu::XPUEvent>();
   stashed_for_allocator_safety_ = std::make_shared<std::vector<at::Tensor>>();
 }
@@ -250,6 +254,7 @@ ProcessGroupXCCL::WorkXCCL::WorkXCCL(
 ProcessGroupXCCL::WorkXCCL::WorkXCCL(const WorkXCCL& w)
     : Work(w.rank_, w.opType_),
       device_(w.device_),
+      xcclStartEvent_(w.xcclStartEvent_),
       xcclEndEvent_(w.xcclEndEvent_),
       blockingWait_(w.blockingWait_),
       workStartTime_(w.workStartTime_),
@@ -331,13 +336,19 @@ const std::string& ProcessGroupXCCL::logPrefix() const {
 ProcessGroupXCCL::ProcessGroupXCCL(
     const c10::intrusive_ptr<Store>& store,
     int rank,
-    int size)
+    int size,
+    c10::intrusive_ptr<Options> options)
     : Backend(rank, size),
       store_(store),
+      options_(std::move(options)),
       xcclCommCounter_(0),
       local_id_(process_group_id++) {
   logPrefix_ = createLogPrefix();
   blockingWait_ = getCvarBool(TORCH_XCCL_BLOCKING_WAIT, false);
+  this->setGroupUid(options_->group_name);
+
+  FlightRecorderXCCL::get()->record_pg_ranks(
+      std::make_tuple(pg_uid_, pg_desc_), groupRanks());
   init();
   const std::string OFF = "OFF";
   std::string torch_distributed_debug =
@@ -354,6 +365,15 @@ ProcessGroupXCCL::ProcessGroupXCCL(
ProcessGroupXCCL::~ProcessGroupXCCL() = default; +const std::vector& ProcessGroupXCCL::groupRanks() const { + if (options_->global_ranks_in_group.empty()) { + static std::vector globalRanks(size_); + std::iota(globalRanks.begin(), globalRanks.end(), 0); + return globalRanks; + } + return options_->global_ranks_in_group; +} + void ProcessGroupXCCL::setSequenceNumberForGroup() {} uint64_t ProcessGroupXCCL::getSequenceNumberForGroup() { @@ -377,6 +397,21 @@ c10::intrusive_ptr ProcessGroupXCCL::initWork( profilingTitle, profilingTitle != nullptr ? std::optional>(inputs) : std::nullopt); + + r->trace_id_ = FlightRecorderXCCL::get()->record( + local_id_, + std::make_tuple(pg_uid_, pg_desc_), // PG name tuple + seqCollective_, + seqP2P_, + op_id_, + profilingTitle ? profilingTitle : "", + inputs, + outputs, + r->xcclStartEvent_.get(), + r->xcclEndEvent_.get(), + options_->timeout, + pgStatus_, + isP2P); return r; } @@ -489,6 +524,9 @@ void ProcessGroupXCCL::groupEnd() { --xcclActiveGroupCounter_; } +ProcessGroupXCCL::Options::Options() + : Backend::Options(XCCL_BACKEND_NAME) {} + static constexpr int CoalActive = 0x01, CoalColl = 0x02, CoalP2P = 0x04; void ProcessGroupXCCL::startCoalescing() { coalescedDevice_.set_index(-1); @@ -498,6 +536,24 @@ void ProcessGroupXCCL::startCoalescing() { groupStart(); } +void ProcessGroupXCCL::setStartedPgStatus( + c10::intrusive_ptr work) { + pgStatus_->lastStartedSeq = static_cast(work->getSequencenumber()); + pgStatus_->lastStartedWorkName = opTypeToString(work->opType_); + pgStatus_->lastStartedNumelIn = work->numelIn_; + pgStatus_->lastStartedNumelOut = work->numelOut_; +} + +// TODO: Add queue/thread to wait for completed work and mark status +// void ProcessGroupXCCL::setCompletedPgStatus( +// c10::intrusive_ptr work) { +// pgStatus_->lastCompletedSeq = static_cast(work->getSequencenumber()); +// pgStatus_->lastCompletedWorkName = opTypeToString(work->opType_); +// pgStatus_->lastCompletedNumelIn = work->numelIn_; +// pgStatus_->lastCompletedNumelOut = work->numelOut_; +// FlightRecorderXCCL::get()->retire_id(work.trace_id_, true); +// } + c10::intrusive_ptr ProcessGroupXCCL::endCoalescing(OpType optype) { if (coalescedComm_ == nullptr) { // There is no actual work being coalesced, return here @@ -531,6 +587,7 @@ c10::intrusive_ptr ProcessGroupXCCL::endCoalescing(OpType optype) { groupEnd(); work->xcclEndEvent_->record(stream); + setStartedPgStatus(work); coalescing_state_ = 0; coalescedComm_ = nullptr; @@ -563,6 +620,7 @@ c10::intrusive_ptr ProcessGroupXCCL::collective( if ((coalescing_state_ & CoalColl) == 0) { seqCollective_++; } + op_id_++; coalescing_state_ |= CoalColl; if (coalescedDevice_.index() < 0) { coalescedDevice_ = device; @@ -605,6 +663,22 @@ c10::intrusive_ptr ProcessGroupXCCL::collective( c10::intrusive_ptr work; work = initWork(device, rank_, opType, false, profilingTitle, inputs, outputs); + if (coalescing_state_) { + FlightRecorderXCCL::get()->record( + local_id_, + std::make_tuple(pg_uid_, pg_desc_), // PG name tuple + seqCollective_, + seqP2P_, + op_id_, + profilingTitle ? 
profilingTitle : "", + inputs, + outputs, + nullptr, + nullptr, + options_->timeout, + pgStatus_, + false); + } work->outputs_ = std::make_shared>(outputs); @@ -640,6 +714,16 @@ c10::intrusive_ptr ProcessGroupXCCL::collective( work->future_->markCompleted(at::IValue(*work->outputs_)); work->blockingWait_ = blockingWait_; + work->numelIn_ = 0; + work->numelOut_ = 0; + for (const auto& input : inputs) { + work->numelIn_ += input.numel(); + } + for (const auto& output : outputs) { + work->numelOut_ += output.numel(); + } + setStartedPgStatus(work); + return asyncOp ? work : nullptr; } @@ -672,6 +756,7 @@ c10::intrusive_ptr ProcessGroupXCCL::pointToPoint( } } + op_id_++; auto comm = getXCCLComm(key, device, opType, p2pRank, isSendRecvSelf); if (coalescing_state_ & CoalActive) { @@ -703,6 +788,21 @@ c10::intrusive_ptr ProcessGroupXCCL::pointToPoint( work->outputs_ = std::make_shared>(); work->outputs_->push_back(tensor); + work->trace_id_ = FlightRecorderXCCL::get()->record( + local_id_, + std::make_tuple(pg_uid_, pg_desc_), // PG name tuple + seqCollective_, + seqP2P_, + op_id_, + profilingTitle, + {tensor}, + {tensor}, + work->xcclStartEvent_.get(), + work->xcclEndEvent_.get(), + options_->timeout, + pgStatus_, + true); + c10::OptionalDeviceGuard gpuGuard(device); c10::xpu::XPUCachingAllocator::recordStream( @@ -718,8 +818,25 @@ c10::intrusive_ptr ProcessGroupXCCL::pointToPoint( work->future_ = c10::make_intrusive( c10::ListType::create(c10::TensorType::get()), devices); work->future_->markCompleted(at::IValue(*work->outputs_)); + + work->numelIn_ = work->numelOut_ = tensor.numel(); + setStartedPgStatus(work); return work; } else { + FlightRecorderXCCL::get()->record( + local_id_, + std::make_tuple(pg_uid_, pg_desc_), // PG name tuple + seqCollective_, + seqP2P_, + op_id_, + profilingTitle, + {tensor}, + {tensor}, + nullptr, + nullptr, + options_->timeout, + pgStatus_, + true); c10::OptionalDeviceGuard gpuGuard(device); c10::xpu::XPUCachingAllocator::recordStream( @@ -2096,3 +2213,13 @@ c10::intrusive_ptr ProcessGroupXCCL::alltoall( } // namespace c10d #endif // USE_C10D_XCCL + +void printNcclCommProxyTrace( + const std::string& dumpReason, + const std::unordered_map& dumpMap) { + LOG(INFO) << "Dumping nccl comm trace, reason: " << dumpReason; + for (auto& [key, value] : dumpMap) { + LOG(INFO) << "key: " << key << ", value: " << value; + } + LOG(INFO) << "----------------------"; +} diff --git a/src/xccl/ProcessGroupXCCL.hpp b/src/xccl/ProcessGroupXCCL.hpp index e085efb536..84bc96a2a7 100644 --- a/src/xccl/ProcessGroupXCCL.hpp +++ b/src/xccl/ProcessGroupXCCL.hpp @@ -19,6 +19,7 @@ #include #include #include +#include #include namespace c10d { @@ -91,12 +92,16 @@ class TORCH_API ProcessGroupXCCL : public Backend { protected: at::Device device_; + std::shared_ptr xcclStartEvent_; std::shared_ptr xcclEndEvent_; bool isBarrierOp_{false}; bool blockingWait_{false}; std::chrono::time_point workStartTime_; uint64_t seq_; bool isP2P_; + std::optional trace_id_; + size_t numelIn_ = -1; + size_t numelOut_ = -1; private: std::shared_ptr> outputs_; @@ -105,7 +110,22 @@ class TORCH_API ProcessGroupXCCL : public Backend { friend class ProcessGroupXCCL; }; - ProcessGroupXCCL(const c10::intrusive_ptr& store, int rank, int size); + struct Options : public Backend::Options { + explicit Options(); + + static c10::intrusive_ptr create() { + return c10::make_intrusive(); + } + + std::vector global_ranks_in_group; + std::string group_name; + }; + + ProcessGroupXCCL( + const c10::intrusive_ptr& store, + int rank, + 
int size, + c10::intrusive_ptr options = Options::create()); C10_DEPRECATED ProcessGroupXCCL( const c10::intrusive_ptr& store, @@ -368,6 +388,9 @@ class TORCH_API ProcessGroupXCCL : public Backend { const std::string& logPrefix() const; c10::DeviceIndex guessDeviceId() const; + const std::vector& groupRanks() const; + + void setStartedPgStatus(c10::intrusive_ptr work); protected: std::unordered_map> @@ -375,6 +398,7 @@ class TORCH_API ProcessGroupXCCL : public Backend { std::unordered_map xcclEventsMap_; std::unordered_map> devXCCLCommMap_; c10::intrusive_ptr store_; + const c10::intrusive_ptr options_; uint64_t xcclCommCounter_{0}; std::mutex mutex_; std::set usedDeviceIdxs_; @@ -387,8 +411,11 @@ class TORCH_API ProcessGroupXCCL : public Backend { static thread_local uint64_t xcclActiveGroupCounter_; uint64_t seqCollective_{0}; uint64_t seqP2P_{0}; + uint64_t op_id_{0}; size_t local_id_; std::string logPrefix_; + std::shared_ptr pgStatus_ = + std::make_shared(); private: std::mutex kvs_mutex; From a75e8c0c0df1208994a46dc517f22f6c9893451f Mon Sep 17 00:00:00 2001 From: frost-intel Date: Mon, 16 Jun 2025 15:40:37 +0000 Subject: [PATCH 02/15] Build with FR --- src/xccl/ProcessGroupXCCL.cpp | 114 ++++++++++++++++++++++++++++++---- src/xccl/ProcessGroupXCCL.hpp | 102 ++++++++++++++++++++++++++++++ 2 files changed, 205 insertions(+), 11 deletions(-) diff --git a/src/xccl/ProcessGroupXCCL.cpp b/src/xccl/ProcessGroupXCCL.cpp index 8fd4902404..2aa55cc2e7 100644 --- a/src/xccl/ProcessGroupXCCL.cpp +++ b/src/xccl/ProcessGroupXCCL.cpp @@ -345,6 +345,7 @@ ProcessGroupXCCL::ProcessGroupXCCL( local_id_(process_group_id++) { logPrefix_ = createLogPrefix(); blockingWait_ = getCvarBool(TORCH_XCCL_BLOCKING_WAIT, false); + traceBufferSize_ = getCvarInt(TORCH_XCCL_TRACE_BUFFER_SIZE, 2000); this->setGroupUid(options_->group_name); FlightRecorderXCCL::get()->record_pg_ranks( @@ -361,9 +362,110 @@ ProcessGroupXCCL::ProcessGroupXCCL( << "XCCL version: " << XcclVersion << ", TORCH_XCCL_BLOCKING_WAIT: " << blockingWait_ << ", TORCH_DISTRIBUTED_DEBUG: " << torch_distributed_debug; + + heartbeatMonitor_ = std::make_unique(this); + heartbeatMonitor_->start(); +} + +ProcessGroupXCCL::~ProcessGroupXCCL() { + heartbeatMonitor_->stop(); + heartbeatMonitor_->join(); } -ProcessGroupXCCL::~ProcessGroupXCCL() = default; +bool ProcessGroupXCCL::dumpDebuggingInfo(bool includeStackTrace /*=true*/) { + STATIC_SCOPED_WAIT_COUNTER(pytorch.ProcessGroupXCCL__dumpDebuggingInfo); + static std::mutex writeDebugInfoMutex; + std::lock_guard lock(writeDebugInfoMutex); + LOG(ERROR) + << logPrefix() + << "ProcessGroupNCCL preparing to dump debug info. 
Include stack trace: " + << includeStackTrace; + if (traceBufferSize_ > 0) { + // TODO: dump_xccl_trace + auto xcclDumpMap = std::unordered_map< + std::string, + std::unordered_map>(); + auto xcclTrace = FlightRecorderXCCL::get()->dump( + xcclDumpMap, true, includeStackTrace, false); + DebugInfoWriter& writer = DebugInfoWriter::getWriter(local_id_); + LOG(INFO) << logPrefix() << "ProcessGroupXCCL dumping xccl trace to " + << writer.getWriterTarget(); + writer.write(xcclTrace); + LOG(INFO) << logPrefix() << "Flight Recorder trace successfully dumped."; + return true; + } + return false; +} + +ProcessGroupXCCL::HeartbeatMonitor::HeartbeatMonitor(ProcessGroupXCCL* pg) { + pg_ = pg; + coordCheckIntervalMilSec_ = getCvarInt(TORCH_XCCL_COORD_CHECK_MILSEC, 1000); + LOG(INFO) + << pg_->logPrefix() << "HeartbeatMonitor environments: " + << "TORCH_XCCL_COOR_CHECK_MILSEC: " << coordCheckIntervalMilSec_; +} + +std::string ProcessGroupXCCL::HeartbeatMonitor::getXCCLTimeoutErrorMsg(const std::string& extraMsg) { + return c10::str( + pg_->logPrefix(), + "Received a dump signal due to a collective timeout from ", + extraMsg, + " and we will try our best to dump the debug info. ", + "Last enqueued XCCL work: ", + pg_->pgStatus_->lastEnqueuedSeq, + ", last completed XCCL work: ", + pg_->pgStatus_->lastCompletedSeq, + "."); +} + +void ProcessGroupXCCL::HeartbeatMonitor::stop() { + terminateHeartbeatMonitorThread_.store(true); + monitorWakeUpCV_.notify_one(); +} + +void ProcessGroupXCCL::HeartbeatMonitor::start() { + TORCH_CHECK( + !xcclHeartbeatMonitorThread_.joinable(), + "HeartbeatMonitor thread already started"); + xcclHeartbeatMonitorThread_ = + std::thread(&ProcessGroupXCCL::HeartbeatMonitor::runLoop, this); +} + +void ProcessGroupXCCL::HeartbeatMonitor::join() { + if (xcclHeartbeatMonitorThread_.joinable()) { + xcclHeartbeatMonitorThread_.join(); + LOG(INFO) << pg_->logPrefix() + << "ProcessGroupNCCL heart beat monitor thread joined."; + } +} + +void ProcessGroupXCCL::HeartbeatMonitor::runLoop() { + c10::setThreadName("pt_xccl_heartbt"); + + std::string errorMsg; + std::string exitReason; + bool checkDumpSignal = pg_->local_id_ == 0; + int monitorPollInterval = coordCheckIntervalMilSec_; + std::optional dumpPipe = std::nullopt; + if (pg_->local_id_ == 0) { + // DumpPipe is one per-trainer process + dumpPipe.emplace(pg_->local_id_); + while (true) { + std::unique_lock lock(monitorMutex_); + if (monitorWakeUpCV_.wait_for( + lock, std::chrono::milliseconds(monitorPollInterval), [&] { + return terminateHeartbeatMonitorThread_.load(); + })) { + return; + } + if (dumpPipe.has_value() && dumpPipe->shouldDump()) { + std::future fut = std::async(std::launch::async, [this]() { + return this->pg_->dumpDebuggingInfo(); + }); + } + } + } +} const std::vector& ProcessGroupXCCL::groupRanks() const { if (options_->global_ranks_in_group.empty()) { @@ -544,16 +646,6 @@ void ProcessGroupXCCL::setStartedPgStatus( pgStatus_->lastStartedNumelOut = work->numelOut_; } -// TODO: Add queue/thread to wait for completed work and mark status -// void ProcessGroupXCCL::setCompletedPgStatus( -// c10::intrusive_ptr work) { -// pgStatus_->lastCompletedSeq = static_cast(work->getSequencenumber()); -// pgStatus_->lastCompletedWorkName = opTypeToString(work->opType_); -// pgStatus_->lastCompletedNumelIn = work->numelIn_; -// pgStatus_->lastCompletedNumelOut = work->numelOut_; -// FlightRecorderXCCL::get()->retire_id(work.trace_id_, true); -// } - c10::intrusive_ptr ProcessGroupXCCL::endCoalescing(OpType optype) { if (coalescedComm_ == 
nullptr) { // There is no actual work being coalesced, return here diff --git a/src/xccl/ProcessGroupXCCL.hpp b/src/xccl/ProcessGroupXCCL.hpp index 84bc96a2a7..b7ae3ba920 100644 --- a/src/xccl/ProcessGroupXCCL.hpp +++ b/src/xccl/ProcessGroupXCCL.hpp @@ -27,6 +27,80 @@ static std::vector TORCH_XCCL_BLOCKING_WAIT = { "TORCH_XCCL_BLOCKING_WAIT", "XCCL_BLOCKING_WAIT"}; +static std::vector TORCH_XCCL_LOG_CPP_STACK_ON_UNCLEAN_SHUTDOWN = { + "TORCH_XCCL_LOG_CPP_STACK_ON_UNCLEAN_SHUTDOWN", + "XCCL_LOG_CPP_STACK_ON_UNCLEAN_SHUTDOWN"}; + +static std::vector TORCH_XCCL_TRACE_BUFFER_SIZE = { + "TORCH_XCCL_TRACE_BUFFER_SIZE", + "XCCL_TRACE_BUFFER_SIZE"}; + +static std::vector TORCH_XCCL_COORD_CHECK_MILSEC = { + "TORCH_XCCL_COORD_CHECK_MILSEC", + "XCCL_COORD_CHECK_MILSEC"}; + +static std::vector TORCH_XCCL_DEBUG_INFO_PIPE_FILE = { + "TORCH_XCCL_DEBUG_INFO_PIPE_FILE", + "XCCL_DEBUG_INFO_PIPE_FILE"}; + +#if defined(__linux__) +struct DumpPipe { + DumpPipe(int rank) { + std::string fileStem = + getCvarString(TORCH_XCCL_DEBUG_INFO_PIPE_FILE, ""); + if (fileStem.empty() || + getCvarInt(TORCH_XCCL_TRACE_BUFFER_SIZE, 0) <= 0) { + return; + } + TORCH_CHECK(!fileStem.empty(), "TORCH_XCCL_DEBUG_INFO_PIPE_FILE is empty"); + std::string filename = c10::str(fileStem, rank, ".pipe"); + TORCH_CHECK( + unlink(filename.c_str()) != -1 || errno == ENOENT, + "Error removing existing named pipe ", + filename, + ", Error: ", + std::strerror(errno)); + TORCH_CHECK( + mkfifo(filename.c_str(), 0666) != -1, + "Error creating named pipe ", + filename, + ", Error: ", + std::strerror(errno)); + fd_ = open(filename.c_str(), O_RDONLY | O_NONBLOCK); + LOG(INFO) << "Pipe file " << filename + << " has been opened, write to it to trigger XCCL Debug Dump."; + TORCH_CHECK(fd_ != -1, "Error opening named pipe ", filename); + } + bool shouldDump() { + if (fd_ == -1) { + return false; + } + // NOLINTNEXTLINE(*array*) + char buf[128]{}; + // non-blocking from O_NONBLOCK above. + // Ignore EINTR because we already will poll this + // again later. 
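+    // A write of any bytes into the FIFO makes read() return > 0, which is
+    // the dump trigger; with nothing written it returns 0 or -1 (EAGAIN),
+    // and both are treated as "no dump requested".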
+ ssize_t bytesRead = read(fd_, &buf, 128); + return bytesRead > 0; + } + ~DumpPipe() { + if (fd_ != -1) { + close(fd_); + } + } + + private: + int fd_ = -1; +}; +#else +struct DumpPipe { + DumpPipe(int rank) {} + bool shouldDump() { + return false; + } +}; +#endif + using xcclComm_t = ccl::communicator; constexpr const char* XCCL_BACKEND_NAME = "xccl"; @@ -121,6 +195,28 @@ class TORCH_API ProcessGroupXCCL : public Backend { std::string group_name; }; + class HeartbeatMonitor { + public: + HeartbeatMonitor(ProcessGroupXCCL* pg); + virtual ~HeartbeatMonitor() = default; + + std::string getXCCLTimeoutErrorMsg(const std::string& extraMsg); + void start(); + void join(); + virtual void runLoop(); + void stop(); + + protected: + ProcessGroupXCCL* pg_; + + private: + int coordCheckIntervalMilSec_; + std::condition_variable monitorWakeUpCV_; + std::mutex monitorMutex_; + std::thread xcclHeartbeatMonitorThread_; + std::atomic terminateHeartbeatMonitorThread_{false}; + }; + ProcessGroupXCCL( const c10::intrusive_ptr& store, int rank, @@ -388,10 +484,14 @@ class TORCH_API ProcessGroupXCCL : public Backend { const std::string& logPrefix() const; c10::DeviceIndex guessDeviceId() const; + const int& globalRank() const; + const std::vector& groupRanks() const; void setStartedPgStatus(c10::intrusive_ptr work); + bool dumpDebuggingInfo(bool includeStackTrace = true); + protected: std::unordered_map> xcclStreamsMap_; @@ -416,6 +516,8 @@ class TORCH_API ProcessGroupXCCL : public Backend { std::string logPrefix_; std::shared_ptr pgStatus_ = std::make_shared(); + std::unique_ptr heartbeatMonitor_; + int traceBufferSize_; private: std::mutex kvs_mutex; From a743d459af61c8817c5cb6a3c40d5529756946d6 Mon Sep 17 00:00:00 2001 From: frost-intel Date: Wed, 16 Jul 2025 16:14:50 +0000 Subject: [PATCH 03/15] Fix --- src/xccl/ProcessGroupXCCL.cpp | 30 +++++++++++++++++++++++++----- src/xccl/ProcessGroupXCCL.hpp | 15 ++++----------- 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/src/xccl/ProcessGroupXCCL.cpp b/src/xccl/ProcessGroupXCCL.cpp index 2aa55cc2e7..967da8e076 100644 --- a/src/xccl/ProcessGroupXCCL.cpp +++ b/src/xccl/ProcessGroupXCCL.cpp @@ -345,7 +345,7 @@ ProcessGroupXCCL::ProcessGroupXCCL( local_id_(process_group_id++) { logPrefix_ = createLogPrefix(); blockingWait_ = getCvarBool(TORCH_XCCL_BLOCKING_WAIT, false); - traceBufferSize_ = getCvarInt(TORCH_XCCL_TRACE_BUFFER_SIZE, 2000); + traceBufferSize_ = getCvarInt({"TORCH_FR_BUFFER_SIZE"}, 2000); this->setGroupUid(options_->group_name); FlightRecorderXCCL::get()->record_pg_ranks( @@ -378,7 +378,7 @@ bool ProcessGroupXCCL::dumpDebuggingInfo(bool includeStackTrace /*=true*/) { std::lock_guard lock(writeDebugInfoMutex); LOG(ERROR) << logPrefix() - << "ProcessGroupNCCL preparing to dump debug info. Include stack trace: " + << "ProcessGroupXCCL preparing to dump debug info. 
Include stack trace: " << includeStackTrace; if (traceBufferSize_ > 0) { // TODO: dump_xccl_trace @@ -387,7 +387,7 @@ bool ProcessGroupXCCL::dumpDebuggingInfo(bool includeStackTrace /*=true*/) { std::unordered_map>(); auto xcclTrace = FlightRecorderXCCL::get()->dump( xcclDumpMap, true, includeStackTrace, false); - DebugInfoWriter& writer = DebugInfoWriter::getWriter(local_id_); + DebugInfoWriter& writer = DebugInfoWriter::getWriter(rank_); LOG(INFO) << logPrefix() << "ProcessGroupXCCL dumping xccl trace to " << writer.getWriterTarget(); writer.write(xcclTrace); @@ -435,7 +435,7 @@ void ProcessGroupXCCL::HeartbeatMonitor::join() { if (xcclHeartbeatMonitorThread_.joinable()) { xcclHeartbeatMonitorThread_.join(); LOG(INFO) << pg_->logPrefix() - << "ProcessGroupNCCL heart beat monitor thread joined."; + << "ProcessGroupXCCL heart beat monitor thread joined."; } } @@ -449,7 +449,7 @@ void ProcessGroupXCCL::HeartbeatMonitor::runLoop() { std::optional dumpPipe = std::nullopt; if (pg_->local_id_ == 0) { // DumpPipe is one per-trainer process - dumpPipe.emplace(pg_->local_id_); + dumpPipe.emplace(pg_->rank_); while (true) { std::unique_lock lock(monitorMutex_); if (monitorWakeUpCV_.wait_for( @@ -459,6 +459,7 @@ void ProcessGroupXCCL::HeartbeatMonitor::runLoop() { return; } if (dumpPipe.has_value() && dumpPipe->shouldDump()) { + LOG(INFO) << pg_->logPrefix() << "Dump signal received through pipe."; std::future fut = std::async(std::launch::async, [this]() { return this->pg_->dumpDebuggingInfo(); }); @@ -646,6 +647,15 @@ void ProcessGroupXCCL::setStartedPgStatus( pgStatus_->lastStartedNumelOut = work->numelOut_; } +void ProcessGroupXCCL::setCompletedPgStatus( + c10::intrusive_ptr work) { + pgStatus_->lastCompletedSeq = static_cast(work->getSequencenumber()); + pgStatus_->lastCompletedWorkName = opTypeToString(work->opType_); + pgStatus_->lastCompletedNumelIn = work->numelIn_; + pgStatus_->lastCompletedNumelOut = work->numelOut_; + FlightRecorderXCCL::get()->retire_id(work->trace_id_, false); +} + c10::intrusive_ptr ProcessGroupXCCL::endCoalescing(OpType optype) { if (coalescedComm_ == nullptr) { // There is no actual work being coalesced, return here @@ -804,6 +814,11 @@ c10::intrusive_ptr ProcessGroupXCCL::collective( work->future_ = c10::make_intrusive( c10::ListType::create(c10::TensorType::get()), devices); work->future_->markCompleted(at::IValue(*work->outputs_)); + work->future_->addCallback( + [this, work](at::ivalue::Future&) { + this->setCompletedPgStatus(work); + } + ); work->blockingWait_ = blockingWait_; work->numelIn_ = 0; @@ -910,6 +925,11 @@ c10::intrusive_ptr ProcessGroupXCCL::pointToPoint( work->future_ = c10::make_intrusive( c10::ListType::create(c10::TensorType::get()), devices); work->future_->markCompleted(at::IValue(*work->outputs_)); + work->future_->addCallback( + [this, work](at::ivalue::Future&) { + this->setCompletedPgStatus(work); + } + ); work->numelIn_ = work->numelOut_ = tensor.numel(); setStartedPgStatus(work); diff --git a/src/xccl/ProcessGroupXCCL.hpp b/src/xccl/ProcessGroupXCCL.hpp index b7ae3ba920..ab534e485c 100644 --- a/src/xccl/ProcessGroupXCCL.hpp +++ b/src/xccl/ProcessGroupXCCL.hpp @@ -31,28 +31,20 @@ static std::vector TORCH_XCCL_LOG_CPP_STACK_ON_UNCLEAN_SHUTDOWN = { "TORCH_XCCL_LOG_CPP_STACK_ON_UNCLEAN_SHUTDOWN", "XCCL_LOG_CPP_STACK_ON_UNCLEAN_SHUTDOWN"}; -static std::vector TORCH_XCCL_TRACE_BUFFER_SIZE = { - "TORCH_XCCL_TRACE_BUFFER_SIZE", - "XCCL_TRACE_BUFFER_SIZE"}; - static std::vector TORCH_XCCL_COORD_CHECK_MILSEC = { "TORCH_XCCL_COORD_CHECK_MILSEC", 
"XCCL_COORD_CHECK_MILSEC"}; -static std::vector TORCH_XCCL_DEBUG_INFO_PIPE_FILE = { - "TORCH_XCCL_DEBUG_INFO_PIPE_FILE", - "XCCL_DEBUG_INFO_PIPE_FILE"}; - #if defined(__linux__) struct DumpPipe { DumpPipe(int rank) { std::string fileStem = - getCvarString(TORCH_XCCL_DEBUG_INFO_PIPE_FILE, ""); + getCvarString({"TORCH_FR_DEBUG_INFO_PIPE_FILE"}, ""); if (fileStem.empty() || - getCvarInt(TORCH_XCCL_TRACE_BUFFER_SIZE, 0) <= 0) { + getCvarInt({"TORCH_FR_BUFFER_SIZE"}, 0) <= 0) { return; } - TORCH_CHECK(!fileStem.empty(), "TORCH_XCCL_DEBUG_INFO_PIPE_FILE is empty"); + TORCH_CHECK(!fileStem.empty(), "TORCH_FR_DEBUG_INFO_PIPE_FILE is empty"); std::string filename = c10::str(fileStem, rank, ".pipe"); TORCH_CHECK( unlink(filename.c_str()) != -1 || errno == ENOENT, @@ -489,6 +481,7 @@ class TORCH_API ProcessGroupXCCL : public Backend { const std::vector& groupRanks() const; void setStartedPgStatus(c10::intrusive_ptr work); + void setCompletedPgStatus(c10::intrusive_ptr work); bool dumpDebuggingInfo(bool includeStackTrace = true); From 7abe0b68e95be695207408dcb97430b4a7c8d9ce Mon Sep 17 00:00:00 2001 From: frost-intel Date: Thu, 17 Jul 2025 14:08:40 +0000 Subject: [PATCH 04/15] Lint/comment --- src/xccl/ProcessGroupXCCL.cpp | 97 +++++++++++++---------------------- src/xccl/ProcessGroupXCCL.hpp | 11 +--- 2 files changed, 38 insertions(+), 70 deletions(-) diff --git a/src/xccl/ProcessGroupXCCL.cpp b/src/xccl/ProcessGroupXCCL.cpp index 967da8e076..0bc7d46206 100644 --- a/src/xccl/ProcessGroupXCCL.cpp +++ b/src/xccl/ProcessGroupXCCL.cpp @@ -246,7 +246,6 @@ ProcessGroupXCCL::WorkXCCL::WorkXCCL( workStartTime_(std::chrono::steady_clock::now()), seq_(seq), isP2P_(isP2P) { - xcclStartEvent_ = std::make_shared(); xcclEndEvent_ = std::make_shared(); stashed_for_allocator_safety_ = std::make_shared(); } @@ -254,7 +253,6 @@ ProcessGroupXCCL::WorkXCCL::WorkXCCL( ProcessGroupXCCL::WorkXCCL::WorkXCCL(const WorkXCCL& w) : Work(w.rank_, w.opType_), device_(w.device_), - xcclStartEvent_(w.xcclStartEvent_), xcclEndEvent_(w.xcclEndEvent_), blockingWait_(w.blockingWait_), workStartTime_(w.workStartTime_), @@ -307,6 +305,10 @@ bool ProcessGroupXCCL::WorkXCCL::wait(std::chrono::milliseconds timeout) { return true; } +ProcessGroupXCCL::Options::Options() + : Backend::Options(XCCL_BACKEND_NAME) {} + + static std::atomic process_group_id = 0; constexpr const char* MULTI_DEVICE_ERROR_MSG = @@ -346,8 +348,9 @@ ProcessGroupXCCL::ProcessGroupXCCL( logPrefix_ = createLogPrefix(); blockingWait_ = getCvarBool(TORCH_XCCL_BLOCKING_WAIT, false); traceBufferSize_ = getCvarInt({"TORCH_FR_BUFFER_SIZE"}, 2000); - this->setGroupUid(options_->group_name); + this->setGroupUid(options_->group_name); + // In PGNCCL, the pg ranks are recorded on comm setup in each op, but we just do it here. 
FlightRecorderXCCL::get()->record_pg_ranks( std::make_tuple(pg_uid_, pg_desc_), groupRanks()); init(); @@ -363,19 +366,19 @@ ProcessGroupXCCL::ProcessGroupXCCL( << ", TORCH_XCCL_BLOCKING_WAIT: " << blockingWait_ << ", TORCH_DISTRIBUTED_DEBUG: " << torch_distributed_debug; + // Heartbeat monitor thread dumps debug info on write to pipe heartbeatMonitor_ = std::make_unique(this); heartbeatMonitor_->start(); } ProcessGroupXCCL::~ProcessGroupXCCL() { heartbeatMonitor_->stop(); + // Wait for all threads to finish before returning heartbeatMonitor_->join(); } bool ProcessGroupXCCL::dumpDebuggingInfo(bool includeStackTrace /*=true*/) { STATIC_SCOPED_WAIT_COUNTER(pytorch.ProcessGroupXCCL__dumpDebuggingInfo); - static std::mutex writeDebugInfoMutex; - std::lock_guard lock(writeDebugInfoMutex); LOG(ERROR) << logPrefix() << "ProcessGroupXCCL preparing to dump debug info. Include stack trace: " @@ -405,19 +408,6 @@ ProcessGroupXCCL::HeartbeatMonitor::HeartbeatMonitor(ProcessGroupXCCL* pg) { << "TORCH_XCCL_COOR_CHECK_MILSEC: " << coordCheckIntervalMilSec_; } -std::string ProcessGroupXCCL::HeartbeatMonitor::getXCCLTimeoutErrorMsg(const std::string& extraMsg) { - return c10::str( - pg_->logPrefix(), - "Received a dump signal due to a collective timeout from ", - extraMsg, - " and we will try our best to dump the debug info. ", - "Last enqueued XCCL work: ", - pg_->pgStatus_->lastEnqueuedSeq, - ", last completed XCCL work: ", - pg_->pgStatus_->lastCompletedSeq, - "."); -} - void ProcessGroupXCCL::HeartbeatMonitor::stop() { terminateHeartbeatMonitorThread_.store(true); monitorWakeUpCV_.notify_one(); @@ -442,24 +432,23 @@ void ProcessGroupXCCL::HeartbeatMonitor::join() { void ProcessGroupXCCL::HeartbeatMonitor::runLoop() { c10::setThreadName("pt_xccl_heartbt"); - std::string errorMsg; - std::string exitReason; - bool checkDumpSignal = pg_->local_id_ == 0; - int monitorPollInterval = coordCheckIntervalMilSec_; std::optional dumpPipe = std::nullopt; + // We only need to dump once per PG, so we use local_id_ == 0 for the first PG if (pg_->local_id_ == 0) { // DumpPipe is one per-trainer process dumpPipe.emplace(pg_->rank_); while (true) { std::unique_lock lock(monitorMutex_); if (monitorWakeUpCV_.wait_for( - lock, std::chrono::milliseconds(monitorPollInterval), [&] { + lock, std::chrono::milliseconds(coordCheckIntervalMilSec_), [&] { return terminateHeartbeatMonitorThread_.load(); })) { return; } + // Write to pipe files for all ranks to dump debug info if (dumpPipe.has_value() && dumpPipe->shouldDump()) { - LOG(INFO) << pg_->logPrefix() << "Dump signal received through pipe."; + LOG(INFO) << pg_->logPrefix() + << "Dump signal received through pipe, triggering FR dump."; std::future fut = std::async(std::launch::async, [this]() { return this->pg_->dumpDebuggingInfo(); }); @@ -477,6 +466,24 @@ const std::vector& ProcessGroupXCCL::groupRanks() const { return options_->global_ranks_in_group; } +void ProcessGroupXCCL::setStartedPgStatus( + c10::intrusive_ptr work) { + pgStatus_->lastStartedSeq = static_cast(work->getSequencenumber()); + pgStatus_->lastStartedWorkName = opTypeToString(work->opType_); + pgStatus_->lastStartedNumelIn = work->numelIn_; + pgStatus_->lastStartedNumelOut = work->numelOut_; +} + +void ProcessGroupXCCL::setCompletedPgStatus( + c10::intrusive_ptr work) { + pgStatus_->lastCompletedSeq = static_cast(work->getSequencenumber()); + pgStatus_->lastCompletedWorkName = opTypeToString(work->opType_); + pgStatus_->lastCompletedNumelIn = work->numelIn_; + pgStatus_->lastCompletedNumelOut = 
work->numelOut_; + // To avoid complexity, we're not computing duration. + FlightRecorderXCCL::get()->retire_id(work->trace_id_, /*compute_duration*/false); +} + void ProcessGroupXCCL::setSequenceNumberForGroup() {} uint64_t ProcessGroupXCCL::getSequenceNumberForGroup() { @@ -500,7 +507,7 @@ c10::intrusive_ptr ProcessGroupXCCL::initWork( profilingTitle, profilingTitle != nullptr ? std::optional>(inputs) : std::nullopt); - + r->trace_id_ = FlightRecorderXCCL::get()->record( local_id_, std::make_tuple(pg_uid_, pg_desc_), // PG name tuple @@ -510,7 +517,7 @@ c10::intrusive_ptr ProcessGroupXCCL::initWork( profilingTitle ? profilingTitle : "", inputs, outputs, - r->xcclStartEvent_.get(), + nullptr, r->xcclEndEvent_.get(), options_->timeout, pgStatus_, @@ -627,9 +634,6 @@ void ProcessGroupXCCL::groupEnd() { --xcclActiveGroupCounter_; } -ProcessGroupXCCL::Options::Options() - : Backend::Options(XCCL_BACKEND_NAME) {} - static constexpr int CoalActive = 0x01, CoalColl = 0x02, CoalP2P = 0x04; void ProcessGroupXCCL::startCoalescing() { coalescedDevice_.set_index(-1); @@ -639,23 +643,6 @@ void ProcessGroupXCCL::startCoalescing() { groupStart(); } -void ProcessGroupXCCL::setStartedPgStatus( - c10::intrusive_ptr work) { - pgStatus_->lastStartedSeq = static_cast(work->getSequencenumber()); - pgStatus_->lastStartedWorkName = opTypeToString(work->opType_); - pgStatus_->lastStartedNumelIn = work->numelIn_; - pgStatus_->lastStartedNumelOut = work->numelOut_; -} - -void ProcessGroupXCCL::setCompletedPgStatus( - c10::intrusive_ptr work) { - pgStatus_->lastCompletedSeq = static_cast(work->getSequencenumber()); - pgStatus_->lastCompletedWorkName = opTypeToString(work->opType_); - pgStatus_->lastCompletedNumelIn = work->numelIn_; - pgStatus_->lastCompletedNumelOut = work->numelOut_; - FlightRecorderXCCL::get()->retire_id(work->trace_id_, false); -} - c10::intrusive_ptr ProcessGroupXCCL::endCoalescing(OpType optype) { if (coalescedComm_ == nullptr) { // There is no actual work being coalesced, return here @@ -817,8 +804,7 @@ c10::intrusive_ptr ProcessGroupXCCL::collective( work->future_->addCallback( [this, work](at::ivalue::Future&) { this->setCompletedPgStatus(work); - } - ); + }); work->blockingWait_ = blockingWait_; work->numelIn_ = 0; @@ -904,7 +890,7 @@ c10::intrusive_ptr ProcessGroupXCCL::pointToPoint( profilingTitle, {tensor}, {tensor}, - work->xcclStartEvent_.get(), + nullptr, work->xcclEndEvent_.get(), options_->timeout, pgStatus_, @@ -928,8 +914,7 @@ c10::intrusive_ptr ProcessGroupXCCL::pointToPoint( work->future_->addCallback( [this, work](at::ivalue::Future&) { this->setCompletedPgStatus(work); - } - ); + }); work->numelIn_ = work->numelOut_ = tensor.numel(); setStartedPgStatus(work); @@ -2325,13 +2310,3 @@ c10::intrusive_ptr ProcessGroupXCCL::alltoall( } // namespace c10d #endif // USE_C10D_XCCL - -void printNcclCommProxyTrace( - const std::string& dumpReason, - const std::unordered_map& dumpMap) { - LOG(INFO) << "Dumping nccl comm trace, reason: " << dumpReason; - for (auto& [key, value] : dumpMap) { - LOG(INFO) << "key: " << key << ", value: " << value; - } - LOG(INFO) << "----------------------"; -} diff --git a/src/xccl/ProcessGroupXCCL.hpp b/src/xccl/ProcessGroupXCCL.hpp index ab534e485c..fda55b1e91 100644 --- a/src/xccl/ProcessGroupXCCL.hpp +++ b/src/xccl/ProcessGroupXCCL.hpp @@ -27,10 +27,6 @@ static std::vector TORCH_XCCL_BLOCKING_WAIT = { "TORCH_XCCL_BLOCKING_WAIT", "XCCL_BLOCKING_WAIT"}; -static std::vector TORCH_XCCL_LOG_CPP_STACK_ON_UNCLEAN_SHUTDOWN = { - 
"TORCH_XCCL_LOG_CPP_STACK_ON_UNCLEAN_SHUTDOWN", - "XCCL_LOG_CPP_STACK_ON_UNCLEAN_SHUTDOWN"}; - static std::vector TORCH_XCCL_COORD_CHECK_MILSEC = { "TORCH_XCCL_COORD_CHECK_MILSEC", "XCCL_COORD_CHECK_MILSEC"}; @@ -158,7 +154,6 @@ class TORCH_API ProcessGroupXCCL : public Backend { protected: at::Device device_; - std::shared_ptr xcclStartEvent_; std::shared_ptr xcclEndEvent_; bool isBarrierOp_{false}; bool blockingWait_{false}; @@ -476,13 +471,11 @@ class TORCH_API ProcessGroupXCCL : public Backend { const std::string& logPrefix() const; c10::DeviceIndex guessDeviceId() const; - const int& globalRank() const; + const int& globalRank() const; const std::vector& groupRanks() const; - void setStartedPgStatus(c10::intrusive_ptr work); void setCompletedPgStatus(c10::intrusive_ptr work); - bool dumpDebuggingInfo(bool includeStackTrace = true); protected: @@ -491,7 +484,6 @@ class TORCH_API ProcessGroupXCCL : public Backend { std::unordered_map xcclEventsMap_; std::unordered_map> devXCCLCommMap_; c10::intrusive_ptr store_; - const c10::intrusive_ptr options_; uint64_t xcclCommCounter_{0}; std::mutex mutex_; std::set usedDeviceIdxs_; @@ -507,6 +499,7 @@ class TORCH_API ProcessGroupXCCL : public Backend { uint64_t op_id_{0}; size_t local_id_; std::string logPrefix_; + const c10::intrusive_ptr options_; std::shared_ptr pgStatus_ = std::make_shared(); std::unique_ptr heartbeatMonitor_; From e05e0ce429ee60a193d89311d86fac005d77e95c Mon Sep 17 00:00:00 2001 From: frost-intel Date: Thu, 17 Jul 2025 20:23:12 +0000 Subject: [PATCH 05/15] Typo/lint --- src/xccl/ProcessGroupXCCL.cpp | 2 +- src/xccl/ProcessGroupXCCL.hpp | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/src/xccl/ProcessGroupXCCL.cpp b/src/xccl/ProcessGroupXCCL.cpp index 2ec9eb8ace..351e19026f 100644 --- a/src/xccl/ProcessGroupXCCL.cpp +++ b/src/xccl/ProcessGroupXCCL.cpp @@ -405,7 +405,7 @@ ProcessGroupXCCL::HeartbeatMonitor::HeartbeatMonitor(ProcessGroupXCCL* pg) { coordCheckIntervalMilSec_ = getCvarInt(TORCH_XCCL_COORD_CHECK_MILSEC, 1000); LOG(INFO) << pg_->logPrefix() << "HeartbeatMonitor environments: " - << "TORCH_XCCL_COOR_CHECK_MILSEC: " << coordCheckIntervalMilSec_; + << "TORCH_XCCL_COORD_CHECK_MILSEC: " << coordCheckIntervalMilSec_; } void ProcessGroupXCCL::HeartbeatMonitor::stop() { diff --git a/src/xccl/ProcessGroupXCCL.hpp b/src/xccl/ProcessGroupXCCL.hpp index 2948a36cae..36190fb8d9 100644 --- a/src/xccl/ProcessGroupXCCL.hpp +++ b/src/xccl/ProcessGroupXCCL.hpp @@ -472,7 +472,6 @@ class TORCH_API ProcessGroupXCCL : public Backend { c10::DeviceIndex guessDeviceId() const; - const int& globalRank() const; const std::vector& groupRanks() const; void setStartedPgStatus(c10::intrusive_ptr work); void setCompletedPgStatus(c10::intrusive_ptr work); From fa45bd4b29f343b633f78e0f0636c7922dd4840a Mon Sep 17 00:00:00 2001 From: Frost Mitchell Date: Tue, 22 Jul 2025 15:07:15 -0400 Subject: [PATCH 06/15] Move DumpPipe and HeartbeatMonitor to Monitor file --- src/xccl/ProcessGroupXCCL.cpp | 59 +----------------- src/xccl/ProcessGroupXCCL.hpp | 83 +------------------------ src/xccl/ProcessGroupXCCLMonitor.cpp | 63 +++++++++++++++++++ src/xccl/ProcessGroupXCCLMonitor.hpp | 93 ++++++++++++++++++++++++++++ 4 files changed, 159 insertions(+), 139 deletions(-) create mode 100644 src/xccl/ProcessGroupXCCLMonitor.cpp create mode 100644 src/xccl/ProcessGroupXCCLMonitor.hpp diff --git a/src/xccl/ProcessGroupXCCL.cpp b/src/xccl/ProcessGroupXCCL.cpp index 351e19026f..851265c6d5 100644 --- a/src/xccl/ProcessGroupXCCL.cpp +++ 
b/src/xccl/ProcessGroupXCCL.cpp @@ -367,7 +367,7 @@ ProcessGroupXCCL::ProcessGroupXCCL( << ", TORCH_DISTRIBUTED_DEBUG: " << torch_distributed_debug; // Heartbeat monitor thread dumps debug info on write to pipe - heartbeatMonitor_ = std::make_unique(this); + heartbeatMonitor_ = std::make_unique(this); heartbeatMonitor_->start(); } @@ -400,63 +400,6 @@ bool ProcessGroupXCCL::dumpDebuggingInfo(bool includeStackTrace /*=true*/) { return false; } -ProcessGroupXCCL::HeartbeatMonitor::HeartbeatMonitor(ProcessGroupXCCL* pg) { - pg_ = pg; - coordCheckIntervalMilSec_ = getCvarInt(TORCH_XCCL_COORD_CHECK_MILSEC, 1000); - LOG(INFO) - << pg_->logPrefix() << "HeartbeatMonitor environments: " - << "TORCH_XCCL_COORD_CHECK_MILSEC: " << coordCheckIntervalMilSec_; -} - -void ProcessGroupXCCL::HeartbeatMonitor::stop() { - terminateHeartbeatMonitorThread_.store(true); - monitorWakeUpCV_.notify_one(); -} - -void ProcessGroupXCCL::HeartbeatMonitor::start() { - TORCH_CHECK( - !xcclHeartbeatMonitorThread_.joinable(), - "HeartbeatMonitor thread already started"); - xcclHeartbeatMonitorThread_ = - std::thread(&ProcessGroupXCCL::HeartbeatMonitor::runLoop, this); -} - -void ProcessGroupXCCL::HeartbeatMonitor::join() { - if (xcclHeartbeatMonitorThread_.joinable()) { - xcclHeartbeatMonitorThread_.join(); - LOG(INFO) << pg_->logPrefix() - << "ProcessGroupXCCL heart beat monitor thread joined."; - } -} - -void ProcessGroupXCCL::HeartbeatMonitor::runLoop() { - c10::setThreadName("pt_xccl_heartbt"); - - std::optional dumpPipe = std::nullopt; - // We only need to dump once per PG, so we use local_id_ == 0 for the first PG - if (pg_->local_id_ == 0) { - // DumpPipe is one per-trainer process - dumpPipe.emplace(pg_->rank_); - while (true) { - std::unique_lock lock(monitorMutex_); - if (monitorWakeUpCV_.wait_for( - lock, std::chrono::milliseconds(coordCheckIntervalMilSec_), [&] { - return terminateHeartbeatMonitorThread_.load(); - })) { - return; - } - // Write to pipe files for all ranks to dump debug info - if (dumpPipe.has_value() && dumpPipe->shouldDump()) { - LOG(INFO) << pg_->logPrefix() - << "Dump signal received through pipe, triggering FR dump."; - std::future fut = std::async(std::launch::async, [this]() { - return this->pg_->dumpDebuggingInfo(); - }); - } - } - } -} - const std::vector& ProcessGroupXCCL::groupRanks() const { if (options_->global_ranks_in_group.empty()) { static std::vector globalRanks(size_); diff --git a/src/xccl/ProcessGroupXCCL.hpp b/src/xccl/ProcessGroupXCCL.hpp index 36190fb8d9..aa7a49777a 100644 --- a/src/xccl/ProcessGroupXCCL.hpp +++ b/src/xccl/ProcessGroupXCCL.hpp @@ -21,6 +21,7 @@ #include #include #include +#include namespace c10d { static std::vector TORCH_XCCL_BLOCKING_WAIT = { @@ -31,64 +32,6 @@ static std::vector TORCH_XCCL_COORD_CHECK_MILSEC = { "TORCH_XCCL_COORD_CHECK_MILSEC", "XCCL_COORD_CHECK_MILSEC"}; -#if defined(__linux__) -struct DumpPipe { - DumpPipe(int rank) { - std::string fileStem = - getCvarString({"TORCH_FR_DEBUG_INFO_PIPE_FILE"}, ""); - if (fileStem.empty() || - getCvarInt({"TORCH_FR_BUFFER_SIZE"}, 0) <= 0) { - return; - } - TORCH_CHECK(!fileStem.empty(), "TORCH_FR_DEBUG_INFO_PIPE_FILE is empty"); - std::string filename = c10::str(fileStem, rank, ".pipe"); - TORCH_CHECK( - unlink(filename.c_str()) != -1 || errno == ENOENT, - "Error removing existing named pipe ", - filename, - ", Error: ", - std::strerror(errno)); - TORCH_CHECK( - mkfifo(filename.c_str(), 0666) != -1, - "Error creating named pipe ", - filename, - ", Error: ", - std::strerror(errno)); - fd_ = 
open(filename.c_str(), O_RDONLY | O_NONBLOCK); - LOG(INFO) << "Pipe file " << filename - << " has been opened, write to it to trigger XCCL Debug Dump."; - TORCH_CHECK(fd_ != -1, "Error opening named pipe ", filename); - } - bool shouldDump() { - if (fd_ == -1) { - return false; - } - // NOLINTNEXTLINE(*array*) - char buf[128]{}; - // non-blocking from O_NONBLOCK above. - // Ignore EINTR because we already will poll this - // again later. - ssize_t bytesRead = read(fd_, &buf, 128); - return bytesRead > 0; - } - ~DumpPipe() { - if (fd_ != -1) { - close(fd_); - } - } - - private: - int fd_ = -1; -}; -#else -struct DumpPipe { - DumpPipe(int rank) {} - bool shouldDump() { - return false; - } -}; -#endif - using xcclComm_t = ccl::communicator; constexpr const char* XCCL_BACKEND_NAME = "xccl"; @@ -182,28 +125,6 @@ class TORCH_API ProcessGroupXCCL : public Backend { std::string group_name; }; - class HeartbeatMonitor { - public: - HeartbeatMonitor(ProcessGroupXCCL* pg); - virtual ~HeartbeatMonitor() = default; - - std::string getXCCLTimeoutErrorMsg(const std::string& extraMsg); - void start(); - void join(); - virtual void runLoop(); - void stop(); - - protected: - ProcessGroupXCCL* pg_; - - private: - int coordCheckIntervalMilSec_; - std::condition_variable monitorWakeUpCV_; - std::mutex monitorMutex_; - std::thread xcclHeartbeatMonitorThread_; - std::atomic terminateHeartbeatMonitorThread_{false}; - }; - ProcessGroupXCCL( const c10::intrusive_ptr& store, int rank, @@ -501,7 +422,7 @@ class TORCH_API ProcessGroupXCCL : public Backend { const c10::intrusive_ptr options_; std::shared_ptr pgStatus_ = std::make_shared(); - std::unique_ptr heartbeatMonitor_; + std::unique_ptr heartbeatMonitor_; int traceBufferSize_; private: diff --git a/src/xccl/ProcessGroupXCCLMonitor.cpp b/src/xccl/ProcessGroupXCCLMonitor.cpp new file mode 100644 index 0000000000..d8fbcc4679 --- /dev/null +++ b/src/xccl/ProcessGroupXCCLMonitor.cpp @@ -0,0 +1,63 @@ +#ifdef USE_C10D_XCCL + +#include +namespace c10d { + +HeartbeatMonitorXCCL::HeartbeatMonitorXCCL(ProcessGroupXCCL* pg) { + pg_ = pg; + coordCheckIntervalMilSec_ = getCvarInt(TORCH_XCCL_COORD_CHECK_MILSEC, 1000); + LOG(INFO) + << pg_->logPrefix() << "HeartbeatMonitor environments: " + << "TORCH_XCCL_COORD_CHECK_MILSEC: " << coordCheckIntervalMilSec_; +} + +void HeartbeatMonitorXCCL::stop() { + terminateHeartbeatMonitorThread_.store(true); + monitorWakeUpCV_.notify_one(); +} + +void HeartbeatMonitorXCCL::start() { + TORCH_CHECK( + !xcclHeartbeatMonitorThread_.joinable(), + "HeartbeatMonitor thread already started"); + xcclHeartbeatMonitorThread_ = + std::thread(&HeartbeatMonitorXCCL::runLoop, this); +} + +void HeartbeatMonitorXCCL::join() { + if (xcclHeartbeatMonitorThread_.joinable()) { + xcclHeartbeatMonitorThread_.join(); + LOG(INFO) << pg_->logPrefix() + << "ProcessGroupXCCL heart beat monitor thread joined."; + } +} + +void HeartbeatMonitorXCCL::runLoop() { + c10::setThreadName("pt_xccl_heartbt"); + + std::optional dumpPipe = std::nullopt; + // We only need to dump once per PG, so we use local_id_ == 0 for the first PG + if (pg_->local_id_ == 0) { + // DumpPipe is one per-trainer process + dumpPipe.emplace(pg_->rank_); + while (true) { + std::unique_lock lock(monitorMutex_); + if (monitorWakeUpCV_.wait_for( + lock, std::chrono::milliseconds(coordCheckIntervalMilSec_), [&] { + return terminateHeartbeatMonitorThread_.load(); + })) { + return; + } + // Write to pipe files for all ranks to dump debug info + if (dumpPipe.has_value() && dumpPipe->shouldDump()) { + LOG(INFO) 
<< pg_->logPrefix() + << "Dump signal received through pipe, triggering FR dump."; + std::future fut = std::async(std::launch::async, [this]() { + return this->pg_->dumpDebuggingInfo(); + }); + } + } + } +} + +} // namespace c10d diff --git a/src/xccl/ProcessGroupXCCLMonitor.hpp b/src/xccl/ProcessGroupXCCLMonitor.hpp new file mode 100644 index 0000000000..d2b298b804 --- /dev/null +++ b/src/xccl/ProcessGroupXCCLMonitor.hpp @@ -0,0 +1,93 @@ +#pragma once + +#include +#include +#include +#include + +#ifdef USE_C10D_XCCL +namespace c10d { + +// This definition will later be moved to a common header for ProcessGroups NCCL/Gloo/XCCL +#if defined(__linux__) +struct DumpPipe { + DumpPipe(int rank) { + std::string fileStem = + getCvarString({"TORCH_FR_DEBUG_INFO_PIPE_FILE"}, ""); + if (fileStem.empty() || + getCvarInt({"TORCH_FR_BUFFER_SIZE"}, 0) <= 0) { + return; + } + TORCH_CHECK(!fileStem.empty(), "TORCH_FR_DEBUG_INFO_PIPE_FILE is empty"); + std::string filename = c10::str(fileStem, rank, ".pipe"); + TORCH_CHECK( + unlink(filename.c_str()) != -1 || errno == ENOENT, + "Error removing existing named pipe ", + filename, + ", Error: ", + std::strerror(errno)); + TORCH_CHECK( + mkfifo(filename.c_str(), 0666) != -1, + "Error creating named pipe ", + filename, + ", Error: ", + std::strerror(errno)); + fd_ = open(filename.c_str(), O_RDONLY | O_NONBLOCK); + LOG(INFO) << "Pipe file " << filename + << " has been opened, write to it to trigger ProcessGroup Debug Dump."; + TORCH_CHECK(fd_ != -1, "Error opening named pipe ", filename); + } + bool shouldDump() { + if (fd_ == -1) { + return false; + } + // NOLINTNEXTLINE(*array*) + char buf[128]{}; + // non-blocking from O_NONBLOCK above. + // Ignore EINTR because we already will poll this + // again later. + ssize_t bytesRead = read(fd_, &buf, 128); + return bytesRead > 0; + } + ~DumpPipe() { + if (fd_ != -1) { + close(fd_); + } + } + + private: + int fd_ = -1; +}; +#else +struct DumpPipe { + DumpPipe(int rank) {} + bool shouldDump() { + return false; + } +}; +#endif + +class ProcessGroupXCCL; +class HeartbeatMonitorXCCL { + public: + HeartbeatMonitorXXCL(ProcessGroupXCCL* pg); + virtual ~HeartbeatMonitorXCCL() = default; + + std::string getXCCLTimeoutErrorMsg(const std::string& extraMsg); + void start(); + void join(); + virtual void runLoop(); + void stop(); + + protected: + ProcessGroupXCCL* pg_; + + private: + int coordCheckIntervalMilSec_; + std::condition_variable monitorWakeUpCV_; + std::mutex monitorMutex_; + std::thread xcclHeartbeatMonitorThread_; + std::atomic terminateHeartbeatMonitorThread_{false}; +}; +} +#endif // USE_C10D_XCCL From 0b602ae93106df76315d24861e1e8d361b1c57b7 Mon Sep 17 00:00:00 2001 From: frost-intel Date: Tue, 22 Jul 2025 20:32:17 +0000 Subject: [PATCH 07/15] Build fixes --- src/xccl/ProcessGroupXCCL.hpp | 2 ++ src/xccl/ProcessGroupXCCLMonitor.cpp | 5 ++++- src/xccl/ProcessGroupXCCLMonitor.hpp | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) diff --git a/src/xccl/ProcessGroupXCCL.hpp b/src/xccl/ProcessGroupXCCL.hpp index aa7a49777a..186d66947f 100644 --- a/src/xccl/ProcessGroupXCCL.hpp +++ b/src/xccl/ProcessGroupXCCL.hpp @@ -425,6 +425,8 @@ class TORCH_API ProcessGroupXCCL : public Backend { std::unique_ptr heartbeatMonitor_; int traceBufferSize_; + friend class HeartbeatMonitorXCCL; + private: std::mutex kvs_mutex; diff --git a/src/xccl/ProcessGroupXCCLMonitor.cpp b/src/xccl/ProcessGroupXCCLMonitor.cpp index d8fbcc4679..07eb601c79 100644 --- a/src/xccl/ProcessGroupXCCLMonitor.cpp +++ b/src/xccl/ProcessGroupXCCLMonitor.cpp 
@@ -1,5 +1,6 @@
 #ifdef USE_C10D_XCCL
 
+#include
 #include
 namespace c10d {
 
@@ -39,7 +40,7 @@ void HeartbeatMonitorXCCL::runLoop() {
   // We only need to dump once per PG, so we use local_id_ == 0 for the first PG
   if (pg_->local_id_ == 0) {
     // DumpPipe is one per-trainer process
-    dumpPipe.emplace(pg_->rank_);
+    dumpPipe.emplace(pg_->getRank());
     while (true) {
       std::unique_lock<std::mutex> lock(monitorMutex_);
       if (monitorWakeUpCV_.wait_for(
@@ -61,3 +62,5 @@ void HeartbeatMonitorXCCL::runLoop() {
 }
 
 } // namespace c10d
+
+#endif // USE_C10D_XCCL
diff --git a/src/xccl/ProcessGroupXCCLMonitor.hpp b/src/xccl/ProcessGroupXCCLMonitor.hpp
index d2b298b804..629a13025f 100644
--- a/src/xccl/ProcessGroupXCCLMonitor.hpp
+++ b/src/xccl/ProcessGroupXCCLMonitor.hpp
@@ -70,7 +70,7 @@ struct DumpPipe {
 class ProcessGroupXCCL;
 class HeartbeatMonitorXCCL {
  public:
-  HeartbeatMonitorXXCL(ProcessGroupXCCL* pg);
+  HeartbeatMonitorXCCL(ProcessGroupXCCL* pg);
   virtual ~HeartbeatMonitorXCCL() = default;
 
   std::string getXCCLTimeoutErrorMsg(const std::string& extraMsg);
From 326f0c5c9f8970cf7a41b96c29a1badcbf0f257f Mon Sep 17 00:00:00 2001
From: frost-intel
Date: Wed, 23 Jul 2025 20:20:50 +0000
Subject: [PATCH 08/15] Add test (draft)

---
 src/xccl/ProcessGroupXCCL.cpp          |  25 +-
 src/xccl/ProcessGroupXCCL.hpp          |  16 +-
 test/xpu/distributed/test_c10d_xccl.py | 623 ++++++++++++++++++++++++-
 3 files changed, 650 insertions(+), 14 deletions(-)

diff --git a/src/xccl/ProcessGroupXCCL.cpp b/src/xccl/ProcessGroupXCCL.cpp
index 851265c6d5..5182489c21 100644
--- a/src/xccl/ProcessGroupXCCL.cpp
+++ b/src/xccl/ProcessGroupXCCL.cpp
@@ -202,6 +202,17 @@ void syncStream(
 
 } // namespace
 
+std::string dump_xccl_trace(
+    bool includeCollectives,
+    bool includeStackTraces,
+    bool onlyActive) {
+  auto xcclDumpMap = std::unordered_map<
+      std::string,
+      std::unordered_map<std::string, std::string>>();
+  return FlightRecorderXCCL::get()->dump(
+      xcclDumpMap, includeCollectives, includeStackTraces, onlyActive);
+}
+
 constexpr int64_t kSynchronizeBusyWaitMillis = 10;
 thread_local uint64_t ProcessGroupXCCL::xcclActiveGroupCounter_ = 0;
 
@@ -385,11 +396,7 @@ bool ProcessGroupXCCL::dumpDebuggingInfo(bool includeStackTrace /*=true*/) {
        << includeStackTrace;
   if (traceBufferSize_ > 0) {
     // TODO: dump_xccl_trace
-    auto xcclDumpMap = std::unordered_map<
-        std::string,
-        std::unordered_map<std::string, std::string>>();
-    auto xcclTrace = FlightRecorderXCCL::get()->dump(
-        xcclDumpMap, true, includeStackTrace, false);
+    auto xcclTrace = dump_xccl_trace(true, includeStackTrace, false);
     DebugInfoWriter& writer = DebugInfoWriter::getWriter(rank_);
     LOG(INFO) << logPrefix() << "ProcessGroupXCCL dumping xccl trace to "
               << writer.getWriterTarget();
@@ -2267,6 +2274,14 @@ c10::intrusive_ptr<Work> ProcessGroupXCCL::alltoall(
       "xccl:all_to_all");
 }
 
+std::string getXcclVersion() {
+  auto xccl_version = ccl::get_library_version();
+  std::string versionString = std::to_string(xccl_version.major) + "." +
+      std::to_string(xccl_version.minor) + "." +
+      std::to_string(xccl_version.update);
+  return versionString;
+}
+
 } // namespace c10d
 
 #endif // USE_C10D_XCCL
diff --git a/src/xccl/ProcessGroupXCCL.hpp b/src/xccl/ProcessGroupXCCL.hpp
index 186d66947f..8e8e40f0ca 100644
--- a/src/xccl/ProcessGroupXCCL.hpp
+++ b/src/xccl/ProcessGroupXCCL.hpp
@@ -464,18 +464,18 @@ class TORCH_API ProcessGroupXCCL : public Backend {
     return kvs;
   }
 };
+
+// Dumps the comm traces and additional information about the ProcessGroup.
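+// The returned string is a pickled blob produced by FlightRecorder::dump:
+// includeCollectives controls whether per-collective entries are emitted,
+// includeStackTraces whether frames are captured for those entries, and
+// onlyActive restricts the dump to entries that have not yet completed.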
+TORCH_API std::string dump_xccl_trace(
+    bool includeCollectives,
+    bool includeStackTraces,
+    bool onlyActive);
+
+TORCH_API std::string getXcclVersion();
 } // namespace c10d
 
 namespace {
 
-inline std::string getXcclVersion() {
-  auto xccl_version = ccl::get_library_version();
-  std::string versionString = std::to_string(xccl_version.major) + "." +
-      std::to_string(xccl_version.minor) + "." +
-      std::to_string(xccl_version.update);
-  return versionString;
-}
-
 inline std::string reduceOpToString(c10d::ReduceOp op) {
   switch (op) {
     case c10d::ReduceOp::SUM:
diff --git a/test/xpu/distributed/test_c10d_xccl.py b/test/xpu/distributed/test_c10d_xccl.py
index 0625a6993f..0874a90853 100644
--- a/test/xpu/distributed/test_c10d_xccl.py
+++ b/test/xpu/distributed/test_c10d_xccl.py
@@ -1,10 +1,14 @@
 # Owner(s): ["oncall: distributed"]
 
+import json
 import math
 import os
+import pickle
 import sys
+import tempfile
+import threading
 import time
-from datetime import timedelta
+from datetime import datetime, timedelta
 from enum import auto, Enum
 from unittest import mock
@@ -19,6 +23,7 @@ import torch.testing._internal.common_utils as common
 from torch.testing._internal.common_distributed import MultiProcessTestCase
 from torch.testing._internal.common_utils import (
+    parametrize,
     retry_on_connect_failures,
     run_tests,
     skip_but_pass_in_sandcastle_if,
@@ -551,6 +556,622 @@ def test_all_gather_into_tensor(self):
         )
 
 
+class XCCLTraceTestBase(MultiProcessTestCase):
+    def setUp(self):
+        super().setUp()
+        os.environ["TORCH_FR_BUFFER_SIZE"] = "1000"
+        self.tempdir = tempfile.TemporaryDirectory()
+        os.environ["TORCH_FR_DEBUG_INFO_PIPE_FILE"] = self._trace_basename()
+        self._spawn_processes()
+
+    @classmethod
+    def _run(
+        cls,
+        parent_conn,
+        rank: int,
+        test_name: str,
+        file_name: str,
+        parent_pipe,
+        **kwargs,
+    ) -> None:
+        cls.parent = parent_conn
+        super()._run(rank, test_name, file_name, parent_pipe)
+
+    @property
+    def local_device(self):
+        return torch.device("xpu", self.rank_to_GPU[self.rank][0])
+
+    def _join_processes(self, fn):
+        # We need to patch sys.exit() as skip_if will use sys.exit() and
+        # the exit code from this process will not be caught.
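+        # mock.patch swaps sys.exit for a MagicMock, so a skip decorator
+        # calling sys.exit() inside fn() cannot terminate this process
+        # before the children have been joined below.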
+ with mock.patch("sys.exit"): + fn() + super()._join_processes(fn) + + def _spawn_processes(self) -> None: + proc = torch.multiprocessing.get_context("spawn").Process + self.children_pipes = [] + parent_pipes = [] + for _ in range(self.world_size): + parent_conn, child_conn = torch.multiprocessing.Pipe() + self.children_pipes.append(child_conn) + parent_pipes.append(parent_conn) + piter = iter(parent_pipes) + + def wrap(*positional, args, **kwargs): + args = (next(piter), *args) + return proc(*positional, args=args, **kwargs) + + self._start_processes(wrap) + + def _create_process_group_xccl( + self, timeout=timedelta(seconds=600), device_id=None + ): + store = c10d.FileStore(self.file_name, self.world_size) + c10d.init_process_group( + "xccl", + world_size=self.world_size, + rank=self.rank, + store=store, + timeout=timeout, + device_id=device_id, + ) + pg = c10d.distributed_c10d._get_default_group() + return pg + + def tearDown(self): + super().tearDown() + try: + os.remove(self.file_name) + except OSError: + pass + + @property + def world_size(self): + return 2 + + @property + def rank_to_GPU(self): + # return rank to GPU map + return init_multigpu_helper(self.world_size, "xccl") + + def _trace_basename(self): + # we pass the base to the env, and the dump util will append rank + return os.path.join(self.tempdir.name, "trace_") + + def _trace_name(self, rank): + return self._trace_basename() + str(rank) + + def started_or_scheduled(self, timing_enabled): + return "started" if timing_enabled else "scheduled" + + +class XCCLTraceTest(XCCLTraceTestBase): + def _verify_trace(self, t, include_collectives, timing_enabled, is_json): + ver = t["version"] + self.assertEqual(ver, "2.9") + xccl_version = t["nccl_version"] + torch_xccl_version = torch.distributed.get_xccl_version() + self.assertEqual(xccl_version, ".".join(str(v) for v in torch_xccl_version)) + pg_config = t["pg_config"] + self.assertEqual(len(pg_config), 1) + default_pg_info = pg_config["0"] + self.assertIn("name", default_pg_info) + self.assertIn("desc", default_pg_info) + self.assertIn("ranks", default_pg_info) + pg_status = t["pg_status"] + self.assertEqual(len(pg_status), 1) + self.assertEqual(str(pg_status["0"]["last_enqueued_collective"]), "2") + self.assertEqual(str(pg_status["0"]["last_completed_collective"]), "2") + self.assertEqual( + str(pg_status["0"]["last_started_collective"]), + "2" if timing_enabled else "-1", + ) + global_ranks = pg_config["0"]["ranks"] + self.assertEqual(len(json.loads(global_ranks)), self.world_size) + if include_collectives: + self.assertEqual(len(t["entries"]), 2) + t = t["entries"] + last = t[-1] + self.assertEqual(last["thread_id"], str(threading.current_thread().ident)) + self.assertEqual(last["thread_name"], "fr_test_thread") + self.assertEqual(last["process_group"], ("0", "default_pg")) + self.assertEqual(last["state"], "completed") + s = last["time_discovered_started_ns"] + f = last["time_discovered_completed_ns"] + self.assertEqual(last["record_id"], 1) + self.assertIsNotNone(f) + if timing_enabled: + self.assertIsNotNone(s) + self.assertTrue(s <= f) + # we don't collect stack traces in JSON at the moment + if not is_json: + self.assertIn("test_c10d_xccl.py", str(last["frames"])) + self.assertEqual(last["input_sizes"], ((3, 4),)) + self.assertEqual(last["input_dtypes"], ["Float"]) + self.assertEqual(last["output_sizes"], ((3, 4),)) + self.assertEqual(last["output_dtypes"], ["Float"]) + self.assertEqual(last["collective_seq_id"], 2) + self.assertEqual(last["timeout_ms"], 600000) + now = 
datetime.now() + event_created_time = datetime.fromtimestamp( + last["time_created_ns"] / 1000000000 + ) + before_test = now - timedelta(minutes=1) + self.assertTrue(before_test < event_created_time < now) + if timing_enabled: + # very loose bounds, measured 0.036 ms on devgpu + self.assertTrue(0 < last["duration_ms"] < 100) + else: + self.assertTrue("duration_ms" not in last) + else: + self.assertTrue("entries" not in t) + + def load_libpthread_or_libc(self): + import ctypes.util + + for base in ("pthread", "c"): + path = ctypes.util.find_library(base) + if path: + try: + return ctypes.CDLL(path) + except OSError: + continue + raise RuntimeError("Could not load pthread or libc") + + # Directly set thread name using threading.current_thread().name does not work + # because we use pthread_getname_np to get the thread’s OS-level name in C++ + def set_thread_name(self, name): + import ctypes + + lib = self.load_libpthread_or_libc() + pthread_self = lib.pthread_self + pthread_self.restype = ctypes.c_void_p + pthread_setname_np = lib.pthread_setname_np + pthread_setname_np.argtypes = [ctypes.c_void_p, ctypes.c_char_p] + + # Get current pthread handle + tid = pthread_self() + + # Set name + pthread_setname_np(tid, name.encode()) + + @requires_xccl() + @skip_if_lt_x_gpu(2) + @parametrize("timing_enabled", [True, False]) + @parametrize("include_collectives", [True, False]) + def test_short_pickle(self, timing_enabled, include_collectives): + if self.rank == self.MAIN_PROCESS_RANK: + return + pg = self._create_process_group_xccl() + if timing_enabled: + pg._enable_collectives_timing() + device = self.local_device + self.set_thread_name("fr_test_thread") + a = torch.full((3, 4), float(self.rank), device=device) + for _ in range(2): + f = pg.allreduce(a) + f.wait() + torch.xpu.synchronize(device=device) + # gah ok so now the duration_ms is populated best-effort since it can only happen outside "dump()" api + time.sleep(1) + t = pickle.loads( + torch._C._distributed_c10d._dump_xccl_trace( + includeCollectives=include_collectives + ) + ) + self._verify_trace( + t, + include_collectives=include_collectives, + timing_enabled=timing_enabled, + is_json=True, + ) + dist.destroy_process_group() + + @requires_xccl() + @skip_if_lt_x_gpu(2) + def test_dump_pipe(self): + def open_file_with_timeout(file_path, mode, timeout=1.0): + start_time = time.time() + while time.time() - start_time < timeout: + if os.path.exists(file_path): + return open(file_path, mode) + time.sleep(0.1) + raise FileNotFoundError + + if self.rank == self.MAIN_PROCESS_RANK: + for c in self.children_pipes: + self.assertEqual(c.recv(), "next") + + dump_file = self._trace_name(rank=0) + pipe_file = dump_file + ".pipe" + with open_file_with_timeout(pipe_file, "w") as f: + f.write("1\n") + with open_file_with_timeout(dump_file, "rb", timeout=10.0) as f: + self.assertTrue("all_reduce" in str(pickle.load(f))) + + for c in self.children_pipes: + c.send("next") + return + + pg = self._create_process_group_xccl() + device = self.local_device + a = torch.full((3, 4), float(self.rank), device=device) + for _ in range(2): + f = pg.allreduce(a) + f.wait() + torch.xpu.synchronize(device=device) + self.parent.send("next") + self.parent.recv() + + @requires_xccl() + @skip_if_lt_x_gpu(2) + def test_long(self): + os.environ["TORCH_FR_TRACE_BUFFER_SIZE"] = "10" + if self.rank == self.MAIN_PROCESS_RANK: + return + pg = self._create_process_group_xccl() + device = self.local_device + a = torch.full((3, 4), float(self.rank), device=device) + for _ in range(2): + 
# test some other primitives to make sure + # their strings are valid + xs = [torch.ones(3, 4, device=device)] + pg.broadcast(xs).wait() + pg.allreduce(xs).wait() + pg.reduce(xs).wait() + ys = [[torch.empty(3, 4, device=device) for _ in range(self.world_size)]] + pg.allgather(ys, xs).wait() + pg.reduce_scatter(xs, ys).wait() + f = pg.allreduce(a) + f.wait() + torch.xpu.synchronize(device=device) + t = pickle.loads(torch._C._distributed_c10d._dump_xccl_trace()) + t = t["entries"] + self.assertEqual(len(t), 10) + first = t[0] + last = t[-1] + self.assertEqual(last["profiling_name"], "nccl:all_reduce") + self.assertEqual(last["state"], "completed") + self.assertIn("test_c10d_nccl.py", str(last["frames"])) + self.assertEqual(last["input_sizes"], ((3, 4),)) + self.assertEqual(last["input_dtypes"], ["Float"]) + self.assertEqual(last["output_sizes"], ((3, 4),)) + self.assertEqual(last["output_dtypes"], ["Float"]) + self.assertEqual(last["timeout_ms"], 600000) + self.assertEqual(last["collective_seq_id"] - first["collective_seq_id"], 9) + dist.destroy_process_group() + + @requires_xccl() + @skip_if_lt_x_gpu(2) + def test_barrier_profiling(self): + os.environ["TORCH_FR_TRACE_BUFFER_SIZE"] = "10" + if self.rank == self.MAIN_PROCESS_RANK: + return + pg = self._create_process_group_xccl() + device = self.local_device + a = torch.full((3, 4), float(self.rank), device=device) + f = pg.barrier() + f = pg.allreduce(a) + f.wait() + torch.xpu.synchronize(device=device) + t = pickle.loads(torch._C._distributed_c10d._dump_xccl_trace()) + t = t["entries"] + self.assertEqual(len(t), 2) + first = t[0] + last = t[-1] + self.assertEqual(first["profiling_name"], "nccl:all_reduce_barrier") + self.assertEqual(last["profiling_name"], "nccl:all_reduce") + dist.destroy_process_group() + + @requires_xccl() + @skip_if_lt_x_gpu(2) + def test_trace_while_all_works_retired(self): + os.environ["TORCH_FR_TRACE_BUFFER_SIZE"] = "10" + if self.rank == self.MAIN_PROCESS_RANK: + return + pg = self._create_process_group_xccl() + device = self.local_device + # send more works than the buffer size to overwrite the previous entry + for _ in range(12): + a = [torch.ones(3, 4, device=device)] + pg.broadcast(a).wait() + torch.xpu.synchronize(device=device) + + # wait for all works to be retired + pg._wait_for_pending_works() + t = pickle.loads(torch._C._distributed_c10d._dump_xccl_trace()) + t = t["entries"] + self.assertEqual(len(t), 10) + last = t[-1] + self.assertEqual(last["retired"], True) + self.assertEqual(last["state"], "completed") + + @requires_xccl() + @skip_if_lt_x_gpu(2) + @parametrize("timing_enabled", [True, False]) + @parametrize("only_active", [True, False]) + def test_trace_while_active(self, timing_enabled, only_active): + if self.rank == self.MAIN_PROCESS_RANK: + for c in self.children_pipes: + self.assertEqual(c.recv(), "next") + for c in self.children_pipes: + c.send("next") + return + + pg = self._create_process_group_xccl() + if timing_enabled: + pg._enable_collectives_timing() + device = self.local_device + with torch.xpu.device(device): + a = torch.full((3, 4), float(self.rank), device=device) + + pg.allreduce(a).wait() + e = torch.xpu.Event() + e.record() + if self.rank != 0: + pg.allreduce(a).wait() + e.synchronize() + t = pickle.loads( + torch._C._distributed_c10d._dump_xccl_trace(onlyActive=only_active) + ) + t = t["entries"] + if only_active: + if self.rank == 0: + self.assertEqual(len(t), 0) + else: + self.assertEqual(len(t), 1) + if not only_active: + if self.rank == 0: + 
self.assertEqual(t[-1]["profiling_name"], "nccl:all_reduce") + self.assertEqual(t[-1]["collective_seq_id"], 1) + self.assertEqual(t[-1]["state"], "completed") + else: + self.assertEqual(t[-1]["profiling_name"], "nccl:all_reduce") + self.assertEqual(t[-1]["collective_seq_id"], 2) + + # ROCm runtime used to call uSleep(20 µs) inside the default-signal busy-wait loop. + # Now, this sleep is removed, which lets the host thread spin continuously. + # Therefore, the state can either be scheduled or started before the test dumps the trace. + if ( + torch.version.hip + and _get_torch_rocm_version() >= (6, 4) + and timing_enabled + ): + assert t[-1]["state"] in ("scheduled", "started") + else: + self.assertEqual( + t[-1]["state"], self.started_or_scheduled(timing_enabled) + ) + + self.parent.send("next") + self.assertEqual("next", self.parent.recv()) + if self.rank == 0: + pg.allreduce(a).wait() + torch.xpu.synchronize(device=device) + + @requires_xccl() + @skip_if_lt_x_gpu(2) + @parametrize("timing_enabled", [True, False]) + def test_trace_while_stuck(self, timing_enabled): + if self.rank == self.MAIN_PROCESS_RANK: + for c in self.children_pipes: + self.assertEqual(c.recv(), "next") + for c in self.children_pipes: + c.send("next") + return + + pg = self._create_process_group_xccl() + if timing_enabled: + pg._enable_collectives_timing() + + device = self.local_device + with torch.xpu.device(device): + a = torch.full((3, 4), float(self.rank), device=device) + + pg.allreduce(a).wait() + e = torch.xpu.Event() + e.record() + + def gather_trace(): + e.synchronize() + # give the other thread some time to fill the xpu buffer + time.sleep(5) + t = pickle.loads(torch._C._distributed_c10d._dump_xccl_trace()) + t = t["entries"] + self.assertEqual(t[-1]["profiling_name"], "nccl:all_reduce") + if self.rank == 0: + self.assertEqual(t[-1]["collective_seq_id"], 1) + self.assertEqual(t[-1]["state"], "completed") + else: + self.assertEqual(t[-1]["collective_seq_id"], 2) + self.assertEqual( + t[-1]["state"], self.started_or_scheduled(timing_enabled) + ) + self.assertIsNone(t[-1]["time_discovered_completed_ns"]) + # this will eventually cause the missing rank 0 + # to continue which will unblock the non-zero ranks + self.parent.send("next") + + if self.rank != 0: + pg.allreduce(a).wait() + th = threading.Thread(target=gather_trace) + th.start() + # fill the buffer, at around 1024 events + # this will stall + for _ in range(2000): + a = a + a + th.join() + else: + gather_trace() + + self.assertEqual("next", self.parent.recv()) + if self.rank == 0: + pg.allreduce(a).wait() + torch.xpu.synchronize(device=device) + + @requires_xccl() + @skip_if_lt_x_gpu(2) + @parametrize( + "op_sizes_per_coalesce", + [ + [(2, 3)], + [(2, 3), (5, 5), (1,)], + ], + ) + @parametrize("timing_enabled", [True, False]) + def test_batched_send_recv(self, op_sizes_per_coalesce, timing_enabled): + """ + 'WorkEnqueue' was skipped for isendirecv, leading to segfault on dump_entries when update_state tried to use + a destructed Work obj's events + """ + + if self.rank == self.MAIN_PROCESS_RANK: + return + pg = self._create_process_group_xccl() + if timing_enabled: + pg._enable_collectives_timing() + + num_coalesced_ops = 20 + ops_per_coalesce = len(op_sizes_per_coalesce) + for _ in range(num_coalesced_ops): + ops = [] + for input_sizes in op_sizes_per_coalesce: + tensor = torch.zeros(input_sizes).to(self.local_device) + if self.rank == 0: + ops.append(dist.P2POp(dist.irecv, tensor, 1)) + elif self.rank == 1: + tensor *= 2 + ops.append(dist.P2POp(dist.isend, tensor, 0)) + + dist.batch_isend_irecv(ops).pop().wait() + + torch.xpu.synchronize(device=self.local_device) + + if timing_enabled: + # wait for watchdog thread to process the queue of works + time.sleep(1) + + t = pickle.loads(torch._C._distributed_c10d._dump_xccl_trace()) + self.assertEqual(len(t["entries"]), num_coalesced_ops * (ops_per_coalesce + 1)) + + expected_record_id = 0 + expected_seq = 1 + expected_op_id = 1 + for seq in range(num_coalesced_ops): + first_op = seq * (ops_per_coalesce + 1) + coalesced_op = first_op + ops_per_coalesce + for p2p_op_idx, input_sizes in zip( + range(first_op, coalesced_op, 1), op_sizes_per_coalesce + ): + # the individual ops inside the coalescing group record the individual op metadata, + # but not the timing info coming from the actual coalesced kernel + profiling_name = ( + "nccl:recv 0<-1" if self.rank == 0 else "nccl:send 1->0" + ) + self.assertEqual( + t["entries"][p2p_op_idx]["record_id"], expected_record_id + ) + expected_record_id += 1 + self.assertEqual( + t["entries"][p2p_op_idx]["profiling_name"], profiling_name + ) + # we don't increment collective_seq_id for p2p ops. + self.assertEqual(t["entries"][p2p_op_idx]["collective_seq_id"], 0) + self.assertEqual(t["entries"][p2p_op_idx]["p2p_seq_id"], expected_seq) + self.assertEqual(t["entries"][p2p_op_idx]["op_id"], expected_op_id) + expected_op_id += 1 + self.assertEqual(t["entries"][p2p_op_idx]["input_sizes"], [input_sizes]) + self.assertEqual( + t["entries"][p2p_op_idx]["output_sizes"], [input_sizes] + ) + # duration doesn't get tagged onto individual ops yet, nor is their state updated + self.assertEqual(t["entries"][p2p_op_idx]["state"], "scheduled") + self.assertTrue("duration_ms" not in t["entries"][p2p_op_idx]) + + # the coalesced op has no metadata but indicates that coalescing was used, + # and accurately reflects the timing and state info for the whole group + self.assertEqual( + t["entries"][coalesced_op]["record_id"], expected_record_id + ) + expected_record_id += 1 + self.assertEqual( + t["entries"][coalesced_op]["profiling_name"], "nccl:coalesced" + ) + self.assertEqual(t["entries"][coalesced_op]["p2p_seq_id"], expected_seq) + expected_seq += 1 + self.assertEqual(t["entries"][coalesced_op]["state"], "completed") + self.assertEqual(t["entries"][coalesced_op]["input_sizes"], []) + self.assertEqual(t["entries"][coalesced_op]["output_sizes"], []) + if timing_enabled: + duration = t["entries"][coalesced_op]["duration_ms"] + self.assertTrue(0.001 < duration < 10000, duration) + else: + self.assertTrue("duration_ms" not in t["entries"][coalesced_op]) + self.assertEqual(t["entries"][coalesced_op]["timeout_ms"], 600000) + + @requires_xccl() + @skip_if_lt_x_gpu(2) + @parametrize( + "op_sizes", + [ + [(2, 3)], + [(2, 3), (5, 5), (1,)], + ], + ) + @parametrize("timing_enabled", [True, False]) + def test_individual_send_recv(self, op_sizes, timing_enabled): + """ + 'WorkEnqueue' was skipped for isendirecv, leading to segfault on dump_entries when update_state tried to use + a destructed Work obj's events + """ + + if self.rank == self.MAIN_PROCESS_RANK: + return + pg = self._create_process_group_xccl() + if timing_enabled: + pg._enable_collectives_timing() + num_repeats = 10 + ops_per_repeat = len(op_sizes) + for _ in range(num_repeats): + for input_sizes in op_sizes: + tensor = torch.zeros(input_sizes).to(self.local_device) + if self.rank == 0: + dist.recv(tensor, 1) + elif self.rank == 1: + tensor *= 2 + dist.send(tensor, 0) + + torch.xpu.synchronize(device=self.local_device) + if timing_enabled:
+ # wait for watchdog thread to process the queue of works + time.sleep(1) + + t = pickle.loads(torch._C._distributed_c10d._dump_xccl_trace()) + self.assertEqual(len(t["entries"]), num_repeats * (ops_per_repeat)) + expected_seq = 1 + expected_op_id = 1 + for seq in range(num_repeats * ops_per_repeat): + input_sizes = op_sizes[seq % ops_per_repeat] + profiling_name = "nccl:recv 0<-1" if self.rank == 0 else "nccl:send 1->0" + self.assertEqual(t["entries"][seq]["profiling_name"], profiling_name) + # we don't increment collective_seq_id for p2p ops. + self.assertEqual(t["entries"][seq]["collective_seq_id"], 0) + self.assertEqual(t["entries"][seq]["p2p_seq_id"], expected_seq) + expected_seq += 1 + self.assertEqual(t["entries"][seq]["op_id"], expected_op_id) + expected_op_id += 1 + self.assertEqual(t["entries"][seq]["input_sizes"], [input_sizes]) + self.assertEqual(t["entries"][seq]["output_sizes"], [input_sizes]) + self.assertEqual(t["entries"][seq]["state"], "completed") + + if timing_enabled: + duration = t["entries"][seq]["duration_ms"] + self.assertTrue(0.001 < duration < 10000, duration) + else: + self.assertTrue("duration_ms" not in t["entries"][seq]) + + class SetDeviceMethod(Enum): TORCH_XPU_SET = auto() # torch.xpu.set_device COLLECTIVE_ARGUMENT = auto() # broadcast_object_list(device=) From 0ee7de7ed2eeb02464992139f161cc2937c4b6cb Mon Sep 17 00:00:00 2001 From: frost-intel Date: Tue, 29 Jul 2025 19:45:44 +0000 Subject: [PATCH 09/15] Fix tests --- src/xccl/ProcessGroupXCCL.cpp | 19 +- test/xpu/distributed/test_c10d_xccl.py | 338 ++----------------------- 2 files changed, 27 insertions(+), 330 deletions(-) diff --git a/src/xccl/ProcessGroupXCCL.cpp b/src/xccl/ProcessGroupXCCL.cpp index 5182489c21..c6ce63cca5 100644 --- a/src/xccl/ProcessGroupXCCL.cpp +++ b/src/xccl/ProcessGroupXCCL.cpp @@ -362,13 +362,14 @@ ProcessGroupXCCL::ProcessGroupXCCL( this->setGroupUid(options_->group_name); // In PGNCCL, the pg ranks are recorded on comm setup in each op, but we just do it here. 
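+ // Doing it once here means every Flight Recorder dump carries the group topology and library version; they surface as "pg_config" and "xccl_version" in the dumped trace.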
+ const auto XcclVersion = getXcclVersion(); FlightRecorderXCCL::get()->record_pg_ranks( std::make_tuple(pg_uid_, pg_desc_), groupRanks()); + FlightRecorderXCCL::get()->record_accelerator_version(XcclVersion, "xccl_version"); init(); const std::string OFF = "OFF"; std::string torch_distributed_debug = getCvarString({"TORCH_DISTRIBUTED_DEBUG"}, OFF.c_str()); - const auto XcclVersion = getXcclVersion(); LOG(INFO) << logPrefix() << "ProcessGroupXCCL initialization options: " << "size: " << size << ", global rank: " << rank_; @@ -416,12 +417,12 @@ const std::vector<uint64_t>& ProcessGroupXCCL::groupRanks() const { return options_->global_ranks_in_group; } -void ProcessGroupXCCL::setStartedPgStatus( +void ProcessGroupXCCL::setEnqueuedPgStatus( c10::intrusive_ptr<ProcessGroupXCCL::WorkXCCL> work) { - pgStatus_->lastStartedSeq = static_cast<int64_t>(work->getSequencenumber()); - pgStatus_->lastStartedWorkName = opTypeToString(work->opType_); - pgStatus_->lastStartedNumelIn = work->numelIn_; - pgStatus_->lastStartedNumelOut = work->numelOut_; + pgStatus_->lastEnqueuedSeq = static_cast<int64_t>(work->getSequencenumber()); + pgStatus_->lastEnqueuedWorkName = opTypeToString(work->opType_); + pgStatus_->lastEnqueuedNumelIn = work->numelIn_; + pgStatus_->lastEnqueuedNumelOut = work->numelOut_; } void ProcessGroupXCCL::setCompletedPgStatus( @@ -626,7 +627,7 @@ c10::intrusive_ptr<Work> ProcessGroupXCCL::endCoalescing(OpType optype) { groupEnd(); work->xcclEndEvent_->record(stream); - setStartedPgStatus(work); + setEnqueuedPgStatus(work); coalescing_state_ = 0; coalescedComm_ = nullptr; @@ -765,7 +766,7 @@ c10::intrusive_ptr<Work> ProcessGroupXCCL::collective( for (const auto& output : outputs) { work->numelOut_ += output.numel(); } - setStartedPgStatus(work); + setEnqueuedPgStatus(work); return asyncOp ? work : nullptr; } @@ -867,7 +868,7 @@ c10::intrusive_ptr<Work> ProcessGroupXCCL::pointToPoint( }); work->numelIn_ = work->numelOut_ = tensor.numel(); - setStartedPgStatus(work); + setEnqueuedPgStatus(work); return work; } else { FlightRecorderXCCL::get()->record( diff --git a/test/xpu/distributed/test_c10d_xccl.py b/test/xpu/distributed/test_c10d_xccl.py index 0874a90853..77ed7d50b1 100644 --- a/test/xpu/distributed/test_c10d_xccl.py +++ b/test/xpu/distributed/test_c10d_xccl.py @@ -13,6 +13,7 @@ from unittest import mock import torch +import torch._C._distributed_c10d import torch.distributed as c10d @@ -23,6 +24,7 @@ import torch.testing._internal.common_utils as common from torch.testing._internal.common_distributed import MultiProcessTestCase from torch.testing._internal.common_utils import ( + instantiate_parametrized_tests, parametrize, retry_on_connect_failures, run_tests, @@ -561,6 +563,7 @@ def setUp(self): super().setUp() os.environ["TORCH_FR_BUFFER_SIZE"] = "1000" self.tempdir = tempfile.TemporaryDirectory() + os.environ["TORCH_FR_DUMP_TEMP_FILE"] = self._trace_basename() os.environ["TORCH_FR_DEBUG_INFO_PIPE_FILE"] = self._trace_basename() self._spawn_processes() @@ -642,17 +645,17 @@ def _trace_basename(self): def _trace_name(self, rank): return self._trace_basename() + str(rank) - def started_or_scheduled(self, timing_enabled): + def started_or_scheduled(self, timing_enabled=False): return "started" if timing_enabled else "scheduled" class XCCLTraceTest(XCCLTraceTestBase): - def _verify_trace(self, t, include_collectives, timing_enabled, is_json): + def _verify_trace(self, t, include_collectives, is_json, timing_enabled=False): ver = t["version"] self.assertEqual(ver, "2.9") - xccl_version = 
t["nccl_version"] - torch_xccl_version = torch.distributed.get_xccl_version() - self.assertEqual(xccl_version, ".".join(str(v) for v in torch_xccl_version)) + xccl_version = t["xccl_version"] + torch_xccl_version = torch._C._distributed_c10d.get_xccl_version() + self.assertEqual(xccl_version, torch_xccl_version) pg_config = t["pg_config"] self.assertEqual(len(pg_config), 1) default_pg_info = pg_config["0"] @@ -738,9 +741,8 @@ def set_thread_name(self, name): @requires_xccl() @skip_if_lt_x_gpu(2) - @parametrize("timing_enabled", [True, False]) @parametrize("include_collectives", [True, False]) - def test_short_pickle(self, timing_enabled, include_collectives): + def test_short_pickle(self, include_collectives, timing_enabled=False): if self.rank == self.MAIN_PROCESS_RANK: return pg = self._create_process_group_xccl() @@ -763,8 +765,8 @@ def test_short_pickle(self, timing_enabled, include_collectives): self._verify_trace( t, include_collectives=include_collectives, - timing_enabled=timing_enabled, is_json=True, + timing_enabled=timing_enabled, ) dist.destroy_process_group() @@ -807,7 +809,7 @@ def open_file_with_timeout(file_path, mode, timeout=1.0): @requires_xccl() @skip_if_lt_x_gpu(2) def test_long(self): - os.environ["TORCH_FR_TRACE_BUFFER_SIZE"] = "10" + os.environ["TORCH_FR_BUFFER_SIZE"] = "10" if self.rank == self.MAIN_PROCESS_RANK: return pg = self._create_process_group_xccl() @@ -831,9 +833,9 @@ def test_long(self): self.assertEqual(len(t), 10) first = t[0] last = t[-1] - self.assertEqual(last["profiling_name"], "nccl:all_reduce") + self.assertEqual(last["profiling_name"], "xccl:all_reduce") self.assertEqual(last["state"], "completed") - self.assertIn("test_c10d_nccl.py", str(last["frames"])) + self.assertIn("test_c10d_xccl.py", str(last["frames"])) self.assertEqual(last["input_sizes"], ((3, 4),)) self.assertEqual(last["input_dtypes"], ["Float"]) self.assertEqual(last["output_sizes"], ((3, 4),)) @@ -845,7 +847,7 @@ def test_long(self): @requires_xccl() @skip_if_lt_x_gpu(2) def test_barrier_profiling(self): - os.environ["TORCH_FR_TRACE_BUFFER_SIZE"] = "10" + os.environ["TORCH_FR_BUFFER_SIZE"] = "10" if self.rank == self.MAIN_PROCESS_RANK: return pg = self._create_process_group_xccl() @@ -860,317 +862,11 @@ def test_barrier_profiling(self): self.assertEqual(len(t), 2) first = t[0] last = t[-1] - self.assertEqual(first["profiling_name"], "nccl:all_reduce_barrier") - self.assertEqual(last["profiling_name"], "nccl:all_reduce") + self.assertEqual(first["profiling_name"], "xccl:all_reduce_barrier") + self.assertEqual(last["profiling_name"], "xccl:all_reduce") dist.destroy_process_group() - @requires_xccl() - @skip_if_lt_x_gpu(2) - def test_trace_while_all_works_retired(self): - os.environ["TORCH_FR_TRACE_BUFFER_SIZE"] = "10" - if self.rank == self.MAIN_PROCESS_RANK: - return - pg = self._create_process_group_xccl() - device = self.local_device - # send more works than the buffer size to overwrite the previous entry - for _ in range(12): - a = [torch.ones(3, 4, device=device)] - pg.broadcast(a).wait() - torch.xpu.synchronize(device=device) - - # wait for all works to be retired - pg._wait_for_pending_works() - t = pickle.loads(torch._C._distributed_c10d._dump_xccl_trace()) - t = t["entries"] - self.assertEqual(len(t), 10) - last = t[-1] - self.assertEqual(last["retired"], True) - self.assertEqual(last["state"], "completed") - - @requires_xccl() - @skip_if_lt_x_gpu(2) - @parametrize("timing_enabled", [True, False]) - @parametrize("only_active", [True, False]) - def 
test_trace_while_active(self, timing_enabled, only_active): - if self.rank == self.MAIN_PROCESS_RANK: - for c in self.children_pipes: - self.assertEqual(c.recv(), "next") - for c in self.children_pipes: - c.send("next") - return - - pg = self._create_process_group_xccl() - if timing_enabled: - pg._enable_collectives_timing() - device = self.local_device - with torch.xpu.device(device): - a = torch.full((3, 4), float(self.rank), device=device) - - pg.allreduce(a).wait() - e = torch.xpu.Event() - e.record() - if self.rank != 0: - pg.allreduce(a).wait() - e.synchronize() - t = pickle.loads( - torch._C._distributed_c10d._dump_xccl_trace(onlyActive=only_active) - ) - t = t["entries"] - if only_active: - if self.rank == 0: - self.assertEqual(len(t), 0) - else: - self.assertEqual(len(t), 1) - if not only_active: - if self.rank == 0: - self.assertEqual(t[-1]["profiling_name"], "nccl:all_reduce") - self.assertEqual(t[-1]["collective_seq_id"], 1) - self.assertEqual(t[-1]["state"], "completed") - else: - self.assertEqual(t[-1]["profiling_name"], "nccl:all_reduce") - self.assertEqual(t[-1]["collective_seq_id"], 2) - - # ROCm runtime used to call uSleep(20 µs) inside the default-signal busy-wait loop. - # Now, this sleep is removed, which lets the host thread spin continuously. - # Therefore, the state can either be scheduled or started before the test dumps the trace. - if ( - torch.version.hip - and _get_torch_rocm_version() >= (6, 4) - and timing_enabled - ): - assert t[-1]["state"] in ("scheduled", "started") - else: - self.assertEqual( - t[-1]["state"], self.started_or_scheduled(timing_enabled) - ) - - self.parent.send("next") - self.assertEqual("next", self.parent.recv()) - if self.rank == 0: - pg.allreduce(a).wait() - torch.xpu.synchronize(device=device) - - @requires_xccl() - @skip_if_lt_x_gpu(2) - @parametrize("timing_enabled", [True, False]) - def test_trace_while_stuck(self, timing_enabled): - if self.rank == self.MAIN_PROCESS_RANK: - for c in self.children_pipes: - self.assertEqual(c.recv(), "next") - for c in self.children_pipes: - c.send("next") - return - - pg = self._create_process_group_xccl() - if timing_enabled: - pg._enable_collectives_timing() - - device = self.local_device - with torch.xpu.device(device): - a = torch.full((3, 4), float(self.rank), device=device) - - pg.allreduce(a).wait() - e = torch.xpu.Event() - e.record() - - def gather_trace(): - e.synchronize() - # give the other thread some time to fill the xpu buffer - time.sleep(5) - t = pickle.loads(torch._C._distributed_c10d._dump_xccl_trace()) - t = t["entries"] - self.assertEqual(t[-1]["profiling_name"], "nccl:all_reduce") - if self.rank == 0: - self.assertEqual(t[-1]["collective_seq_id"], 1) - self.assertEqual(t[-1]["state"], "completed") - else: - self.assertEqual(t[-1]["collective_seq_id"], 2) - self.assertEqual( - t[-1]["state"], self.started_or_scheduled(timing_enabled) - ) - self.assertIsNone(t[-1]["time_discovered_completed_ns"]) - # this will eventually cause the missing rank 0 - # to continue which will unblock the non-zero ranks - self.parent.send("next") - - if self.rank != 0: - pg.allreduce(a).wait() - th = threading.Thread(target=gather_trace) - th.start() - # fill the buffer, at around 1024 events - # this will stall - for _ in range(2000): - a = a + a - th.join() - else: - gather_trace() - - self.assertEqual("next", self.parent.recv()) - if self.rank == 0: - pg.allreduce(a).wait() - torch.xpu.synchronize(device=device) - - @requires_xccl() - @skip_if_lt_x_gpu(2) - @parametrize( - "op_sizes_per_coalesce", - [ - [(2, 3)], - [(2, 3), (5, 5), (1,)], - ], - ) - @parametrize("timing_enabled", [True, False]) - def test_batched_send_recv(self, op_sizes_per_coalesce, timing_enabled): - """ - 'WorkEnqueue' was skipped for isendirecv, leading to segfault on dump_entries when update_state tried to use - a destructed Work obj's events - """ - - if self.rank == self.MAIN_PROCESS_RANK: - return - pg = self._create_process_group_xccl() - if timing_enabled: - pg._enable_collectives_timing() - - num_coalesced_ops = 20 - ops_per_coalesce = len(op_sizes_per_coalesce) - for _ in range(num_coalesced_ops): - ops = [] - for input_sizes in op_sizes_per_coalesce: - tensor = torch.zeros(input_sizes).to(self.local_device) - if self.rank == 0: - ops.append(dist.P2POp(dist.irecv, tensor, 1)) - elif self.rank == 1: - tensor *= 2 - ops.append(dist.P2POp(dist.isend, tensor, 0)) - - dist.batch_isend_irecv(ops).pop().wait() - - torch.xpu.synchronize(device=self.local_device) - - if timing_enabled: - # wait for watchdog thread to process the queue of works - time.sleep(1) - - t = pickle.loads(torch._C._distributed_c10d._dump_xccl_trace()) - self.assertEqual(len(t["entries"]), num_coalesced_ops * (ops_per_coalesce + 1)) - - expected_record_id = 0 - expected_seq = 1 - expected_op_id = 1 - for seq in range(num_coalesced_ops): - first_op = seq * (ops_per_coalesce + 1) - coalesced_op = first_op + ops_per_coalesce - for p2p_op_idx, input_sizes in zip( - range(first_op, coalesced_op, 1), op_sizes_per_coalesce - ): - # the individual ops inside the coalescing group record the individual op metadata, - # but not the timing info coming from the actual coalesced kernel - profiling_name = ( - "nccl:recv 0<-1" if self.rank == 0 else "nccl:send 1->0" - ) - self.assertEqual( - t["entries"][p2p_op_idx]["record_id"], expected_record_id - ) - expected_record_id += 1 - self.assertEqual( - t["entries"][p2p_op_idx]["profiling_name"], profiling_name - ) - # we don't increment collective_seq_id for p2p ops. 
- self.assertEqual(t["entries"][p2p_op_idx]["collective_seq_id"], 0) - self.assertEqual(t["entries"][p2p_op_idx]["p2p_seq_id"], expected_seq) - self.assertEqual(t["entries"][p2p_op_idx]["op_id"], expected_op_id) - expected_op_id += 1 - self.assertEqual(t["entries"][p2p_op_idx]["input_sizes"], [input_sizes]) - self.assertEqual( - t["entries"][p2p_op_idx]["output_sizes"], [input_sizes] - ) - # duration doesn't get tagged onto individual ops yet, nor is their state updated - self.assertEqual(t["entries"][p2p_op_idx]["state"], "scheduled") - self.assertTrue("duration_ms" not in t["entries"][p2p_op_idx]) - - # the coalesced op has no metadata but indicates that coalescing was used, - # and accurately reflects the timing and state info for the whole group - self.assertEqual( - t["entries"][coalesced_op]["record_id"], expected_record_id - ) - expected_record_id += 1 - self.assertEqual( - t["entries"][coalesced_op]["profiling_name"], "nccl:coalesced" - ) - self.assertEqual(t["entries"][coalesced_op]["p2p_seq_id"], expected_seq) - expected_seq += 1 - self.assertEqual(t["entries"][coalesced_op]["state"], "completed") - self.assertEqual(t["entries"][coalesced_op]["input_sizes"], []) - self.assertEqual(t["entries"][coalesced_op]["output_sizes"], []) - if timing_enabled: - duration = t["entries"][coalesced_op]["duration_ms"] - self.assertTrue(0.001 < duration < 10000, duration) - else: - self.assertTrue("duration_ms" not in t["entries"][coalesced_op]) - self.assertEqual(t["entries"][coalesced_op]["timeout_ms"], 600000) - - @requires_xccl() - @skip_if_lt_x_gpu(2) - @parametrize( - "op_sizes", - [ - [(2, 3)], - [(2, 3), (5, 5), (1,)], - ], - ) - @parametrize("timing_enabled", [True, False]) - def test_individual_send_recv(self, op_sizes, timing_enabled): - """ - 'WorkEnqueue' was skipped for isendirecv, leading to segfault on dump_entries when update_state tried to use - a destructed Work obj's events - """ - - if self.rank == self.MAIN_PROCESS_RANK: - return - pg = self._create_process_group_xccl() - if timing_enabled: - pg._enable_collectives_timing() - num_repeats = 10 - ops_per_repeat = len(op_sizes) - for _ in range(num_repeats): - for input_sizes in op_sizes: - tensor = torch.zeros(input_sizes).to(self.local_device) - if self.rank == 0: - dist.recv(tensor, 1) - elif self.rank == 1: - tensor *= 2 - dist.send(tensor, 0) - - torch.xpu.synchronize(device=self.local_device) - if timing_enabled: - # wait for watchdog thread to process the queue of works - time.sleep(1) - - t = pickle.loads(torch._C._distributed_c10d._dump_xccl_trace()) - self.assertEqual(len(t["entries"]), num_repeats * (ops_per_repeat)) - expected_seq = 1 - expected_op_id = 1 - for seq in range(num_repeats * ops_per_repeat): - input_sizes = op_sizes[seq % ops_per_repeat] - profiling_name = "nccl:recv 0<-1" if self.rank == 0 else "nccl:send 1->0" - self.assertEqual(t["entries"][seq]["profiling_name"], profiling_name) - # we don't increment collective_seq_id for p2p ops. 
- self.assertEqual(t["entries"][seq]["collective_seq_id"], 0) - self.assertEqual(t["entries"][seq]["p2p_seq_id"], expected_seq) - expected_seq += 1 - self.assertEqual(t["entries"][seq]["op_id"], expected_op_id) - expected_op_id += 1 - self.assertEqual(t["entries"][seq]["input_sizes"], [input_sizes]) - self.assertEqual(t["entries"][seq]["output_sizes"], [input_sizes]) - self.assertEqual(t["entries"][seq]["state"], "completed") - - if timing_enabled: - duration = t["entries"][seq]["duration_ms"] - self.assertTrue(0.001 < duration < 10000, duration) - else: - self.assertTrue("duration_ms" not in t["entries"][seq]) - +instantiate_parametrized_tests(XCCLTraceTest) class SetDeviceMethod(Enum): TORCH_XPU_SET = auto() # torch.xpu.set_device From afa2b09b534fa9e5c1c8b1df897be18bdeaa8ccb Mon Sep 17 00:00:00 2001 From: frost-intel Date: Tue, 29 Jul 2025 20:09:13 +0000 Subject: [PATCH 10/15] lint --- src/xccl/ProcessGroupXCCL.hpp | 2 +- src/xccl/ProcessGroupXCCLMonitor.cpp | 132 +++++++++---------- src/xccl/ProcessGroupXCCLMonitor.hpp | 186 +++++++++++++-------------- 3 files changed, 160 insertions(+), 160 deletions(-) diff --git a/src/xccl/ProcessGroupXCCL.hpp b/src/xccl/ProcessGroupXCCL.hpp index 8e8e40f0ca..f099eaea1e 100644 --- a/src/xccl/ProcessGroupXCCL.hpp +++ b/src/xccl/ProcessGroupXCCL.hpp @@ -394,7 +394,7 @@ class TORCH_API ProcessGroupXCCL : public Backend { c10::DeviceIndex guessDeviceId() const; const std::vector& groupRanks() const; - void setStartedPgStatus(c10::intrusive_ptr work); + void setEnqueuedPgStatus(c10::intrusive_ptr work); void setCompletedPgStatus(c10::intrusive_ptr work); bool dumpDebuggingInfo(bool includeStackTrace = true); diff --git a/src/xccl/ProcessGroupXCCLMonitor.cpp b/src/xccl/ProcessGroupXCCLMonitor.cpp index 07eb601c79..cefc6d4022 100644 --- a/src/xccl/ProcessGroupXCCLMonitor.cpp +++ b/src/xccl/ProcessGroupXCCLMonitor.cpp @@ -1,66 +1,66 @@ -#ifdef USE_C10D_XCCL - -#include -#include -namespace c10d { - -HeartbeatMonitorXCCL::HeartbeatMonitorXCCL(ProcessGroupXCCL* pg) { - pg_ = pg; - coordCheckIntervalMilSec_ = getCvarInt(TORCH_XCCL_COORD_CHECK_MILSEC, 1000); - LOG(INFO) - << pg_->logPrefix() << "HeartbeatMonitor environments: " - << "TORCH_XCCL_COORD_CHECK_MILSEC: " << coordCheckIntervalMilSec_; -} - -void HeartbeatMonitorXCCL::stop() { - terminateHeartbeatMonitorThread_.store(true); - monitorWakeUpCV_.notify_one(); -} - -void HeartbeatMonitorXCCL::start() { - TORCH_CHECK( - !xcclHeartbeatMonitorThread_.joinable(), - "HeartbeatMonitor thread already started"); - xcclHeartbeatMonitorThread_ = - std::thread(&HeartbeatMonitorXCCL::runLoop, this); -} - -void HeartbeatMonitorXCCL::join() { - if (xcclHeartbeatMonitorThread_.joinable()) { - xcclHeartbeatMonitorThread_.join(); - LOG(INFO) << pg_->logPrefix() - << "ProcessGroupXCCL heart beat monitor thread joined."; - } -} - -void HeartbeatMonitorXCCL::runLoop() { - c10::setThreadName("pt_xccl_heartbt"); - - std::optional dumpPipe = std::nullopt; - // We only need to dump once per PG, so we use local_id_ == 0 for the first PG - if (pg_->local_id_ == 0) { - // DumpPipe is one per-trainer process - dumpPipe.emplace(pg_->getRank()); - while (true) { - std::unique_lock lock(monitorMutex_); - if (monitorWakeUpCV_.wait_for( - lock, std::chrono::milliseconds(coordCheckIntervalMilSec_), [&] { - return terminateHeartbeatMonitorThread_.load(); - })) { - return; - } - // Write to pipe files for all ranks to dump debug info - if (dumpPipe.has_value() && dumpPipe->shouldDump()) { - LOG(INFO) << pg_->logPrefix() - << "Dump signal 
received through pipe, triggering FR dump."; - std::future fut = std::async(std::launch::async, [this]() { - return this->pg_->dumpDebuggingInfo(); - }); - } - } - } -} - -} // namespace c10d - -#endif // USE_C10D_XCCL +#ifdef USE_C10D_XCCL + +#include +#include +namespace c10d { + +HeartbeatMonitorXCCL::HeartbeatMonitorXCCL(ProcessGroupXCCL* pg) { + pg_ = pg; + coordCheckIntervalMilSec_ = getCvarInt(TORCH_XCCL_COORD_CHECK_MILSEC, 1000); + LOG(INFO) + << pg_->logPrefix() << "HeartbeatMonitor environments: " + << "TORCH_XCCL_COORD_CHECK_MILSEC: " << coordCheckIntervalMilSec_; +} + +void HeartbeatMonitorXCCL::stop() { + terminateHeartbeatMonitorThread_.store(true); + monitorWakeUpCV_.notify_one(); +} + +void HeartbeatMonitorXCCL::start() { + TORCH_CHECK( + !xcclHeartbeatMonitorThread_.joinable(), + "HeartbeatMonitor thread already started"); + xcclHeartbeatMonitorThread_ = + std::thread(&HeartbeatMonitorXCCL::runLoop, this); +} + +void HeartbeatMonitorXCCL::join() { + if (xcclHeartbeatMonitorThread_.joinable()) { + xcclHeartbeatMonitorThread_.join(); + LOG(INFO) << pg_->logPrefix() + << "ProcessGroupXCCL heart beat monitor thread joined."; + } +} + +void HeartbeatMonitorXCCL::runLoop() { + c10::setThreadName("pt_xccl_heartbt"); + + std::optional dumpPipe = std::nullopt; + // We only need to dump once per PG, so we use local_id_ == 0 for the first PG + if (pg_->local_id_ == 0) { + // DumpPipe is one per-trainer process + dumpPipe.emplace(pg_->getRank()); + while (true) { + std::unique_lock lock(monitorMutex_); + if (monitorWakeUpCV_.wait_for( + lock, std::chrono::milliseconds(coordCheckIntervalMilSec_), [&] { + return terminateHeartbeatMonitorThread_.load(); + })) { + return; + } + // Write to pipe files for all ranks to dump debug info + if (dumpPipe.has_value() && dumpPipe->shouldDump()) { + LOG(INFO) << pg_->logPrefix() + << "Dump signal received through pipe, triggering FR dump."; + std::future fut = std::async(std::launch::async, [this]() { + return this->pg_->dumpDebuggingInfo(); + }); + } + } + } +} + +} // namespace c10d + +#endif // USE_C10D_XCCL diff --git a/src/xccl/ProcessGroupXCCLMonitor.hpp b/src/xccl/ProcessGroupXCCLMonitor.hpp index 629a13025f..8924c4e43e 100644 --- a/src/xccl/ProcessGroupXCCLMonitor.hpp +++ b/src/xccl/ProcessGroupXCCLMonitor.hpp @@ -1,93 +1,93 @@ -#pragma once - -#include -#include -#include -#include - -#ifdef USE_C10D_XCCL -namespace c10d { - -// This definition will later be moved to a common header for ProcessGroups NCCL/Gloo/XCCL -#if defined(__linux__) -struct DumpPipe { - DumpPipe(int rank) { - std::string fileStem = - getCvarString({"TORCH_FR_DEBUG_INFO_PIPE_FILE"}, ""); - if (fileStem.empty() || - getCvarInt({"TORCH_FR_BUFFER_SIZE"}, 0) <= 0) { - return; - } - TORCH_CHECK(!fileStem.empty(), "TORCH_FR_DEBUG_INFO_PIPE_FILE is empty"); - std::string filename = c10::str(fileStem, rank, ".pipe"); - TORCH_CHECK( - unlink(filename.c_str()) != -1 || errno == ENOENT, - "Error removing existing named pipe ", - filename, - ", Error: ", - std::strerror(errno)); - TORCH_CHECK( - mkfifo(filename.c_str(), 0666) != -1, - "Error creating named pipe ", - filename, - ", Error: ", - std::strerror(errno)); - fd_ = open(filename.c_str(), O_RDONLY | O_NONBLOCK); - LOG(INFO) << "Pipe file " << filename - << " has been opened, write to it to trigger ProcessGroup Debug Dump."; - TORCH_CHECK(fd_ != -1, "Error opening named pipe ", filename); - } - bool shouldDump() { - if (fd_ == -1) { - return false; - } - // NOLINTNEXTLINE(*array*) - char buf[128]{}; - // non-blocking from 
O_NONBLOCK above. - // Ignore EINTR because we already will poll this - // again later. - ssize_t bytesRead = read(fd_, &buf, 128); - return bytesRead > 0; - } - ~DumpPipe() { - if (fd_ != -1) { - close(fd_); - } - } - - private: - int fd_ = -1; -}; -#else -struct DumpPipe { - DumpPipe(int rank) {} - bool shouldDump() { - return false; - } -}; -#endif - -class ProcessGroupXCCL; -class HeartbeatMonitorXCCL { - public: - HeartbeatMonitorXCCL(ProcessGroupXCCL* pg); - virtual ~HeartbeatMonitorXCCL() = default; - - std::string getXCCLTimeoutErrorMsg(const std::string& extraMsg); - void start(); - void join(); - virtual void runLoop(); - void stop(); - - protected: - ProcessGroupXCCL* pg_; - - private: - int coordCheckIntervalMilSec_; - std::condition_variable monitorWakeUpCV_; - std::mutex monitorMutex_; - std::thread xcclHeartbeatMonitorThread_; - std::atomic terminateHeartbeatMonitorThread_{false}; -}; -} -#endif // USE_C10D_XCCL +#pragma once + +#include +#include +#include +#include + +#ifdef USE_C10D_XCCL +namespace c10d { + +// This definition will later be moved to a common header for ProcessGroups NCCL/Gloo/XCCL +#if defined(__linux__) +struct DumpPipe { + DumpPipe(int rank) { + std::string fileStem = + getCvarString({"TORCH_FR_DEBUG_INFO_PIPE_FILE"}, ""); + if (fileStem.empty() || + getCvarInt({"TORCH_FR_BUFFER_SIZE"}, 0) <= 0) { + return; + } + TORCH_CHECK(!fileStem.empty(), "TORCH_FR_DEBUG_INFO_PIPE_FILE is empty"); + std::string filename = c10::str(fileStem, rank, ".pipe"); + TORCH_CHECK( + unlink(filename.c_str()) != -1 || errno == ENOENT, + "Error removing existing named pipe ", + filename, + ", Error: ", + std::strerror(errno)); + TORCH_CHECK( + mkfifo(filename.c_str(), 0666) != -1, + "Error creating named pipe ", + filename, + ", Error: ", + std::strerror(errno)); + fd_ = open(filename.c_str(), O_RDONLY | O_NONBLOCK); + LOG(INFO) << "Pipe file " << filename + << " has been opened, write to it to trigger ProcessGroup Debug Dump."; + TORCH_CHECK(fd_ != -1, "Error opening named pipe ", filename); + } + bool shouldDump() { + if (fd_ == -1) { + return false; + } + // NOLINTNEXTLINE(*array*) + char buf[128]{}; + // non-blocking from O_NONBLOCK above. + // Ignore EINTR because we already will poll this + // again later. 
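+ // Any write to the FIFO counts as a dump request (the tests write "1\n"); the byte values themselves are ignored.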
+ ssize_t bytesRead = read(fd_, &buf, 128); + return bytesRead > 0; + } + ~DumpPipe() { + if (fd_ != -1) { + close(fd_); + } + } + + private: + int fd_ = -1; +}; +#else +struct DumpPipe { + DumpPipe(int rank) {} + bool shouldDump() { + return false; + } +}; +#endif + +class ProcessGroupXCCL; +class HeartbeatMonitorXCCL { + public: + HeartbeatMonitorXCCL(ProcessGroupXCCL* pg); + virtual ~HeartbeatMonitorXCCL() = default; + + std::string getXCCLTimeoutErrorMsg(const std::string& extraMsg); + void start(); + void join(); + virtual void runLoop(); + void stop(); + + protected: + ProcessGroupXCCL* pg_; + + private: + int coordCheckIntervalMilSec_; + std::condition_variable monitorWakeUpCV_; + std::mutex monitorMutex_; + std::thread xcclHeartbeatMonitorThread_; + std::atomic terminateHeartbeatMonitorThread_{false}; +}; +} +#endif // USE_C10D_XCCL From 9af46bcac5d484b337a93f80f9e5158bad4f97eb Mon Sep 17 00:00:00 2001 From: frost-intel Date: Wed, 30 Jul 2025 13:37:36 +0000 Subject: [PATCH 11/15] lint --- test/xpu/distributed/test_c10d_xccl.py | 1 + 1 file changed, 1 insertion(+) diff --git a/test/xpu/distributed/test_c10d_xccl.py b/test/xpu/distributed/test_c10d_xccl.py index 694d1b58d5..e143be041e 100644 --- a/test/xpu/distributed/test_c10d_xccl.py +++ b/test/xpu/distributed/test_c10d_xccl.py @@ -940,6 +940,7 @@ def test_barrier_profiling(self): self.assertEqual(last["profiling_name"], "xccl:all_reduce") dist.destroy_process_group() + instantiate_parametrized_tests(XCCLTraceTest) instantiate_parametrized_tests(ProcessGroupXCCLTest) From 29b5b11da23e782e339187596d021043b2d494f2 Mon Sep 17 00:00:00 2001 From: frost-intel Date: Wed, 6 Aug 2025 19:11:39 +0000 Subject: [PATCH 12/15] Change record version API --- src/xccl/ProcessGroupXCCL.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/xccl/ProcessGroupXCCL.cpp b/src/xccl/ProcessGroupXCCL.cpp index dc333f9d93..7c7324211f 100644 --- a/src/xccl/ProcessGroupXCCL.cpp +++ b/src/xccl/ProcessGroupXCCL.cpp @@ -366,7 +366,7 @@ ProcessGroupXCCL::ProcessGroupXCCL( const auto XcclVersion = getXcclVersion(); FlightRecorderXCCL::get()->record_pg_ranks( std::make_tuple(pg_uid_, pg_desc_), groupRanks()); - FlightRecorderXCCL::get()->record_accelerator_version(XcclVersion, "xccl_version"); + FlightRecorderXCCL::get()->record_accelerator_version(XcclVersion); enableNanCheck_ = getCvarBool(TORCH_XCCL_NAN_CHECK, false); init(); const std::string OFF = "OFF"; @@ -412,7 +412,7 @@ bool ProcessGroupXCCL::dumpDebuggingInfo(bool includeStackTrace /*=true*/) { } const std::vector& ProcessGroupXCCL::groupRanks() const { - if (options_->global_ranks_in_group.empty()) { + if (options_->global_ranks_in_group.empty() && local_id_ == 0) { static std::vector globalRanks(size_); std::iota(globalRanks.begin(), globalRanks.end(), 0); return globalRanks; From 64e99ceced52260ac3df0eb430af35d0c08c3e61 Mon Sep 17 00:00:00 2001 From: frost-intel Date: Thu, 7 Aug 2025 19:41:06 +0000 Subject: [PATCH 13/15] Remove test for different PR --- test/xpu/distributed/test_c10d_xccl.py | 396 +------------------------ 1 file changed, 1 insertion(+), 395 deletions(-) diff --git a/test/xpu/distributed/test_c10d_xccl.py b/test/xpu/distributed/test_c10d_xccl.py index e143be041e..0625a6993f 100644 --- a/test/xpu/distributed/test_c10d_xccl.py +++ b/test/xpu/distributed/test_c10d_xccl.py @@ -1,21 +1,14 @@ # Owner(s): ["oncall: distributed"] -import json import math import os -import pickle -import random -import signal import sys -import tempfile -import threading import time -from 
datetime import datetime, timedelta +from datetime import timedelta from enum import auto, Enum from unittest import mock import torch -import torch._C._distributed_c10d import torch.distributed as c10d if not c10d.is_available() or not c10d.is_xccl_available(): @@ -26,9 +19,6 @@ import torch.testing._internal.common_utils as common from torch.testing._internal.common_distributed import MultiProcessTestCase from torch.testing._internal.common_utils import ( - instantiate_parametrized_tests, - IS_SANDCASTLE, - parametrize, retry_on_connect_failures, run_tests, skip_but_pass_in_sandcastle_if, @@ -220,15 +210,6 @@ def _create_process_group_xccl( def setUp(self): super().setUp() - TEST_NAN_ASSERT_RETURN = 0 if IS_SANDCASTLE else -signal.SIGABRT - self.special_return_code_checks = { - self.test_nan_assert_float16.__wrapped__: TEST_NAN_ASSERT_RETURN, - self.test_nan_assert_float32.__wrapped__: TEST_NAN_ASSERT_RETURN, - self.test_nan_assert_float64.__wrapped__: TEST_NAN_ASSERT_RETURN, - self.test_nan_assert_bfloat16.__wrapped__: TEST_NAN_ASSERT_RETURN, - self.test_nan_assert_float8_e4m3fn.__wrapped__: TEST_NAN_ASSERT_RETURN, - self.test_nan_assert_float8_e5m2.__wrapped__: TEST_NAN_ASSERT_RETURN, - } self._spawn_processes() def tearDown(self): @@ -307,68 +288,6 @@ def test_set_process_group_desc(self): pg_2 = c10d.new_group([0, 1]) self.assertEqual(pg_2.group_desc, "undefined") - @requires_xccl() - @parametrize( - "type", - [ - torch.float16, - torch.float32, - torch.float64, - torch.bfloat16, - torch.float8_e4m3fn, - torch.float8_e5m2, - ], - ) - def test_nan_assert(self, type): - # Expecting a device-side error when NaN is detected - os.environ["TORCH_XCCL_NAN_CHECK"] = "1" - pg = self._create_process_group_xccl() - device = self.rank_to_GPU[self.rank][0] - # Cover different buffer sizes - if type == torch.float64: - size = (1024,) # 1K elements - elif type == torch.float32: - size = (1024, 1024) # 1M elements - elif type == torch.float16: - size = (1024, 1024, 1024) # 1G elements - else: - size = (1,) # 1 element - - # Note: currently we cannot fill values into a FP8 tensor, thus we - # create the NaN tensor in float32 type and cast it to FP8 - if type == torch.float8_e4m3fn or type == torch.float8_e5m2: - init_type = torch.float32 - else: - init_type = type - - nan_tensor = torch.zeros(*size, dtype=init_type, device=device) - # randomly pick an nan element - index = tuple([random.randrange(size[i]) for i in range(len(size))]) - nan_tensor[index] = float("nan") - if init_type != type: - # Now cast to the targeted dtype - nan_tensor = nan_tensor.to(type) - - output = torch.empty(self.world_size, *size, dtype=type, device=device) - - # # confirm enable/disable flag works - # backend._set_enable_nan_check(False) - # # Note: using all-gather here bc some NCCL/SM version does not support - # # FP8 reduction - # # temporarily skip due to https://github.com/pytorch/pytorch/issues/153479 - # # pg._allgather_base(output, nan_tensor) - - # backend._set_enable_nan_check(True) - try: - pg._allgather_base(output, nan_tensor) - except Exception: - sys.exit(signal.SIGABRT) - - dist.destroy_process_group() - - # reset env - os.environ["TORCH_XCCL_NAN_CHECK"] = "0" - class CommTest(MultiProcessTestCase): @property @@ -632,319 +551,6 @@ def test_all_gather_into_tensor(self): ) -class XCCLTraceTestBase(MultiProcessTestCase): - def setUp(self): - super().setUp() - os.environ["TORCH_FR_BUFFER_SIZE"] = "1000" - self.tempdir = tempfile.TemporaryDirectory() - os.environ["TORCH_FR_DUMP_TEMP_FILE"] = 
self._trace_basename() - os.environ["TORCH_FR_DEBUG_INFO_PIPE_FILE"] = self._trace_basename() - self._spawn_processes() - - @classmethod - def _run( - cls, - parent_conn, - rank: int, - test_name: str, - file_name: str, - parent_pipe, - **kwargs, - ) -> None: - cls.parent = parent_conn - super()._run(rank, test_name, file_name, parent_pipe) - - @property - def local_device(self): - return torch.device("xpu", self.rank_to_GPU[self.rank][0]) - - def _join_processes(self, fn): - # We need to patch sys.exit() as skip_if will use sys.exit() and - # the exit code from this process will not be caught. - with mock.patch("sys.exit"): - fn() - super()._join_processes(fn) - - def _spawn_processes(self) -> None: - proc = torch.multiprocessing.get_context("spawn").Process - self.children_pipes = [] - parent_pipes = [] - for _ in range(self.world_size): - parent_conn, child_conn = torch.multiprocessing.Pipe() - self.children_pipes.append(child_conn) - parent_pipes.append(parent_conn) - piter = iter(parent_pipes) - - def wrap(*positional, args, **kwargs): - args = (next(piter), *args) - return proc(*positional, args=args, **kwargs) - - self._start_processes(wrap) - - def _create_process_group_xccl( - self, timeout=timedelta(seconds=600), device_id=None - ): - store = c10d.FileStore(self.file_name, self.world_size) - c10d.init_process_group( - "xccl", - world_size=self.world_size, - rank=self.rank, - store=store, - timeout=timeout, - device_id=device_id, - ) - pg = c10d.distributed_c10d._get_default_group() - return pg - - def tearDown(self): - super().tearDown() - try: - os.remove(self.file_name) - except OSError: - pass - - @property - def world_size(self): - return 2 - - @property - def rank_to_GPU(self): - # return rank to GPU map - return init_multigpu_helper(self.world_size, "xccl") - - def _trace_basename(self): - # we pass the base to the env, and the dump util will append rank - return os.path.join(self.tempdir.name, "trace_") - - def _trace_name(self, rank): - return self._trace_basename() + str(rank) - - def started_or_scheduled(self, timing_enabled=False): - return "started" if timing_enabled else "scheduled" - - -class XCCLTraceTest(XCCLTraceTestBase): - def _verify_trace(self, t, include_collectives, is_json, timing_enabled=False): - ver = t["version"] - self.assertEqual(ver, "2.9") - xccl_version = t["xccl_version"] - torch_xccl_version = torch._C._distributed_c10d.get_xccl_version() - self.assertEqual(xccl_version, torch_xccl_version) - pg_config = t["pg_config"] - self.assertEqual(len(pg_config), 1) - default_pg_info = pg_config["0"] - self.assertIn("name", default_pg_info) - self.assertIn("desc", default_pg_info) - self.assertIn("ranks", default_pg_info) - pg_status = t["pg_status"] - self.assertEqual(len(pg_status), 1) - self.assertEqual(str(pg_status["0"]["last_enqueued_collective"]), "2") - self.assertEqual(str(pg_status["0"]["last_completed_collective"]), "2") - self.assertEqual( - str(pg_status["0"]["last_started_collective"]), - "2" if timing_enabled else "-1", - ) - global_ranks = pg_config["0"]["ranks"] - self.assertEqual(len(json.loads(global_ranks)), self.world_size) - if include_collectives: - self.assertEqual(len(t["entries"]), 2) - t = t["entries"] - last = t[-1] - self.assertEqual(last["thread_id"], str(threading.current_thread().ident)) - self.assertEqual(last["thread_name"], "fr_test_thread") - self.assertEqual(last["process_group"], ("0", "default_pg")) - self.assertEqual(last["state"], "completed") - s = last["time_discovered_started_ns"] - f = 
last["time_discovered_completed_ns"] - self.assertEqual(last["record_id"], 1) - self.assertIsNotNone(f) - if timing_enabled: - self.assertIsNotNone(s) - self.assertTrue(s <= f) - # we don't collect stack traces in JSON at the moment - if not is_json: - self.assertIn("test_c10d_xccl.py", str(last["frames"])) - self.assertEqual(last["input_sizes"], ((3, 4),)) - self.assertEqual(last["input_dtypes"], ["Float"]) - self.assertEqual(last["output_sizes"], ((3, 4),)) - self.assertEqual(last["output_dtypes"], ["Float"]) - self.assertEqual(last["collective_seq_id"], 2) - self.assertEqual(last["timeout_ms"], 600000) - now = datetime.now() - event_created_time = datetime.fromtimestamp( - last["time_created_ns"] / 1000000000 - ) - before_test = now - timedelta(minutes=1) - self.assertTrue(before_test < event_created_time < now) - if timing_enabled: - # very loose bounds, measured 0.036 ms on devgpu - self.assertTrue(0 < last["duration_ms"] < 100) - else: - self.assertTrue("duration_ms" not in last) - else: - self.assertTrue("entries" not in t) - - def load_libpthread_or_libc(self): - import ctypes.util - - for base in ("pthread", "c"): - path = ctypes.util.find_library(base) - if path: - try: - return ctypes.CDLL(path) - except OSError: - continue - raise RuntimeError("Could not load pthread or libc") - - # Directly set thread name using threading.current_thread().name does not work - # because we use pthread_getname_np to get the thread’s OS-level name in C++ - def set_thread_name(self, name): - import ctypes - - lib = self.load_libpthread_or_libc() - pthread_self = lib.pthread_self - pthread_self.restype = ctypes.c_void_p - pthread_setname_np = lib.pthread_setname_np - pthread_setname_np.argtypes = [ctypes.c_void_p, ctypes.c_char_p] - - # Get current pthread handle - tid = pthread_self() - - # Set name - pthread_setname_np(tid, name.encode()) - - @requires_xccl() - @skip_if_lt_x_gpu(2) - @parametrize("include_collectives", [True, False]) - def test_short_pickle(self, include_collectives, timing_enabled=False): - if self.rank == self.MAIN_PROCESS_RANK: - return - pg = self._create_process_group_xccl() - if timing_enabled: - pg._enable_collectives_timing() - device = self.local_device - self.set_thread_name("fr_test_thread") - a = torch.full((3, 4), float(self.rank), device=device) - for _ in range(2): - f = pg.allreduce(a) - f.wait() - torch.xpu.synchronize(device=device) - # gah ok so now the duration_ms is populated best-effort since it can only happen outside "dump()" api - time.sleep(1) - t = pickle.loads( - torch._C._distributed_c10d._dump_xccl_trace( - includeCollectives=include_collectives - ) - ) - self._verify_trace( - t, - include_collectives=include_collectives, - is_json=True, - timing_enabled=timing_enabled, - ) - dist.destroy_process_group() - - @requires_xccl() - @skip_if_lt_x_gpu(2) - def test_dump_pipe(self): - def open_file_with_timeout(file_path, mode, timeout=1.0): - start_time = time.time() - while time.time() - start_time < timeout: - if os.path.exists(file_path): - return open(file_path, mode) - time.sleep(0.1) - raise FileNotFoundError - - if self.rank == self.MAIN_PROCESS_RANK: - for c in self.children_pipes: - self.assertEqual(c.recv(), "next") - - dump_file = self._trace_name(rank=0) - pipe_file = dump_file + ".pipe" - with open_file_with_timeout(pipe_file, "w") as f: - f.write("1\n") - with open_file_with_timeout(dump_file, "rb", timeout=10.0) as f: - self.assertTrue("all_reduce" in str(pickle.load(f))) - - for c in self.children_pipes: - c.send("next") - return - - pg = 
self._create_process_group_xccl() - device = self.local_device - a = torch.full((3, 4), float(self.rank), device=device) - for _ in range(2): - f = pg.allreduce(a) - f.wait() - torch.xpu.synchronize(device=device) - self.parent.send("next") - self.parent.recv() - - @requires_xccl() - @skip_if_lt_x_gpu(2) - def test_long(self): - os.environ["TORCH_FR_BUFFER_SIZE"] = "10" - if self.rank == self.MAIN_PROCESS_RANK: - return - pg = self._create_process_group_xccl() - device = self.local_device - a = torch.full((3, 4), float(self.rank), device=device) - for _ in range(2): - # test some other primitives to make sure - # their strings are valid - xs = [torch.ones(3, 4, device=device)] - pg.broadcast(xs).wait() - pg.allreduce(xs).wait() - pg.reduce(xs).wait() - ys = [[torch.empty(3, 4, device=device) for _ in range(self.world_size)]] - pg.allgather(ys, xs).wait() - pg.reduce_scatter(xs, ys).wait() - f = pg.allreduce(a) - f.wait() - torch.xpu.synchronize(device=device) - t = pickle.loads(torch._C._distributed_c10d._dump_xccl_trace()) - t = t["entries"] - self.assertEqual(len(t), 10) - first = t[0] - last = t[-1] - self.assertEqual(last["profiling_name"], "xccl:all_reduce") - self.assertEqual(last["state"], "completed") - self.assertIn("test_c10d_xccl.py", str(last["frames"])) - self.assertEqual(last["input_sizes"], ((3, 4),)) - self.assertEqual(last["input_dtypes"], ["Float"]) - self.assertEqual(last["output_sizes"], ((3, 4),)) - self.assertEqual(last["output_dtypes"], ["Float"]) - self.assertEqual(last["timeout_ms"], 600000) - self.assertEqual(last["collective_seq_id"] - first["collective_seq_id"], 9) - dist.destroy_process_group() - - @requires_xccl() - @skip_if_lt_x_gpu(2) - def test_barrier_profiling(self): - os.environ["TORCH_FR_BUFFER_SIZE"] = "10" - if self.rank == self.MAIN_PROCESS_RANK: - return - pg = self._create_process_group_xccl() - device = self.local_device - a = torch.full((3, 4), float(self.rank), device=device) - f = pg.barrier() - f = pg.allreduce(a) - f.wait() - torch.xpu.synchronize(device=device) - t = pickle.loads(torch._C._distributed_c10d._dump_xccl_trace()) - t = t["entries"] - self.assertEqual(len(t), 2) - first = t[0] - last = t[-1] - self.assertEqual(first["profiling_name"], "xccl:all_reduce_barrier") - self.assertEqual(last["profiling_name"], "xccl:all_reduce") - dist.destroy_process_group() - - -instantiate_parametrized_tests(XCCLTraceTest) -instantiate_parametrized_tests(ProcessGroupXCCLTest) - - class SetDeviceMethod(Enum): TORCH_XPU_SET = auto() # torch.xpu.set_device COLLECTIVE_ARGUMENT = auto() # broadcast_object_list(device=) From e3a167dea7dd398f6667f8c1f979dac8d1174779 Mon Sep 17 00:00:00 2001 From: frost-intel Date: Thu, 7 Aug 2025 19:44:15 +0000 Subject: [PATCH 14/15] Test --- test/xpu/distributed/test_c10d_xccl.py | 79 ++++++++++++++++++++++++++ 1 file changed, 79 insertions(+) diff --git a/test/xpu/distributed/test_c10d_xccl.py b/test/xpu/distributed/test_c10d_xccl.py index 0625a6993f..916524073c 100644 --- a/test/xpu/distributed/test_c10d_xccl.py +++ b/test/xpu/distributed/test_c10d_xccl.py @@ -2,6 +2,8 @@ import math import os +import random +import signal import sys import time from datetime import timedelta @@ -19,6 +21,9 @@ import torch.testing._internal.common_utils as common from torch.testing._internal.common_distributed import MultiProcessTestCase from torch.testing._internal.common_utils import ( + instantiate_parametrized_tests, + IS_SANDCASTLE, + parametrize, retry_on_connect_failures, run_tests, skip_but_pass_in_sandcastle_if, @@ -210,6 
From e3a167dea7dd398f6667f8c1f979dac8d1174779 Mon Sep 17 00:00:00 2001
From: frost-intel
Date: Thu, 7 Aug 2025 19:44:15 +0000
Subject: [PATCH 14/15] Test

---
 test/xpu/distributed/test_c10d_xccl.py | 79 ++++++++++++++++++++++++++
 1 file changed, 79 insertions(+)

diff --git a/test/xpu/distributed/test_c10d_xccl.py b/test/xpu/distributed/test_c10d_xccl.py
index 0625a6993f..916524073c 100644
--- a/test/xpu/distributed/test_c10d_xccl.py
+++ b/test/xpu/distributed/test_c10d_xccl.py
@@ -2,6 +2,8 @@
 
 import math
 import os
+import random
+import signal
 import sys
 import time
 from datetime import timedelta
@@ -19,6 +21,9 @@ import torch.testing._internal.common_utils as common
 from torch.testing._internal.common_distributed import MultiProcessTestCase
 from torch.testing._internal.common_utils import (
+    instantiate_parametrized_tests,
+    IS_SANDCASTLE,
+    parametrize,
     retry_on_connect_failures,
     run_tests,
     skip_but_pass_in_sandcastle_if,
@@ -210,6 +215,15 @@ def _create_process_group_xccl(
 
     def setUp(self):
         super().setUp()
+        TEST_NAN_ASSERT_RETURN = 0 if IS_SANDCASTLE else -signal.SIGABRT
+        self.special_return_code_checks = {
+            self.test_nan_assert_float16.__wrapped__: TEST_NAN_ASSERT_RETURN,
+            self.test_nan_assert_float32.__wrapped__: TEST_NAN_ASSERT_RETURN,
+            self.test_nan_assert_float64.__wrapped__: TEST_NAN_ASSERT_RETURN,
+            self.test_nan_assert_bfloat16.__wrapped__: TEST_NAN_ASSERT_RETURN,
+            self.test_nan_assert_float8_e4m3fn.__wrapped__: TEST_NAN_ASSERT_RETURN,
+            self.test_nan_assert_float8_e5m2.__wrapped__: TEST_NAN_ASSERT_RETURN,
+        }
         self._spawn_processes()
 
     def tearDown(self):
@@ -288,6 +302,68 @@ def test_set_process_group_desc(self):
         pg_2 = c10d.new_group([0, 1])
         self.assertEqual(pg_2.group_desc, "undefined")
 
+    @requires_xccl()
+    @parametrize(
+        "type",
+        [
+            torch.float16,
+            torch.float32,
+            torch.float64,
+            torch.bfloat16,
+            torch.float8_e4m3fn,
+            torch.float8_e5m2,
+        ],
+    )
+    def test_nan_assert(self, type):
+        # Expecting a device-side error when NaN is detected
+        os.environ["TORCH_XCCL_NAN_CHECK"] = "1"
+        pg = self._create_process_group_xccl()
+        device = self.rank_to_GPU[self.rank][0]
+        # Cover different buffer sizes
+        if type == torch.float64:
+            size = (1024,)  # 1K elements
+        elif type == torch.float32:
+            size = (1024, 1024)  # 1M elements
+        elif type == torch.float16:
+            size = (1024, 1024, 1024)  # 1G elements
+        else:
+            size = (1,)  # 1 element
+
+        # Note: currently we cannot fill values into a FP8 tensor, thus we
+        # create the NaN tensor in float32 type and cast it to FP8
+        if type == torch.float8_e4m3fn or type == torch.float8_e5m2:
+            init_type = torch.float32
+        else:
+            init_type = type
+
+        nan_tensor = torch.zeros(*size, dtype=init_type, device=device)
+        # randomly pick a NaN element
+        index = tuple([random.randrange(size[i]) for i in range(len(size))])
+        nan_tensor[index] = float("nan")
+        if init_type != type:
+            # Now cast to the targeted dtype
+            nan_tensor = nan_tensor.to(type)
+
+        output = torch.empty(self.world_size, *size, dtype=type, device=device)
+
+        # # confirm enable/disable flag works
+        # backend._set_enable_nan_check(False)
+        # # Note: using all-gather here because some NCCL/SM versions do not
+        # # support FP8 reduction
+        # # temporarily skip due to https://github.com/pytorch/pytorch/issues/153479
+        # # pg._allgather_base(output, nan_tensor)
+
+        # backend._set_enable_nan_check(True)
+        try:
+            pg._allgather_base(output, nan_tensor)
+        except Exception:
+            sys.exit(signal.SIGABRT)
+
+        dist.destroy_process_group()
+
+        # reset env
+        os.environ["TORCH_XCCL_NAN_CHECK"] = "0"
+
 
 class CommTest(MultiProcessTestCase):
     @property
@@ -551,6 +627,9 @@ def test_all_gather_into_tensor(self):
         )
 
 
+instantiate_parametrized_tests(ProcessGroupXCCLTest)
+
+
 class SetDeviceMethod(Enum):
     TORCH_XPU_SET = auto()  # torch.xpu.set_device
     COLLECTIVE_ARGUMENT = auto()  # broadcast_object_list(device=)
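The -signal.SIGABRT expected return code registered in setUp() above follows POSIX exit reporting: a child killed by a signal is reported by multiprocessing with a negative exitcode, whereas sys.exit(n) yields a positive n. Note that the test body's sys.exit(signal.SIGABRT) fallback would therefore produce +6, and only a genuine abort (such as a device-side NaN assert) matches -SIGABRT. A standalone sketch of the distinction, independent of XCCL (os.abort() stands in for the device-side assert):

    # Why a signal-killed child reports exitcode -SIGABRT (-6) while
    # sys.exit(code) reports a positive code. Standalone sketch.
    import multiprocessing as mp
    import os
    import signal


    def crash():
        os.abort()  # raises SIGABRT in the child, like a device-side assert


    if __name__ == "__main__":
        p = mp.get_context("spawn").Process(target=crash)
        p.start()
        p.join()
        # multiprocessing reports "killed by signal N" as exitcode -N
        assert p.exitcode == -signal.SIGABRT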
From 8031f8ea351a637c99b9a2d7cb7932b9b78b1914 Mon Sep 17 00:00:00 2001
From: frost-intel
Date: Thu, 7 Aug 2025 20:01:00 +0000
Subject: [PATCH 15/15] UT for XCCL FlightRecorder

---
 test/xpu/distributed/test_c10d_xccl.py | 317 ++++++++++++++++++++++++-
 1 file changed, 316 insertions(+), 1 deletion(-)

diff --git a/test/xpu/distributed/test_c10d_xccl.py b/test/xpu/distributed/test_c10d_xccl.py
index 916524073c..e143be041e 100644
--- a/test/xpu/distributed/test_c10d_xccl.py
+++ b/test/xpu/distributed/test_c10d_xccl.py
@@ -1,16 +1,21 @@
 # Owner(s): ["oncall: distributed"]
 
+import json
 import math
 import os
+import pickle
 import random
 import signal
 import sys
+import tempfile
+import threading
 import time
-from datetime import timedelta
+from datetime import datetime, timedelta
 from enum import auto, Enum
 from unittest import mock
 
 import torch
+import torch._C._distributed_c10d
 import torch.distributed as c10d
 
 if not c10d.is_available() or not c10d.is_xccl_available():
@@ -627,6 +632,316 @@ def test_all_gather_into_tensor(self):
         )
 
 
+class XCCLTraceTestBase(MultiProcessTestCase):
+    def setUp(self):
+        super().setUp()
+        os.environ["TORCH_FR_BUFFER_SIZE"] = "1000"
+        self.tempdir = tempfile.TemporaryDirectory()
+        os.environ["TORCH_FR_DUMP_TEMP_FILE"] = self._trace_basename()
+        os.environ["TORCH_FR_DEBUG_INFO_PIPE_FILE"] = self._trace_basename()
+        self._spawn_processes()
+
+    @classmethod
+    def _run(
+        cls,
+        parent_conn,
+        rank: int,
+        test_name: str,
+        file_name: str,
+        parent_pipe,
+        **kwargs,
+    ) -> None:
+        cls.parent = parent_conn
+        super()._run(rank, test_name, file_name, parent_pipe)
+
+    @property
+    def local_device(self):
+        return torch.device("xpu", self.rank_to_GPU[self.rank][0])
+
+    def _join_processes(self, fn):
+        # We need to patch sys.exit() as skip_if will use sys.exit() and
+        # the exit code from this process will not be caught.
+        with mock.patch("sys.exit"):
+            fn()
+        super()._join_processes(fn)
+
+    def _spawn_processes(self) -> None:
+        proc = torch.multiprocessing.get_context("spawn").Process
+        self.children_pipes = []
+        parent_pipes = []
+        for _ in range(self.world_size):
+            parent_conn, child_conn = torch.multiprocessing.Pipe()
+            self.children_pipes.append(child_conn)
+            parent_pipes.append(parent_conn)
+        piter = iter(parent_pipes)
+
+        def wrap(*positional, args, **kwargs):
+            args = (next(piter), *args)
+            return proc(*positional, args=args, **kwargs)
+
+        self._start_processes(wrap)
+
+    def _create_process_group_xccl(
+        self, timeout=timedelta(seconds=600), device_id=None
+    ):
+        store = c10d.FileStore(self.file_name, self.world_size)
+        c10d.init_process_group(
+            "xccl",
+            world_size=self.world_size,
+            rank=self.rank,
+            store=store,
+            timeout=timeout,
+            device_id=device_id,
+        )
+        pg = c10d.distributed_c10d._get_default_group()
+        return pg
+
+    def tearDown(self):
+        super().tearDown()
+        try:
+            os.remove(self.file_name)
+        except OSError:
+            pass
+
+    @property
+    def world_size(self):
+        return 2
+
+    @property
+    def rank_to_GPU(self):
+        # return rank to GPU map
+        return init_multigpu_helper(self.world_size, "xccl")
+
+    def _trace_basename(self):
+        # we pass the base to the env, and the dump util will append rank
+        return os.path.join(self.tempdir.name, "trace_")
+
+    def _trace_name(self, rank):
+        return self._trace_basename() + str(rank)
+
+    def started_or_scheduled(self, timing_enabled=False):
+        return "started" if timing_enabled else "scheduled"
+
+
+class XCCLTraceTest(XCCLTraceTestBase):
+    def _verify_trace(self, t, include_collectives, is_json, timing_enabled=False):
+        ver = t["version"]
+        self.assertEqual(ver, "2.9")
+        xccl_version = t["xccl_version"]
+        torch_xccl_version = torch._C._distributed_c10d.get_xccl_version()
+        self.assertEqual(xccl_version, torch_xccl_version)
+        pg_config = t["pg_config"]
+        self.assertEqual(len(pg_config), 1)
+        default_pg_info = pg_config["0"]
+        self.assertIn("name", default_pg_info)
+        self.assertIn("desc", default_pg_info)
+        self.assertIn("ranks", default_pg_info)
+        pg_status = t["pg_status"]
+        self.assertEqual(len(pg_status), 1)
+        self.assertEqual(str(pg_status["0"]["last_enqueued_collective"]), "2")
+        self.assertEqual(str(pg_status["0"]["last_completed_collective"]), "2")
+        self.assertEqual(
+            str(pg_status["0"]["last_started_collective"]),
+            "2" if timing_enabled else "-1",
+        )
+        global_ranks = pg_config["0"]["ranks"]
+        self.assertEqual(len(json.loads(global_ranks)), self.world_size)
+        if include_collectives:
+            self.assertEqual(len(t["entries"]), 2)
+            t = t["entries"]
+            last = t[-1]
+            self.assertEqual(last["thread_id"], str(threading.current_thread().ident))
+            self.assertEqual(last["thread_name"], "fr_test_thread")
+            self.assertEqual(last["process_group"], ("0", "default_pg"))
+            self.assertEqual(last["state"], "completed")
+            s = last["time_discovered_started_ns"]
+            f = last["time_discovered_completed_ns"]
+            self.assertEqual(last["record_id"], 1)
+            self.assertIsNotNone(f)
+            if timing_enabled:
+                self.assertIsNotNone(s)
+                self.assertTrue(s <= f)
+            # we don't collect stack traces in JSON at the moment
+            if not is_json:
+                self.assertIn("test_c10d_xccl.py", str(last["frames"]))
+            self.assertEqual(last["input_sizes"], ((3, 4),))
+            self.assertEqual(last["input_dtypes"], ["Float"])
+            self.assertEqual(last["output_sizes"], ((3, 4),))
+            self.assertEqual(last["output_dtypes"], ["Float"])
+            self.assertEqual(last["collective_seq_id"], 2)
+            self.assertEqual(last["timeout_ms"], 600000)
+            now = datetime.now()
+            event_created_time = datetime.fromtimestamp(
+                last["time_created_ns"] / 1000000000
+            )
+            before_test = now - timedelta(minutes=1)
+            self.assertTrue(before_test < event_created_time < now)
+            if timing_enabled:
+                # very loose bounds, measured 0.036 ms on devgpu
+                self.assertTrue(0 < last["duration_ms"] < 100)
+            else:
+                self.assertTrue("duration_ms" not in last)
+        else:
+            self.assertTrue("entries" not in t)
+
+    def load_libpthread_or_libc(self):
+        import ctypes.util
+
+        for base in ("pthread", "c"):
+            path = ctypes.util.find_library(base)
+            if path:
+                try:
+                    return ctypes.CDLL(path)
+                except OSError:
+                    continue
+        raise RuntimeError("Could not load pthread or libc")
+
+    # Directly setting the thread name via threading.current_thread().name does not
+    # work, because we use pthread_getname_np to get the thread's OS-level name in C++
+    def set_thread_name(self, name):
+        import ctypes
+
+        lib = self.load_libpthread_or_libc()
+        pthread_self = lib.pthread_self
+        pthread_self.restype = ctypes.c_void_p
+        pthread_setname_np = lib.pthread_setname_np
+        pthread_setname_np.argtypes = [ctypes.c_void_p, ctypes.c_char_p]
+
+        # Get current pthread handle
+        tid = pthread_self()
+
+        # Set name
+        pthread_setname_np(tid, name.encode())
+
+    @requires_xccl()
+    @skip_if_lt_x_gpu(2)
+    @parametrize("include_collectives", [True, False])
+    def test_short_pickle(self, include_collectives, timing_enabled=False):
+        if self.rank == self.MAIN_PROCESS_RANK:
+            return
+        pg = self._create_process_group_xccl()
+        if timing_enabled:
+            pg._enable_collectives_timing()
+        device = self.local_device
+        self.set_thread_name("fr_test_thread")
+        a = torch.full((3, 4), float(self.rank), device=device)
+        for _ in range(2):
+            f = pg.allreduce(a)
+        f.wait()
+        torch.xpu.synchronize(device=device)
+        # duration_ms is populated best-effort, since it can only be filled in outside the dump() API
+        time.sleep(1)
+        t = pickle.loads(
+            torch._C._distributed_c10d._dump_xccl_trace(
+                includeCollectives=include_collectives
+            )
+        )
+        self._verify_trace(
+            t,
+            include_collectives=include_collectives,
+            is_json=True,
+            timing_enabled=timing_enabled,
+        )
+        dist.destroy_process_group()
+
+    @requires_xccl()
+    @skip_if_lt_x_gpu(2)
+    def test_dump_pipe(self):
+        def open_file_with_timeout(file_path, mode, timeout=1.0):
+            start_time = time.time()
+            while time.time() - start_time < timeout:
+                if os.path.exists(file_path):
+                    return open(file_path, mode)
+                time.sleep(0.1)
+            raise FileNotFoundError
+
+        if self.rank == self.MAIN_PROCESS_RANK:
+            for c in self.children_pipes:
+                self.assertEqual(c.recv(), "next")
+
+            dump_file = self._trace_name(rank=0)
+            pipe_file = dump_file + ".pipe"
+            with open_file_with_timeout(pipe_file, "w") as f:
+                f.write("1\n")
+            with open_file_with_timeout(dump_file, "rb", timeout=10.0) as f:
+                self.assertTrue("all_reduce" in str(pickle.load(f)))
+
+            for c in self.children_pipes:
+                c.send("next")
+            return
+
+        pg = self._create_process_group_xccl()
+        device = self.local_device
+        a = torch.full((3, 4), float(self.rank), device=device)
+        for _ in range(2):
+            f = pg.allreduce(a)
+        f.wait()
+        torch.xpu.synchronize(device=device)
+        self.parent.send("next")
+        self.parent.recv()
+
+    @requires_xccl()
+    @skip_if_lt_x_gpu(2)
+    def test_long(self):
+        os.environ["TORCH_FR_BUFFER_SIZE"] = "10"
+        if self.rank == self.MAIN_PROCESS_RANK:
+            return
+        pg = self._create_process_group_xccl()
+        device = self.local_device
+        a = torch.full((3, 4), float(self.rank), device=device)
+        for _ in range(2):
+            # test some other primitives to make sure
+            # their strings are valid
+            xs = [torch.ones(3, 4, device=device)]
+            pg.broadcast(xs).wait()
+            pg.allreduce(xs).wait()
+            pg.reduce(xs).wait()
+            ys = [[torch.empty(3, 4, device=device) for _ in range(self.world_size)]]
+            pg.allgather(ys, xs).wait()
+            pg.reduce_scatter(xs, ys).wait()
+            f = pg.allreduce(a)
+        f.wait()
+        torch.xpu.synchronize(device=device)
+        t = pickle.loads(torch._C._distributed_c10d._dump_xccl_trace())
+        t = t["entries"]
+        self.assertEqual(len(t), 10)
+        first = t[0]
+        last = t[-1]
+        self.assertEqual(last["profiling_name"], "xccl:all_reduce")
+        self.assertEqual(last["state"], "completed")
+        self.assertIn("test_c10d_xccl.py", str(last["frames"]))
+        self.assertEqual(last["input_sizes"], ((3, 4),))
+        self.assertEqual(last["input_dtypes"], ["Float"])
+        self.assertEqual(last["output_sizes"], ((3, 4),))
+        self.assertEqual(last["output_dtypes"], ["Float"])
+        self.assertEqual(last["timeout_ms"], 600000)
+        self.assertEqual(last["collective_seq_id"] - first["collective_seq_id"], 9)
+        dist.destroy_process_group()
+
+    @requires_xccl()
+    @skip_if_lt_x_gpu(2)
+    def test_barrier_profiling(self):
+        os.environ["TORCH_FR_BUFFER_SIZE"] = "10"
+        if self.rank == self.MAIN_PROCESS_RANK:
+            return
+        pg = self._create_process_group_xccl()
+        device = self.local_device
+        a = torch.full((3, 4), float(self.rank), device=device)
+        f = pg.barrier()
+        f = pg.allreduce(a)
+        f.wait()
+        torch.xpu.synchronize(device=device)
+        t = pickle.loads(torch._C._distributed_c10d._dump_xccl_trace())
+        t = t["entries"]
+        self.assertEqual(len(t), 2)
+        first = t[0]
+        last = t[-1]
+        self.assertEqual(first["profiling_name"], "xccl:all_reduce_barrier")
+        self.assertEqual(last["profiling_name"], "xccl:all_reduce")
+        dist.destroy_process_group()
+
+
+instantiate_parametrized_tests(XCCLTraceTest)
 instantiate_parametrized_tests(ProcessGroupXCCLTest)
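The pipe-based dump flow exercised by test_dump_pipe above can also be driven by hand. A minimal sketch, assuming TORCH_FR_DUMP_TEMP_FILE and TORCH_FR_DEBUG_INFO_PIPE_FILE were both set to the same base path and that the recorder appends the rank to each, as the tests do; request_dump is a hypothetical helper, not part of this patch:

    # Hypothetical helper mirroring test_dump_pipe: write to the per-rank
    # pipe file, then wait for the pickled FlightRecorder dump to appear.
    import os
    import pickle
    import time


    def request_dump(base, rank, timeout=10.0):
        pipe_path = f"{base}{rank}.pipe"  # assumes the recorder already created it
        dump_path = f"{base}{rank}"       # dump util appends the rank
        with open(pipe_path, "w") as f:
            f.write("1\n")                # the tests write "1\n" to trigger a dump
        deadline = time.time() + timeout
        while time.time() < deadline:
            if os.path.exists(dump_path):
                with open(dump_path, "rb") as f:
                    # a dict with "entries", "pg_config", "pg_status", ...
                    return pickle.load(f)
            time.sleep(0.1)
        raise TimeoutError(f"no FlightRecorder dump at {dump_path}")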