nvshmem4py has a bug here. Consider the following sequence:
t = nvshmem.core.tensor(shape, dtype)
t_p = nvshmem.core.get_peer_tensor(t, pe)
nvshmem.core.free_tensor(t)
nvshmem.core.free_tensor(t_p)  # whether or not you free the peer tensor, you get an error either way.
# then allocate and get peer again
t = nvshmem.core.tensor(another_shape, dtype)
t_p = nvshmem.core.get_peer_tensor(t, pe)
nvshmem.core.free_tensor(t)
The root cause is in the NvshmemResource.get_peer_buffer logic:
line 184 only checks whether the buffer is recorded in _mem_references, but does not check the buffer size, so a stale cache entry with the wrong size can be returned:
- if I did not call nvshmem.core.free_tensor(t_p), the returned tensor's shape is wrong;
- if I did call nvshmem.core.free_tensor(t_p), then buffer.mr is None.
Here is the Python reproduction code:
import torch.distributed as dist
import torch
import triton
import triton.language as tl
import nvshmem.core
import os
from cuda.core.experimental import Device, system
import nvshmem.core.utils
from torch.profiler import record_function
class PyTorchStreamWrapper:
    """Adapt a ``torch.cuda.Stream`` to the ``__cuda_stream__`` protocol
    expected by cuda.core / nvshmem4py."""

    def __init__(self, pt_stream):
        # Keep the wrapped stream and expose its raw CUDA handle.
        self.pt_stream = pt_stream
        self.handle = pt_stream.cuda_stream

    def __cuda_stream__(self):
        # CUDA Python protocol: a (version, stream-handle) tuple.
        return (0, self.pt_stream.cuda_stream)
def torchrun_uid_init():
    """
    Initialize NVSHMEM using UniqueID with `torchrun` as the launcher.

    Side effects: sets the module-level globals ``dev`` (cuda.core Device)
    and ``stream`` (PyTorchStreamWrapper), binds this process to its GPU,
    creates the torch.distributed process group, and initializes NVSHMEM.
    """
    global dev, stream

    # Bind this process to its GPU before creating any CUDA objects.
    my_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(my_rank)
    torch_device = torch.device("cuda", my_rank)

    # nvshmem4py requires a cuda.core Device at init time.
    dev = Device(torch_device.index)
    dev.set_current()

    # Wrap PyTorch's current stream so it satisfies __cuda_stream__.
    stream = PyTorchStreamWrapper(torch.cuda.current_stream())

    # Initialize the torch.distributed process group.
    # NOTE(review): world_size = device_count() assumes a single node —
    # confirm if this repro is ever run multi-node.
    dist.init_process_group(
        backend="cpu:gloo,cuda:nccl",
        rank=my_rank,
        world_size=torch.cuda.device_count(),
        device_id=torch_device,
    )

    # Extract rank / nranks from the process group.
    num_ranks = dist.get_world_size()
    rank_id = dist.get_rank()

    # Every rank starts with an empty placeholder uid; rank 0 creates the
    # real one and broadcasts it to everyone else.
    uniqueid = nvshmem.core.get_unique_id(empty=True)
    if rank_id == 0:
        uniqueid = nvshmem.core.get_unique_id()
        payload = [uniqueid]
    else:
        payload = [None]

    # Ship the UID to all ranks via torch.distributed, then sync.
    dist.broadcast_object_list(payload, src=0)
    dist.barrier()

    nvshmem.core.init(
        device=dev,
        uid=payload[0],
        rank=rank_id,
        nranks=num_ranks,
        initializer_method="uid",
    )
def test_get_peer_memory():
    """Reproduce the nvshmem4py ``get_peer_buffer`` stale-cache bug.

    Repeatedly allocates symmetric tensors with a random row count, takes
    a peer view on rank 0, and frees both.  Because each iteration uses a
    different allocation size, a cache that keys only on the buffer's
    presence in ``_mem_references`` (not its size) hands back a stale
    entry on later iterations: wrong shape if the peer tensor was not
    freed, or ``buffer.mr is None`` if it was.
    """
    import random

    # RANK is loop-invariant — read it once instead of on every iteration.
    # int() raises TypeError if RANK is unset, which fails fast.
    rank = int(os.getenv("RANK"))
    for _ in range(10):
        # A fresh random M forces a different allocation size each round,
        # which is what exposes the stale cache entry.
        M = random.randint(16, 1024)
        shape = (M, 1024)
        t = nvshmem.core.tensor(shape, dtype=torch.float16)
        if rank == 0:
            t_peer = nvshmem.core.get_peer_tensor(t, 1)
        nvshmem.core.free_tensor(t)
        # comment this if or not, you got error anyway.
        if rank == 0:
            nvshmem.core.free_tensor(t_peer)
if __name__ == "__main__":
    # Bootstrap torch.distributed + NVSHMEM (UID exchange via torchrun env).
    torchrun_uid_init()
    # DEBUG logging makes the stale-buffer behavior visible in nvshmem4py's logs.
    nvshmem.core.utils._configure_logging(level="DEBUG")
    # Run the reproduction, then tear down NVSHMEM before the process group.
    test_get_peer_memory()
    nvshmem.core.finalize()
    dist.destroy_process_group()


