Why does multiprocessing only works with fork?

Hello,

I was experimenting with some multiprocessing in an Omniverse extension. I can start basic processes and communicate back with Omniverse using queues and coroutine which is great. Now I want to do some pointcloud processing in the background using open3d, the thing with open3d is that it will not work properly when using multiprocessing if the start method is set to “fork” but will work perfectly using “forkserver”. However, when I can’t start any process in Omniverse unless I am using fork… is there a reason this happens and is there any work around ?

See this basic extension to replicate the issue (just change forkserver to fork and everything works)

import omni.ext
import omni.ui as ui
import omni.kit.app
import multiprocessing as mp
import time
import logging
import asyncio
import queue

logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)


def do_some_processing(cb_queue):
    for i in range(5):
        time.sleep(2)
        cb_queue.put(f"Proccessed {i}")


async def poll_queue(cb_queue):
    while True:
        try:
            message = cb_queue.get_nowait()
            if message is None:
                logger.info("Stopped")
                break
            logger.info(message)
        except queue.Empty:
            await asyncio.sleep(1)


class TechofficeAnthonyPgExtension(omni.ext.IExt):
    def on_startup(self, ext_id):
        # OTHER CODE
        ctx = mp.get_context("forkserver")
        self.q = ctx.Queue()
        self.poller = None

        # UI CODE
        self._window = ui.Window("My Window", width=300, height=300)
        with self._window.frame:
            def btn_fn():
                process = ctx.Process(target=do_some_processing, args=(self.q, ))
                process.start()
                self.poller = asyncio.ensure_future(poll_queue(self.q))
            with ui.VStack():
                with ui.HStack():
                    ui.Button("Start process", clicked_fn=btn_fn)

    def on_shutdown(self):
        if self.poller:
            self.q.put(None)

Hi @anthony.yaghi. Some questions for you:

  1. What happens when you use forkserver do you get an error or does something else happen?
  2. I’m guessing you’re on Linux since you’re using fork and forkserver?

I’m on Windows. I tried using spawn instead with your example and it blocked the UI thread for me and never returned.

Hello,

Yes I am on Ubuntu 20.04. When I use forkserver I get the following error at first.

2023-03-29 08:07:38 [Error] [omni.kit.app._impl] [py stderr]: /home/anthony/.local/share/ov/pkg/isaac_sim-2022.2.1/kit/python/lib/python3.7/multiprocessing/semaphore_tracker.py:55: UserWarning: semaphore_tracker: process died unexpectedly, relaunching. Some semaphores might leak.
2023-03-29 08:07:38 [Error] [omni.kit.app._impl] warnings.warn('semaphore_tracker: process died unexpectedly, ’
2023-03-29 08:07:38 [Error] [omni.kit.app._impl]

If I stop the extension and start it again, then the error will not show for a second time, but nothing happens so I am not sure the process is starting at all.

Thank you for the support.

Hi @anthony.yaghi. Sorry for the delay on this. We’ve logged an internal ticket for this: OM-90478. Thanks for reporting it!