Nvls channel setup inside the container #477
AFAIK, it requires executing the command on the host/VM side... Can you access the VM? BTW, PyTorch doesn't support NVLS right now; it would be good to know more details about your case. |
I don't have access to the VM. It looks like NCCL doesn't require this. I set the following environment variable and then ran the allreduce operation:

export NCCL_ALGO="allreduce:nvls"
# NCCL version 2.21.5+cuda12.4

It didn't give me any errors. Am I missing anything? |
I think PyTorch is still working on NVLS support; it requires memory registration before running communication collectives (pytorch/pytorch#136567). You can test the latency/busBW for allreduce: if it reaches numbers like this for 1 GB of data, then NVLS is enabled. Otherwise, I think it just ignores the environment variable.
|
Just to confirm, there is no other way to use SHARP without creating an IMEX channel on an intra-node H100 setup, right?

node-0:493516:493645 [1] NCCL INFO Connected NVLS tree
node-0:493516:493645 [1] NCCL INFO threadThresholds 8/8/64 | 64/8/64 | 512 | 512
node-0:493516:493645 [1] NCCL INFO 24 coll channels, 16 nvls channels, 32 p2p channels, 32 p2p channels per peer
node-0:493516:493647 [3] NCCL INFO Connected NVLS tree
node-0:493516:493647 [3] NCCL INFO threadThresholds 8/8/64 | 64/8/64 | 512 | 512
node-0:493516:493647 [3] NCCL INFO 24 coll channels, 16 nvls channels, 32 p2p channels, 32 p2p channels per peer
node-0:493516:493648 [4] NCCL INFO Connected NVLS tree
node-0:493516:493648 [4] NCCL INFO threadThresholds 8/8/64 | 64/8/64 | 512 | 512
node-0:493516:493648 [4] NCCL INFO 24 coll channels, 16 nvls channels, 32 p2p channels, 32 p2p channels per peer
node-0:493516:493644 [0] NCCL INFO Connected NVLS tree
node-0:493516:493646 [2] NCCL INFO Connected NVLS tree
node-0:493516:493646 [2] NCCL INFO threadThresholds 8/8/64 | 64/8/64 | 512 | 512
node-0:493516:493646 [2] NCCL INFO 24 coll channels, 16 nvls channels, 32 p2p channels, 32 p2p channels per peer
node-0:493516:493644 [0] NCCL INFO threadThresholds 8/8/64 | 64/8/64 | 512 | 512
node-0:493516:493644 [0] NCCL INFO 24 coll channels, 16 nvls channels, 32 p2p channels, 32 p2p channels per peer
node-0:493516:493649 [5] NCCL INFO Connected NVLS tree
node-0:493516:493649 [5] NCCL INFO threadThresholds 8/8/64 | 64/8/64 | 512 | 512
node-0:493516:493649 [5] NCCL INFO 24 coll channels, 16 nvls channels, 32 p2p channels, 32 p2p channels per peer
node-0:493516:493650 [6] NCCL INFO Connected NVLS tree
node-0:493516:493650 [6] NCCL INFO threadThresholds 8/8/64 | 64/8/64 | 512 | 512
node-0:493516:493650 [6] NCCL INFO 24 coll channels, 16 nvls channels, 32 p2p channels, 32 p2p channels per peer
node-0:493516:493651 [7] NCCL INFO Connected NVLS tree
node-0:493516:493651 [7] NCCL INFO threadThresholds 8/8/64 | 64/8/64 | 512 | 512
node-0:493516:493651 [7] NCCL INFO 24 coll channels, 16 nvls channels, 32 p2p channels, 32 p2p channels per peer
#
# out-of-place in-place
# size count type redop root time algbw busbw #wrong time algbw busbw #wrong
# (B) (elements) (us) (GB/s) (GB/s) (us) (GB/s) (GB/s)
268435456 67108864 float sum -1 1138.6 235.76 412.58 0 1139.5 235.57 412.24 0
536870912 134217728 float sum -1 2192.9 244.82 428.43 0 2199.2 244.12 427.21 0
1073741824 268435456 float sum -1 4174.9 257.19 450.08 0 4177.6 257.02 449.79 0
2147483648 536870912 float sum -1 8041.8 267.04 467.32 0 8032.8 267.34 467.84 0

I am getting this using nccl-tests. |
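For reference, nccl-tests derives busbw from algbw for all-reduce using the bus-bandwidth factor 2*(n-1)/n, so with 8 GPUs busbw = 1.75 × algbw. A quick check in Python against the 256 MB row above:

# nccl-tests bus-bandwidth convention for all-reduce: busbw = algbw * 2 * (n - 1) / n
n_gpus = 8
algbw = 235.76  # GB/s, out-of-place algbw for the 256 MB row above
busbw = algbw * 2 * (n_gpus - 1) / n_gpus
print(f"busbw = {busbw:.2f} GB/s")  # ~412.58 GB/s, matching the table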
I checked the other algorithms as well.

#!/bin/bash
export NCCL_DEBUG_SUBSYS="INIT,ENV,TUNING" NCCL_DEBUG="INFO"
NCCL_ALGO=Ring ./build/all_reduce_perf -b 256M -e 2G -f 2 -g 8 > results_ring.txt
NCCL_ALGO=Tree ./build/all_reduce_perf -b 256M -e 2G -f 2 -g 8 > results_tree.txt
NCCL_ALGO=NVLS ./build/all_reduce_perf -b 256M -e 2G -f 2 -g 8 > results_nvls.txt
export NCCL_DEBUG=INFO
# nThread 1 nGpus 8 minBytes 268435456 maxBytes 2147483648 step: 2(factor) warmup iters: 5 iters: 20 agg iters: 1 validation: 1 graph: 0
#
# Using devices
# Rank 0 Group 0 Pid 493516 on node-0 device 0 [0001:00:00] NVIDIA H100 80GB HBM3
# Rank 1 Group 0 Pid 493516 on node-0 device 1 [0002:00:00] NVIDIA H100 80GB HBM3
# Rank 2 Group 0 Pid 493516 on node-0 device 2 [0003:00:00] NVIDIA H100 80GB HBM3
# Rank 3 Group 0 Pid 493516 on node-0 device 3 [0008:00:00] NVIDIA H100 80GB HBM3
# Rank 4 Group 0 Pid 493516 on node-0 device 4 [0009:00:00] NVIDIA H100 80GB HBM3
# Rank 5 Group 0 Pid 493516 on node-0 device 5 [000a:00:00] NVIDIA H100 80GB HBM3
# Rank 6 Group 0 Pid 493516 on node-0 device 6 [000b:00:00] NVIDIA H100 80GB HBM3
# Rank 7 Group 0 Pid 493516 on node-0 device 7 [000c:00:00] NVIDIA H100 80GB HBM3
node-0:493516:493516 [0] NCCL INFO cudaDriverVersion 12040
NCCL version 2.19.4+cuda12.4
# NVLS
# out-of-place in-place
# size count type redop root time algbw busbw #wrong time algbw busbw #wrong
# (B) (elements) (us) (GB/s) (GB/s) (us) (GB/s) (GB/s)
268435456 67108864 float sum -1 1141.1 235.23 411.66 0 1139.9 235.50 412.12 0
536870912 134217728 float sum -1 2195.5 244.53 427.93 0 2192.0 244.93 428.62 0
1073741824 268435456 float sum -1 4183.2 256.68 449.19 0 4175.4 257.16 450.03 0
2147483648 536870912 float sum -1 8041.4 267.05 467.34 0 8031.7 267.38 467.91 0
# Ring
#
# out-of-place in-place
# size count type redop root time algbw busbw #wrong time algbw busbw #wrong
# (B) (elements) (us) (GB/s) (GB/s) (us) (GB/s) (GB/s)
268435456 67108864 float sum -1 1305.7 205.58 359.77 0 1306.7 205.42 359.49 0
536870912 134217728 float sum -1 2573.4 208.62 365.09 0 2575.1 208.48 364.85 0
1073741824 268435456 float sum -1 5105.9 210.29 368.01 0 5109.6 210.14 367.75 0
2147483648 536870912 float sum -1 10194 210.66 368.66 0 10180 210.95 369.17 0
# Tree
# out-of-place in-place
# size count type redop root time algbw busbw #wrong time algbw busbw #wrong
# (B) (elements) (us) (GB/s) (GB/s) (us) (GB/s) (GB/s)
268435456 67108864 float sum -1 2076.5 129.27 226.23 0 2057.7 130.45 228.29 0
536870912 134217728 float sum -1 3401.8 157.82 276.19 0 3407.1 157.57 275.75 0
1073741824 268435456 float sum -1 6746.8 159.15 278.51 0 6744.5 159.20 278.60 0
2147483648 536870912 float sum -1 13071 164.30 287.52 0 13098 163.95 286.91 0

Using PyTorch:
|
Now, in MSCCL++, we cannot use NVLS without an IMEX channel. But it seems your torch program worked with NVLS support. Could you share your torch script and the torch version you used? We want to know if we can enable NVLS in PyTorch with some changes. |
(vllm) aiscuser@node-0:~/c2_overlap$ python3
Python 3.12.9 | packaged by Anaconda, Inc. | (main, Feb 6 2025, 18:56:27) [GCC 11.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import torch
>>> torch.__version__
'2.5.1+cu124'

import os
import time
import argparse
import numpy as np
import torch
import torch.distributed as dist
import torch.cuda.nvtx as nvtx
import torch.multiprocessing as mp
from torch.cuda import Event
def init_process(rank, world_size, args, backend='nccl'):
    """Initialize the distributed process."""
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=world_size)
    run(rank, world_size, args)
    dist.destroy_process_group()

def run(rank, world_size, args):
    """Run the benchmark on a single process."""
    # Set device
    torch.cuda.set_device(rank)
    device = torch.device(f"cuda:{rank}")
    # Print information about the device
    if rank == 0:
        print(f"Running on {torch.cuda.get_device_name(device)}")
        print(f"Number of GPUs: {world_size}")
        print(f"Tensor size: {args.tokens} elements ({(args.tokens * 8192 * 2) / (1024**3):.2f} GB for bfloat16)")
        print(f"Number of iterations: {args.iters}")
        print(f"Number of warmup iterations: {args.warmup}")
        print(f"Number of trials: {args.trials}")
    # Create CUDA streams
    measure_stream = torch.cuda.Stream()
    spin_stream = torch.cuda.Stream()
    # Create tensors
    tensor = torch.rand(args.tokens, 8192, dtype=torch.bfloat16, device=device)
    # Create a GPU flag tensor for synchronization
    flag = torch.zeros(1, dtype=torch.int32, device=device)

    # Define a simple spin kernel as a PyTorch operation
    def spin_wait():
        with torch.cuda.stream(spin_stream):
            # Keep the GPU busy until flag is updated
            while flag.item() == 0:
                torch.cuda._sleep(100)  # Sleep for a short time to reduce polling overhead

    # Warmup
    for _ in range(args.warmup):
        dist.all_reduce(tensor)
    # Synchronize before starting measurements
    torch.cuda.synchronize()
    # Create events for timing
    timings = []
    for trial in range(args.trials):
        torch.cuda.synchronize()
        # Measurement code
        with torch.cuda.stream(measure_stream):
            # Create events for timing
            start_event = Event(enable_timing=True)
            end_event = Event(enable_timing=True)
            # Record start event
            start_event.record(measure_stream)
            # Run iterations of all_reduce
            for _ in range(args.iters):
                dist.all_reduce(tensor)
            # Record end event
            end_event.record(measure_stream)
        # Wait for the measurement stream to finish
        measure_stream.synchronize()
        # Calculate elapsed time (ms)
        elapsed_time = start_event.elapsed_time(end_event)
        # Calculate bandwidth (GB/s)
        tensor_size_bytes = tensor.numel() * tensor.element_size()
        total_bytes = tensor_size_bytes * args.iters * 2  # factor of 2 for all-reduce (send and receive)
        bandwidth = (total_bytes / (elapsed_time / 1000)) / (1024**3)  # GB/s
        # Calculate latency (us)
        latency = (elapsed_time * 1000) / args.iters  # us
        timings.append((elapsed_time, bandwidth, latency))
        if rank == 0:
            print(f"Trial {trial+1}: Elapsed time = {elapsed_time:.3f} ms, "
                  f"Bandwidth = {bandwidth:.3f} GB/s, "
                  f"Latency = {latency:.3f} us")
    # Report median results
    if rank == 0:
        elapsed_times, bandwidths, latencies = zip(*timings)
        median_time = np.median(elapsed_times)
        median_bandwidth = np.median(bandwidths)
        median_latency = np.median(latencies)
        print("\nMedian Results:")
        print(f"Elapsed time = {median_time:.3f} ms")
        print(f"Bandwidth = {median_bandwidth:.3f} GB/s")
        print(f"Latency = {median_latency:.3f} us")

def main():
    parser = argparse.ArgumentParser(description='PyTorch All-Reduce Benchmark')
    parser.add_argument('--tokens', type=int, default=65536,
                        help='Tensor size in number of elements')
    parser.add_argument('--iters', type=int, default=10,
                        help='Number of iterations per trial')
    parser.add_argument('--trials', type=int, default=10,
                        help='Number of trials to run')
    parser.add_argument('--warmup', type=int, default=5,
                        help='Number of warmup iterations')
    args = parser.parse_args()
    world_size = 8
    mp.set_start_method('spawn')
    processes = []
    for rank in range(world_size):
        p = mp.Process(target=init_process, args=(rank, world_size, args))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()

if __name__ == "__main__":
    main()
|
As per #1632, we don't require an IMEX channel for the intra-node setup. Please let me know once you integrate PyTorch support. |
Just checked the NCCL code: if the memory is not registered, NCCL copies the data into a temporary buffer, runs the NVLS allreduce on it, and then copies the result to the output buffer. We are now working on the NVLS support. |
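As a rough illustration of that fallback path, here is a minimal PyTorch-level sketch of the copy-in / collective / copy-out pattern described above. This is not NCCL's actual code; the `_staging_cache` scratch buffer merely stands in for NCCL's internal registered buffer, and a process group is assumed to be initialized.

import torch
import torch.distributed as dist

# Sketch of the staging fallback: copy the user buffer into a reusable scratch
# buffer, run the collective on the scratch buffer, then copy the result back.
_staging_cache = {}

def allreduce_via_staging(tensor: torch.Tensor) -> torch.Tensor:
    key = (tensor.dtype, tensor.numel(), tensor.device)
    staging = _staging_cache.get(key)
    if staging is None:
        staging = torch.empty_like(tensor)  # allocated once, reused across calls
        _staging_cache[key] = staging
    staging.copy_(tensor)     # copy-in to the (conceptually registered) buffer
    dist.all_reduce(staging)  # collective runs on the staging buffer
    tensor.copy_(staging)     # copy-out back to the user buffer
    return tensor

The extra copy-in and copy-out are the cost paid when the user buffer is not registered.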
Hi @rajagond, we have already fixed this issue on a branch:

import os
import argparse
import numpy as np
import torch
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.cuda import Event
from mscclpp import RawGpuBuffer
from mscclpp.utils import GpuBuffer
def init_process(rank, world_size, args, backend='nccl'):
    """Initialize the distributed process."""
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group(backend, rank=rank, world_size=world_size)
    run(rank, world_size, args)
    dist.destroy_process_group()

def run(rank, world_size, args):
    """Run the benchmark on a single process."""
    # Set device
    torch.cuda.set_device(rank)
    device = torch.device(f"cuda:{rank}")
    # Print information about the device
    if rank == 0:
        print(f"Running on {torch.cuda.get_device_name(device)}")
        print(f"Number of GPUs: {world_size}")
        print(f"Tensor size: {args.tokens} elements ({(args.tokens * 8192 * 2) / (1024**3):.2f} GB for bfloat16)")
        print(f"Number of iterations: {args.iters}")
        print(f"Number of warmup iterations: {args.warmup}")
        print(f"Number of trials: {args.trials}")
    # Create CUDA streams
    measure_stream = torch.cuda.Stream()
    spin_stream = torch.cuda.Stream()
    buffer = RawGpuBuffer(args.tokens * 8192 * 2)  # 2 bytes for bfloat16
    dl_pack = buffer.to_dlpack(str(torch.bfloat16))
    tensor = torch.utils.dlpack.from_dlpack(dl_pack)
    tensor.uniform_()
    # Warmup
    for _ in range(args.warmup):
        dist.all_reduce(tensor)
    # Synchronize before starting measurements
    torch.cuda.synchronize()
    # Create events for timing
    timings = []
    # dist.barrier()
    for trial in range(args.trials):
        torch.cuda.synchronize()
        # Measurement code
        with torch.cuda.stream(measure_stream):
            # Create events for timing
            start_event = Event(enable_timing=True)
            end_event = Event(enable_timing=True)
            torch.cuda.synchronize()
            # Record start event
            start_event.record(measure_stream)
            # Run iterations of all_reduce
            for _ in range(args.iters):
                dist.all_reduce(tensor)
            # Record end event
            end_event.record(measure_stream)
        measure_stream.synchronize()
        # Calculate elapsed time (ms)
        elapsed_time = start_event.elapsed_time(end_event)
        # Calculate bandwidth (GB/s)
        tensor_size_bytes = tensor.numel() * tensor.element_size()
        total_bytes = tensor_size_bytes * args.iters * 7 * 2 / 8
        bandwidth = (total_bytes / (elapsed_time / 1000)) / (1024**3)  # GB/s
        # Calculate latency (us)
        latency = (elapsed_time * 1000) / args.iters  # us
        timings.append((elapsed_time, bandwidth, latency))
        if rank == 0:
            print(f"Trial {trial+1}: Elapsed time = {elapsed_time:.3f} ms, "
                  f"Bandwidth = {bandwidth:.3f} GB/s, "
                  f"Latency = {latency:.3f} us")
    # Report median results
    if rank == 0:
        elapsed_times, bandwidths, latencies = zip(*timings)
        median_time = np.median(elapsed_times)
        median_bandwidth = np.median(bandwidths)
        median_latency = np.median(latencies)
        print("\nMedian Results:")
        print(f"Elapsed time = {median_time:.3f} ms")
        print(f"Bandwidth = {median_bandwidth:.3f} GB/s")
        print(f"Latency = {median_latency:.3f} us")

def main():
    parser = argparse.ArgumentParser(description='PyTorch All-Reduce Benchmark')
    parser.add_argument('--tokens', type=int, default=65536,
                        help='Tensor size in number of elements')
    parser.add_argument('--iters', type=int, default=10,
                        help='Number of iterations per trial')
    parser.add_argument('--trials', type=int, default=10,
                        help='Number of trials to run')
    parser.add_argument('--warmup', type=int, default=5,
                        help='Number of warmup iterations')
    args = parser.parse_args()
    world_size = 8
    mp.set_start_method('spawn')
    processes = []
    for rank in range(world_size):
        p = mp.Process(target=init_process, args=(rank, world_size, args))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()

if __name__ == "__main__":
    main()

Run with NCCL via the commands:

mkdir -p ${EXECUTION_PLAN_DIR}
python3 ${MSCCLPP_HOME}/python/examples/allreduce_nvls.py 8 8 > ${EXECUTION_PLAN_DIR}/allreduce.json
|
The perf for NCCL:
The perf for MSCCL++:
|
Hi,
I am trying to use MSCCLPP inside a Singularity container with NVLS support. The setup below doesn't work inside the container. Is there a possible workaround? Thanks!