Difference between py_num_workers and num_threads

I want to understand the difference between the py_num_workers and num_threads arguments of the pipeline_def decorator in NVIDIA DALI. I need to run a single pipeline, with each thread working on one element of a batch at a time. Increasing num_threads does improve performance as expected (is the division of labour in batch processing as I described above?). However, increasing py_num_workers does not improve performance, even though I can see new processes being created (using htop).
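
For context, here is my current understanding of the two arguments, written as comments on a stripped-down decorator (the comments reflect my reading of the docs rather than confirmed behaviour, and the values and the pipeline name are just placeholders):

@pipeline_def(
    batch_size=32,
    num_threads=4,                        # CPU threads DALI uses internally to run the pipeline's operators
    device_id=types.CPU_ONLY_DEVICE_ID,
    py_num_workers=16,                    # Python worker processes that run the external_source callback when parallel=True
    py_start_method='spawn')
def my_pipeline():
    ...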

For reference, I’ve included the code I’m using, in case it is of interest:

import math
import mmap
import os
import time

import numpy as np
from tqdm import tqdm

from nvidia.dali import fn, pipeline_def, types
from nvidia.dali.plugin.pytorch import DALIClassificationIterator, LastBatchPolicy

def image_loader(filename, directIO=False):
    flags = os.O_RDONLY | os.O_DIRECT if directIO else os.O_RDONLY
    image_fd = os.open(filename, flags)
    fo = os.fdopen(image_fd, 'rb', buffering=0)
    # O_DIRECT requires transfer sizes that are a multiple of the block size (512 here);
    # an anonymous mmap provides page-aligned memory to read into.
    filesize = 512 * math.ceil(os.path.getsize(filename) / 512)
    image_mmap = mmap.mmap(-1, filesize)
    fo.readinto(image_mmap)
    fo.close()
    img = np.frombuffer(image_mmap.read(), dtype=np.uint8)
    return img

class ExternalInputCallable:
    def __init__(self, batch_size):
        self.images_dir = "/home/keshavpc-csl/Desktop/datasets/artificial_dataset"
        self.batch_size = batch_size
        self.samples = []
        for sample in os.listdir(self.images_dir):
            self.samples.append((os.path.join(self.images_dir, sample), 1))
    
        self.full_iterations = len(self.samples) // batch_size

    def __call__(self, sample_info):
        sample_idx = sample_info.idx_in_epoch
        # Signal the end of the epoch once all full batches have been produced.
        if sample_info.iteration >= self.full_iterations:
            raise StopIteration()
        jpeg_filename, label = self.samples[sample_idx]
        encoded_img = image_loader(jpeg_filename, directIO=False)
        return encoded_img, np.array(label, dtype=np.int32)

BATCH_SIZE=32
EPOCHS=5
@pipeline_def(batch_size=BATCH_SIZE, num_threads=4, device_id=types.CPU_ONLY_DEVICE_ID, py_num_workers=16, py_start_method='spawn')
def parallel_pipeline():
    jpegs, labels = fn.external_source(source=ExternalInputCallable(BATCH_SIZE),
                                       num_outputs=2, batch=False,
                                       parallel=True, dtype=[types.UINT8, types.INT32])
    decode = fn.decoders.image(jpegs, device="cpu")
    return decode, labels

if __name__ == "__main__":
    pipe = parallel_pipeline()
    pipe.build()
    dali_iter = DALIClassificationIterator(pipe, last_batch_padded=True, last_batch_policy=LastBatchPolicy.PARTIAL)
    times = []
    for epoch in tqdm(range(EPOCHS)):
        begin = time.time()
        for data in dali_iter:
            data = None
        times.append(time.time() - begin)
        # The iterator has to be reset before it can be reused for the next epoch.
        dali_iter.reset()
    print(f"Average epoch time: {np.round(np.mean(times), 2)} s")

Also, it would be great if someone could tell me how to access the IDs of the workers launched via num_threads, since os.getpid() will obviously only yield the PID of the main process.
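
The closest I have gotten is printing the PID from inside the external_source callback, e.g. by adding a line like the one below at the top of ExternalInputCallable.__call__ (my assumption being that, with parallel=True and py_start_method='spawn', the callback runs inside one of the py_num_workers worker processes, so this shows the worker PIDs but tells me nothing about the num_threads threads):

        # Debugging sketch: print which process handles this sample.
        print(f"sample {sample_info.idx_in_epoch} handled by PID {os.getpid()}", flush=True)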

Hi,

This forum focuses on updates and issues related to TensorRT.
Please reach out to Issues · NVIDIA/DALI · GitHub for better help.

Thank you.