I want to understand the difference between the `py_num_workers` and `num_threads` arguments that you specify when using the `pipeline_def` decorator in NVIDIA DALI. I need to run a single pipeline in which each thread works on a single element of a batch at a time. As expected, increasing `num_threads` improves performance (is the work within a batch divided among the threads the way I described above?). However, increasing `py_num_workers` does not improve performance, even though I can see new processes being created (using htop).
For reference, here is the code I’m using:
def image_loader(filename, directIO=False):
    """Read a whole file into a 1-D ``uint8`` NumPy array.

    The file is read through an anonymous memory map whose size is the file
    size rounded up to a multiple of 512 bytes, so the same code path can be
    used with ``O_DIRECT``.

    Args:
        filename: Path of the file to read.
        directIO: When true, the file is opened with ``os.O_DIRECT`` in
            addition to ``os.O_RDONLY``.

    Returns:
        A ``np.uint8`` array holding exactly the bytes that were read from
        the file (the 512-byte padding of the scratch buffer is not
        included). An empty file yields an empty array.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    flags = os.O_RDONLY | os.O_DIRECT if directIO else os.O_RDONLY
    image_fd = os.open(filename, flags)
    # The descriptor is read-only, so wrap it in a read-only file object; the
    # ``with`` block guarantees the descriptor is closed on every path.
    with os.fdopen(image_fd, 'rb', buffering=0) as fo:
        padded_size = 512 * math.ceil(os.fstat(image_fd).st_size / 512)
        if padded_size == 0:
            # mmap cannot create a zero-length mapping.
            return np.empty(0, dtype=np.uint8)
        with mmap.mmap(-1, padded_size) as image_mmap:
            bytes_read = fo.readinto(image_mmap)
            # ``read`` copies the data out of the map, so the array stays
            # valid after the map is closed.
            return np.frombuffer(image_mmap.read(bytes_read), dtype=np.uint8)
class ExternalInputCallable:
    """Per-sample source callable: maps a sample descriptor to (bytes, label).

    On construction the files found in ``images_dir`` are listed once and
    every file is paired with the constant label ``1``. Calling the instance
    with a sample descriptor returns the encoded file contents and the label
    for that sample, or raises ``StopIteration`` once all full batches of the
    epoch have been produced.
    """

    # Directory used when the caller does not pass ``images_dir``.
    DEFAULT_IMAGES_DIR = "/home/keshavpc-csl/Desktop/datasets/artificial_dataset"

    def __init__(self, batch_size, images_dir=None):
        """Index the dataset directory.

        Args:
            batch_size: Number of samples per batch; must be positive.
            images_dir: Directory whose entries are used as samples. Defaults
                to ``DEFAULT_IMAGES_DIR``.

        Raises:
            ValueError: If ``batch_size`` is not positive.
        """
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")
        self.images_dir = (
            self.DEFAULT_IMAGES_DIR if images_dir is None else images_dir
        )
        self.batch_size = batch_size
        # os.listdir returns entries in arbitrary order; sort so that a given
        # sample index always refers to the same file.
        self.samples = [
            (os.path.join(self.images_dir, entry), 1)
            for entry in sorted(os.listdir(self.images_dir))
        ]
        # Only complete batches are served; a trailing partial batch is dropped.
        self.full_iterations = len(self.samples) // batch_size

    def __call__(self, sample_info):
        """Return ``(encoded_image, label)`` for the requested sample.

        Args:
            sample_info: Object exposing ``idx_in_epoch`` (index of the sample
                within the epoch) and ``iteration`` (index of the batch within
                the epoch).

        Returns:
            A tuple of the encoded file contents as returned by
            ``image_loader`` and the label as a 0-d ``np.int32`` array.

        Raises:
            StopIteration: When ``sample_info.iteration`` is past the last
                full batch.
        """
        if sample_info.iteration >= self.full_iterations:
            raise StopIteration()
        jpeg_filename, label = self.samples[sample_info.idx_in_epoch]
        encoded_img = image_loader(jpeg_filename, directIO=False)
        return encoded_img, np.array(label, dtype=np.int32)
# Number of samples per pipeline batch.
BATCH_SIZE = 32
# Number of passes over the dataset made by the benchmark loop.
EPOCHS = 5
@pipeline_def(
    batch_size=BATCH_SIZE,
    num_threads=4,
    device_id=types.CPU_ONLY_DEVICE_ID,
    py_num_workers=16,
    py_start_method='spawn',
)
def parallel_pipeline():
    """Define a CPU-only pipeline: parallel external source, then image decode.

    Returns:
        A pair ``(decoded_images, labels)`` of pipeline outputs.
    """
    # One source instance supplies both outputs, one sample per call
    # (batch=False), and is run by the Python workers (parallel=True).
    sample_source = ExternalInputCallable(BATCH_SIZE)
    encoded, targets = fn.external_source(
        source=sample_source,
        num_outputs=2,
        batch=False,
        parallel=True,
        dtype=[types.UINT8, types.INT32],
    )
    decoded = fn.decoders.image(encoded, device="cpu")
    return decoded, targets
if __name__ == "__main__":
    # Benchmark driver: build the pipeline, drain it EPOCHS times and report
    # the mean wall-clock time per epoch.
    pipe = parallel_pipeline()
    pipe.build()
    dali_iter = DALIClassificationIterator(
        pipe,
        last_batch_padded=True,
        last_batch_policy=LastBatchPolicy.PARTIAL,
    )
    times = []
    for _epoch in tqdm(range(EPOCHS)):
        begin = time.time()
        # Only throughput is measured, so every batch is discarded.
        for _batch in dali_iter:
            pass
        # Record the epoch duration; without this the average below is NaN.
        times.append(time.time() - begin)
        # The iterator is created without auto_reset, so it must be rewound
        # explicitly or every epoch after the first would be empty.
        dali_iter.reset()
    print(f"Average epoch time: {np.round(np.mean(times), 2)}")
Also, it would be great if someone could tell me how to access the ID of a worker thread launched via `num_threads`, since `os.getpid()` will obviously only yield the PID of the main process.