Omniverse Physx Time Stepping Blocking

Hello, I found out that if I call fetch_results from a different thread, it blocks USD Composer. The two mechanisms I have found for calling it safely are: 1. pushing an event onto the message bus event stream, or 2. calling it from the UI thread.

How can I call it from a different thread, so that I can run a custom busy loop that does the time stepping?

I’m afraid the event stream bus is not accurate enough to do this efficiently, because it runs at its own pace.

from threading import Thread
import asyncio

import carb
import carb.events
import omni.kit.app
import omni.ext
import omni.ui as ui
from omni.physx import get_physx_simulation_interface, acquire_physx_interface

STEP_EVENT_TYPE = carb.events.type_from_string("my.extension.STEP_EVENT")

class MyExtension(omni.ext.IExt):
    """Test extension with three buttons that step PhysX in different ways.

    "Step" calls step() straight from the button callback, "ThreadStep (NOT
    SAFE!!!)" calls step() from a worker thread, and "ThreadStep" has a worker
    thread push STEP_EVENT_TYPE on the message bus so that the subscribed
    handler performs the step.
    """

    def on_startup(self, ext_id):
        """Subscribe to the step event, acquire PhysX and build the window."""
        self.bus = omni.kit.app.get_app().get_message_bus_event_stream()
        self.step_event_subscription = self.bus.create_subscription_to_pop_by_type(
            STEP_EVENT_TYPE, self.on_step_event
        )
        self.physx_sim_interface = get_physx_simulation_interface()
        self.current_time = 0

        self._window = ui.Window("Test stepping", width=250, height=150)
        with self._window.frame:
            with ui.VStack():
                ui.Button("Step", clicked_fn=self.step_click)
                ui.Button("ThreadStep (NOT SAFE!!!)", clicked_fn=self.threadStepBlocking_click)
                ui.Button("ThreadStep", clicked_fn=self.threadStepWorking_click)

    def step_click(self):
        """Step the simulation directly from the button callback."""
        self.step()

    def threadStepBlocking_click(self):
        """Run step() on a daemon thread (the variant labelled NOT SAFE)."""
        local_thread = Thread(target=self.step, daemon=True)
        local_thread.start()

    def threadStepWorking_click(self):
        """Run push_event() on a daemon thread; the bus handler does the step."""
        local_thread = Thread(target=self.push_event, daemon=True)
        local_thread.start()

    def step(self):
        """Advance the local clock, simulate one step and fetch the results."""
        # NOTE(review): the clock advances by 0.1 while the step passed to
        # simulate() is 1/60 -- confirm that this mismatch is intended.
        self.current_time += 0.1
        self.physx_sim_interface.simulate(1/60, self.current_time)
        self.physx_sim_interface.fetch_results()

    def push_event(self):
        """Push STEP_EVENT_TYPE onto the message bus."""
        self.bus.push(STEP_EVENT_TYPE)

    def on_step_event(self, event):
        """Message-bus handler: perform one simulation step."""
        self.step()

    def on_shutdown(self):
        """Drop the event subscription and the window."""
        # Releasing the subscription reference is the usual Kit way to
        # unsubscribe, so on_step_event is not called after shutdown.
        self.step_event_subscription = None
        self._window = None


Hi,
The fetch_results call writes to USD by default, which is indeed only possible from the UI thread.
There are a couple of options. You can skip fetch_results if you don't need to display the results; then, once the simulation is done, you would still have to call fetch_results from the UI thread to display them.

In theory, one could also use omni.physx.fabric to display the results; that should be possible even from a different thread.
One would have to attach omni.physx.fabric to a stage (which detaches it from the automated updates on the UI thread) and then update it manually.
Something like:

from omni.physxfabric import get_physx_fabric_interface

get_physx_fabric_interface().attach_stage(self.stage_id)

then to update it
get_physx_fabric_interface().update(stepno * self.dt, self.dt)

Enabling the omni.physx.fabric should disable the USD updates, so you can call the fetch_results and then the fabric update to get the data to fabric, then those data would get rendered.

Regards,
Ales

@AlesBorovicka I attached the stage like this:

        # Use the current USD context, creating one if none is returned.
        self.usd_context = omni.usd.get_context()
        if not self.usd_context:
            self.usd_context = omni.usd.create_context()
        # Attach the PhysX Fabric interface to the context's stage.
        self.physx_fabric = get_physx_fabric_interface()
        self.physx_fabric.attach_stage(self.usd_context.get_stage_id())

But once I call attach_stage, calls on the simulation interface no longer do anything, for example when I want to fetch_results:

        self.physx_sim_interface = get_physx_simulation_interface()
        self.physx_sim_interface.fetch_results()

Yes, this would not do anything (nothing should move); you have to update the fabric to get the data from PhysX.
Call:

get_physx_fabric_interface().update(stepno * self.dt, self.dt)

After the self.physx_sim_interface.fetch_results()

Hi @AlesBorovicka
For me, the threadFabricStep_click is not making a box fall in USD Composer.

from threading import Thread
import asyncio

import carb
import carb.events
import omni.kit.app
import omni.ext
import omni.ui as ui
from omni.physx import get_physx_simulation_interface, acquire_physx_interface
from omni.physxfabric import get_physx_fabric_interface


STEP_EVENT_TYPE = carb.events.type_from_string("my.extension.STEP_EVENT")

class MyExtension(omni.ext.IExt):
    """Stepping test window, extended with a Fabric-based threaded step."""

    def on_startup(self, ext_id):
        """Set up the bus subscription, PhysX/Fabric interfaces and the UI."""
        app = omni.kit.app.get_app()
        self.bus = app.get_message_bus_event_stream()
        self.step_event_subscription = self.bus.create_subscription_to_pop_by_type(
            STEP_EVENT_TYPE,
            self.on_step_event,
        )
        self.physx_sim_interface = get_physx_simulation_interface()

        # Fall back to creating a context when none is returned.
        self.usd_context = omni.usd.get_context() or omni.usd.create_context()
        _stage = self.usd_context.get_stage()  # fetched, not used afterwards

        # Fabric is attached to the context's stage right at startup.
        self.physx_fabric = get_physx_fabric_interface()
        stage_id = self.usd_context.get_stage_id()
        self.physx_fabric.attach_stage(stage_id)
        self.current_time = 0
        self.current_step = 0

        self._window = ui.Window("Test stepping", width=250, height=150)
        buttons = (
            ("Step", self.step_click),
            ("ThreadStep (NOT SAFE!!!)", self.threadStepBlocking_click),
            ("ThreadStep", self.threadStepWorking_click),
            ("ThreadFabricStep", self.threadFabricStep_click),
        )
        with self._window.frame:
            with ui.VStack():
                for label, callback in buttons:
                    ui.Button(label, clicked_fn=callback)

    def step_click(self):
        """Step directly from the button callback."""
        self.step()

    def threadStepBlocking_click(self):
        """Call step() from a daemon worker thread."""
        Thread(target=self.step, daemon=True).start()

    def threadStepWorking_click(self):
        """Call push_event() from a daemon worker thread."""
        Thread(target=self.push_event, daemon=True).start()

    def threadFabricStep_click(self):
        """Call stepFabric() from a daemon worker thread."""
        Thread(target=self.stepFabric, daemon=True).start()

    def step(self):
        """Advance the clock by 0.1, simulate a 1/60 step and fetch results."""
        self.current_time += 0.1
        sim = self.physx_sim_interface
        sim.simulate(1 / 60, self.current_time)
        sim.fetch_results()

    def stepFabric(self):
        """Bump the step counter, update Fabric, then fetch PhysX results."""
        self.current_step += 1
        elapsed = 1 / 60
        # NOTE: only the Fabric update and fetch_results() run here;
        # simulate() is not called by this method.
        self.physx_fabric.update(self.current_step * 1 / 60, elapsed)
        self.physx_sim_interface.fetch_results()

    def push_event(self):
        """Push STEP_EVENT_TYPE onto the message bus."""
        self.bus.push(STEP_EVENT_TYPE)

    def on_step_event(self, event):
        """Bus handler for STEP_EVENT_TYPE: run one step."""
        self.step()

    def on_shutdown(self):
        """Nothing is torn down on shutdown."""
        pass


Hi,
I managed to get it working, but I had to make a few changes:

from threading import Thread
import asyncio

import carb
import carb.events
import omni.kit.app
import omni.ext
import omni.ui as ui
from omni.physx import get_physx_simulation_interface, acquire_physx_interface
from omni.physxfabric import get_physx_fabric_interface


STEP_EVENT_TYPE = carb.events.type_from_string("my.extension.STEP_EVENT")

class MyExtension(omni.ext.IExt):
    """Stepping test window with an explicit "Attach" button.

    PhysX and Fabric are attached to the USD context's stage on demand, after
    which "ThreadFabricStep" can step the simulation from a worker thread and
    push the resulting transforms to Fabric.
    """

    # Fixed step, in seconds, used by every stepping path of this extension.
    STEP_DT = 1 / 60

    def on_startup(self, ext_id):
        """Set up the bus subscription, PhysX/Fabric interfaces and the UI."""
        # The imports shown with this snippet do not include omni.usd. It is
        # imported first because the statement binds `omni` as a local name,
        # and the lines below also go through `omni`.
        import omni.usd

        self.bus = omni.kit.app.get_app().get_message_bus_event_stream()
        self.step_event_subscription = self.bus.create_subscription_to_pop_by_type(
            STEP_EVENT_TYPE, self.on_step_event
        )
        self.physx_sim_interface = get_physx_simulation_interface()

        self.usd_context = omni.usd.get_context()
        if not self.usd_context:
            self.usd_context = omni.usd.create_context()

        self.physx_fabric = get_physx_fabric_interface()
        self.physx_fabric.attach_stage(self.usd_context.get_stage_id())
        self.current_time = 0
        self.current_step = 0

        self._window = ui.Window("Test stepping", width=250, height=150)
        with self._window.frame:
            with ui.VStack():
                ui.Button("Step", clicked_fn=self.step_click)
                ui.Button("ThreadStep (NOT SAFE!!!)", clicked_fn=self.threadStepBlocking_click)
                ui.Button("ThreadStep", clicked_fn=self.threadStepWorking_click)
                ui.Button("ThreadFabricStep", clicked_fn=self.threadFabricStep_click)
                ui.Button("Attach", clicked_fn=self.attach_click)

    def step_click(self):
        """Step directly from the button callback."""
        self.step()

    def attach_click(self):
        """Attach both PhysX and Fabric to the USD context's stage."""
        stage_id = self.usd_context.get_stage_id()
        self.physx_sim_interface.attach_stage(stage_id)
        self.physx_fabric.attach_stage(stage_id)

    def threadStepBlocking_click(self):
        """Run step() on a daemon thread (the variant labelled NOT SAFE)."""
        local_thread = Thread(target=self.step, daemon=True)
        local_thread.start()

    def threadStepWorking_click(self):
        """Run push_event() on a daemon thread; the bus handler does the step."""
        local_thread = Thread(target=self.push_event, daemon=True)
        local_thread.start()

    def threadFabricStep_click(self):
        """Run stepFabric() on a daemon thread."""
        local_thread = Thread(target=self.stepFabric, daemon=True)
        local_thread.start()

    def step(self):
        """Advance the clock by one step, simulate it and fetch the results."""
        # The clock advances by exactly the step that is simulated.
        self.current_time += self.STEP_DT
        self.physx_sim_interface.simulate(self.STEP_DT, self.current_time)
        self.physx_sim_interface.fetch_results()

    def stepFabric(self):
        """Simulate one step, fetch results, then push transforms to Fabric."""
        self.current_step += 1
        # One shared clock: PhysX and Fabric are handed the same timestamp,
        # and it actually advances with every step.
        self.current_time += self.STEP_DT

        # The simulation has to be stepped first; the Fabric update afterwards
        # only carries the transformations over from PhysX.
        self.physx_sim_interface.simulate(self.STEP_DT, self.current_time)
        self.physx_sim_interface.fetch_results()
        self.physx_fabric.update(self.current_time, self.STEP_DT)

    def push_event(self):
        """Push STEP_EVENT_TYPE onto the message bus."""
        self.bus.push(STEP_EVENT_TYPE)

    def on_step_event(self, event):
        """Message-bus handler: perform one simulation step."""
        self.step()

    def on_shutdown(self):
        """Drop the event subscription and the window."""
        # Releasing the subscription reference is the usual Kit way to
        # unsubscribe, so on_step_event is not called after shutdown.
        self.step_event_subscription = None
        self._window = None

So:

  1. I had to add a button for attach. Unless you attach the physx_sim_interface, parsing won't happen, so there is nothing in physics until you attach (this happens automatically when you press Play). The fabric attach also needs to be called in order to create the fabric structures, so I added an explicit button for that. This basically populates PhysX and Fabric so that the simulation works.
  2. The stepping was not right. The fabric update only copies the transformations from PhysX to fabric; the simulation itself still has to be stepped. Hence the simulation is stepped first, and then fabric gets updated.

Regards,
Ales

1 Like

Thanks. Note that if I click Play/Stop in USD Composer and then click ThreadFabricStep again, USD Composer freezes. It would also be good to be able to reset the simulation.

My original idea was to have three custom buttons (Start/Step/Stop). On Start, another thread runs a while loop that calls the step method with a custom delta time. I would also like to know when each simulation step has finished computing.

Play/Stop and custom stepping are two different systems fighting for the same resource, so unless things are properly detached and attached again, it might not work.
The thing is that omni.physx itself is not tied to the Kit update loop or anything else. It needs something to attach it to a stage, which can be in Kit, in memory, or anywhere. Either it is manually controlled through IPhysxSimulation, or there is an additional, optional connection to Kit that connects it to the UsdContext stage. Those two things are pretty much mutually exclusive.

Custom start/step/stop is implemented in the Physics debug window: if you open Window->Simulation->Debug, there are start/step/stop controls, and if you check the Python code you can see how to store the initial transforms and how to reset.

Regards,
Ales

@AlesBorovicka Yes, the physxDebugView.py was my initial inspiration.

I got this far: step_simulation works when called directly, but when the simulation is started from a thread via Start (which I do to avoid blocking the UI), it ends up freezing USD Composer.

from threading import Thread, Event
import asyncio
import time

import carb
import carb.events
import omni.kit.app
import omni.ext
import omni.ui as ui
from omni.physx import get_physx_simulation_interface, acquire_physx_interface
from omni.timeline import get_timeline_interface

class MyExtension(omni.ext.IExt):
    """Start/Stop/Step test window that drives PhysX from a worker thread."""

    def on_startup(self, ext_id):
        """Acquire the PhysX/timeline interfaces and build the three buttons."""
        self.physx_interface = acquire_physx_interface()
        self.physx_sim_interface = get_physx_simulation_interface()
        self.timeline = get_timeline_interface()

        # Fall back to creating a context when none is returned.
        self.usd_context = omni.usd.get_context() or omni.usd.create_context()
        _stage = self.usd_context.get_stage()  # fetched, not used afterwards

        self.current_time = 0
        self.current_step = 0

        # The Event is the run flag polled by simulation_loop().
        self.simulation_running = Event()
        self.simulation_thread = None

        self._window = ui.Window("Test stepping", width=250, height=150)
        buttons = (
            ("Start Simulation", self.start_simulation),
            ("Stop Simulation", self.stop_simulation),
            ("Step Simulation", self.step_simulation_click),
        )
        with self._window.frame:
            with ui.VStack():
                for label, callback in buttons:
                    ui.Button(label, clicked_fn=callback)

    def simulation_loop(self):
        """Thread body: toggle the timeline, then step until the flag clears."""
        # Pause when the timeline is playing, otherwise start it playing.
        toggle = self.timeline.pause if self.timeline.is_playing() else self.timeline.play
        toggle()

        previous = time.time()
        while self.simulation_running.is_set():
            now = time.time()
            # Wall-clock time since the last iteration, floored at 1 ms.
            elapsed = max(now - previous, 0.001)
            previous = now
            self.step_simulation(elapsed)

    def start_simulation(self):
        """Set the run flag and launch simulation_loop() on a daemon thread."""
        if self.simulation_running.is_set():
            return
        self.simulation_running.set()
        worker = Thread(target=self.simulation_loop, daemon=True)
        self.simulation_thread = worker
        worker.start()
        print("Simulation started.")

    def stop_simulation(self):
        """Clear the run flag, join the thread and reset timeline and PhysX."""
        was_running = self.simulation_running.is_set()
        if was_running:
            self.simulation_running.clear()
            self.simulation_thread.join()
            if self.timeline.is_stopped():
                # Already stopped: just rewind to the timeline's start time.
                self.timeline.set_current_time(self.timeline.get_start_time())
            else:
                self.timeline.stop()
            self.physx_interface.reset_simulation()
        # Printed whether or not a simulation was running.
        print("Simulation stopped.")

    def step_simulation(self, delta_time):
        """Advance counters and timeline by delta_time, then update PhysX."""
        self.current_time += delta_time
        self.current_step += 1

        timeline = self.timeline
        if timeline.is_playing():
            timeline.pause()
        advanced = timeline.get_current_time() + delta_time
        timeline.set_current_time(advanced)

        physx = self.physx_interface
        physx.update_simulation(delta_time, timeline.get_current_time())
        physx.update_transformations(False, True, True)

    def step_simulation_click(self):
        """Button callback: perform a single step with delta_time = 4."""
        self.step_simulation(4)

    def on_shutdown(self):
        """Nothing is torn down on shutdown."""
        pass


Regards

I guess this should do the trick:

from threading import Thread
import asyncio

import carb
import carb.events
import omni.kit.app
import omni.ext
import omni.ui as ui
from omni.physx import get_physx_simulation_interface, acquire_physx_interface
from omni.physxfabric import get_physx_fabric_interface


STEP_EVENT_TYPE = carb.events.type_from_string("my.extension.STEP_EVENT")

class MyExtension(omni.ext.IExt):
    """Stepping test window with explicit "Start" and "Stop" buttons.

    "Start" attaches PhysX and Fabric to the USD context's stage so the
    simulation can be stepped manually. "Stop" detaches both and re-attaches
    Fabric.
    """

    # Fixed step, in seconds, used by every stepping path of this extension.
    STEP_DT = 1 / 60

    def on_startup(self, ext_id):
        """Set up the bus subscription, PhysX/Fabric interfaces and the UI."""
        # The imports shown with this snippet do not include omni.usd. It is
        # imported first because the statement binds `omni` as a local name,
        # and the lines below also go through `omni`.
        import omni.usd

        self.bus = omni.kit.app.get_app().get_message_bus_event_stream()
        self.step_event_subscription = self.bus.create_subscription_to_pop_by_type(
            STEP_EVENT_TYPE, self.on_step_event
        )
        self.physx_sim_interface = get_physx_simulation_interface()

        self.usd_context = omni.usd.get_context()
        if not self.usd_context:
            self.usd_context = omni.usd.create_context()

        self.physx_fabric = get_physx_fabric_interface()
        self.physx_fabric.attach_stage(self.usd_context.get_stage_id())
        self.current_time = 0
        self.current_step = 0

        self._window = ui.Window("Test stepping", width=250, height=150)
        with self._window.frame:
            with ui.VStack():
                ui.Button("Step", clicked_fn=self.step_click)
                ui.Button("ThreadStep (NOT SAFE!!!)", clicked_fn=self.threadStepBlocking_click)
                ui.Button("ThreadStep", clicked_fn=self.threadStepWorking_click)
                ui.Button("ThreadFabricStep", clicked_fn=self.threadFabricStep_click)
                ui.Button("Start", clicked_fn=self.start_click)
                ui.Button("Stop", clicked_fn=self.stop_click)

    def step_click(self):
        """Step directly from the button callback."""
        self.step()

    def start_click(self):
        """Attach both PhysX and Fabric to the USD context's stage."""
        stage_id = self.usd_context.get_stage_id()
        self.physx_sim_interface.attach_stage(stage_id)
        self.physx_fabric.attach_stage(stage_id)

    def stop_click(self):
        """Detach PhysX and Fabric, re-attach Fabric and rewind the clock."""
        self.physx_sim_interface.detach_stage()
        # Detach followed by attach: Fabric is left attached to the stage
        # again once manual stepping is over.
        self.physx_fabric.detach_stage()
        self.physx_fabric.attach_stage(self.usd_context.get_stage_id())
        # The next Start begins a fresh run, so restart the clock as well.
        self.current_time = 0
        self.current_step = 0

    def threadStepBlocking_click(self):
        """Run step() on a daemon thread (the variant labelled NOT SAFE)."""
        local_thread = Thread(target=self.step, daemon=True)
        local_thread.start()

    def threadStepWorking_click(self):
        """Run push_event() on a daemon thread; the bus handler does the step."""
        local_thread = Thread(target=self.push_event, daemon=True)
        local_thread.start()

    def threadFabricStep_click(self):
        """Run stepFabric() on a daemon thread."""
        local_thread = Thread(target=self.stepFabric, daemon=True)
        local_thread.start()

    def step(self):
        """Advance the clock by one step, simulate it and fetch the results."""
        # The clock advances by exactly the step that is simulated.
        self.current_time += self.STEP_DT
        self.physx_sim_interface.simulate(self.STEP_DT, self.current_time)
        self.physx_sim_interface.fetch_results()

    def stepFabric(self):
        """Simulate one step, fetch results, then push transforms to Fabric."""
        self.current_step += 1
        # One shared clock: PhysX and Fabric are handed the same timestamp,
        # and it actually advances with every step.
        self.current_time += self.STEP_DT

        # Step the simulation first, then update Fabric from the results.
        self.physx_sim_interface.simulate(self.STEP_DT, self.current_time)
        self.physx_sim_interface.fetch_results()
        self.physx_fabric.update(self.current_time, self.STEP_DT)

    def push_event(self):
        """Push STEP_EVENT_TYPE onto the message bus."""
        self.bus.push(STEP_EVENT_TYPE)

    def on_step_event(self, event):
        """Message-bus handler: perform one simulation step."""
        self.step()

    def on_shutdown(self):
        """Drop the event subscription and the window."""
        # Releasing the subscription reference is the usual Kit way to
        # unsubscribe, so on_step_event is not called after shutdown.
        self.step_event_subscription = None
        self._window = None

If you want the play/stop button working again you need to do the fabric->detach (this will reset the poses) and fabric->attach (this will connect fabric back so that play does not crash) trick.
I will check if we can reattach to the IStageUpdate more gracefully like omni.physx does. There the recovery works fine, but it does not for omni.physx.fabric.

So basically I press: start - step - stop, then play/stop works too.

Regards,
Ales

1 Like

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.