Update Scene Asynchronously

Hi all! I hope to write an Omniverse app that will allow me to programmatically modify objects in the scene in real time. As a proof-of-concept, I started writing an extension for Create that simply creates an Xform with a sphere and updates its translation on a fixed interval (kind of like the setInterval function in JS or InvokeRepeating in Unity). The on_startup method creates a timer thread that moves the sphere slightly every 0.1 seconds. The problem is that the SetTranslate call throws an error.

What I really want to do is create some objects in a scene and have them move according to data from an Apache Kafka stream. This is a proof-of-concept to figure out how to move objects asynchronously in a scene (without having to interact with UI).

My script and the error are below for reference. I feel like I could be going about this all wrong. The UI part for instance is not necessary. Is there a better way to update a scene asynchronously?

extension.py:

from threading import Timer
from datetime import datetime

import omni.ext
import omni.ui as ui
from pxr import Usd, UsdGeom

class RepeatedTimer():
    """
    Call function on a given interval.

    Relies on threading.Timer, which runs a function once after a
    certain number of seconds. This creates a Timer object on a
    regular interval to execute a given function repeatedly.

    Courtesy of https://stackoverflow.com/a/13151299/7195376
    """
    def __init__(self, interval, function, *args, **kwargs):
        self._timer = None
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.is_running = False
        self.start()

    def _run(self):
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        if not self.is_running:
            self._timer = Timer(self.interval, self._run)
            self._timer.start()
            self.is_running = True

    def stop(self):
        self._timer.cancel()
        self.is_running = False

class MyExtension(omni.ext.IExt):
    def on_startup(self, ext_id):
        self._window = ui.Window("My Window", width=300, height=300)
        with self._window.frame:
            with ui.VStack():
                poseLabel = ui.Label('Pose')

        usd_context = omni.usd.get_context()
        stage = usd_context.get_stage()
        sphereXform = UsdGeom.Xform.Define(stage, '/sphere')
        spherePrim = UsdGeom.Sphere.Define(stage, '/sphere/spherePrim')
        spherePrim.GetRadiusAttr().Set(10)
        spherePrim.GetDisplayColorAttr().Set([(0, 1, 0)])

        pose = {
            'pose': (0, 0, 0),
            'interval': 0.1,
            'minVal': -10,
            'maxVal': 10
        }
        def bounce(val):
            newVal = val + pose['interval']
            if newVal > pose['maxVal'] or newVal < pose['minVal']:
                pose['interval'] = -pose['interval']
                newVal = val + pose['interval']
            return newVal

        def update_position():
            usd_context = omni.usd.get_context()
            stage = usd_context.get_stage()
            sphereXform = stage.GetPrimAtPath('/sphere/spherePrim')
            pose['pose'] = tuple(bounce(v) for v in pose['pose'])
            UsdGeom.XformCommonAPI(sphereXform).SetTranslate((pose['pose']))
            poseLabel.text = str(pose['pose'])

        self.fnTimer = RepeatedTimer(0.1, update_position)

    def on_shutdown(self):
        print('Goodbye')
        self.fnTimer.stop()

error:

2021-12-16 20:42:22 [755,239ms] [Error] [omni.kit.app.impl] [py stderr]: Exception in thread Thread-261:
Traceback (most recent call last):
  File "/home/bricklayer/.local/share/ov/pkg/deps/4a4ac5b063f92375d0d74eb0f2c3ac9b/python/lib/python3.7/threading.py", line 926, in _bootstrap_inner
    self.run()
  File "/home/bricklayer/.local/share/ov/pkg/deps/4a4ac5b063f92375d0d74eb0f2c3ac9b/python/lib/python3.7/threading.py", line 1177, in run
    self.function(*self.args, **self.kwargs)
  File "/home/bricklayer/Workspace/Omniverse/Extensions/UpdatePosition/exts/omni.MasonM.UpdatePosition/omni/MasonM/UpdatePosition/extension.py", line 30, in _run
    self.function(*self.args, **self.kwargs)
  File "/home/bricklayer/Workspace/Omniverse/Extensions/UpdatePosition/exts/omni.MasonM.UpdatePosition/omni/MasonM/UpdatePosition/extension.py", line 74, in update_position
    UsdGeom.XformCommonAPI(sphereXform).SetTranslate((pose['pose']))
  File "/home/bricklayer/.local/share/ov/pkg/deps/4a4ac5b063f92375d0d74eb0f2c3ac9b/extscore/omni.usd.libs/pxr/Trace/__init__.py", line 78, in invoke
    return func(*args, **kwargs)
  File "/home/bricklayer/.local/share/ov/pkg/deps/4a4ac5b063f92375d0d74eb0f2c3ac9b/extscore/omni.usd/omni/usd/_impl/utils.py", line 796, in _on_usd_changed
    self.__prim_changed_task = asyncio.ensure_future(self._update_usd_cache_state())
  File "/home/bricklayer/.local/share/ov/pkg/deps/4a4ac5b063f92375d0d74eb0f2c3ac9b/python/lib/python3.7/asyncio/tasks.py", line 607, in ensure_future
    loop = events.get_event_loop()
  File "/home/bricklayer/.local/share/ov/pkg/deps/4a4ac5b063f92375d0d74eb0f2c3ac9b/python/lib/python3.7/asyncio/events.py", line 644, in get_event_loop
    % threading.current_thread().name)
RuntimeError: There is no current event loop in thread 'Thread-261'.

@jetson_mason You should not access the stage from another thread. There is a way to subscribe to the per-frame update event, so you can run whatever you need on each frame:

subscription_handle = omni.kit.app.get_app().get_update_event_stream().create_subscription_to_pop(on_update, name="ANY_CUSTOMIZED_SUBSCRIPTION_NAME")

Here on_update is the callback, which receives a single parameter carrying the timing information for the frame. Also, subscription_handle must be held (kept referenced) for as long as you want the callback to run; you release it by setting it to None.
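For the Kafka use case mentioned in the original question, one way to combine a background data source with the per-frame callback is to hand poses across threads through a queue and touch the stage only inside on_update. The following is a minimal sketch under assumptions, not an Omniverse API: PoseBridge, push and apply_pose are hypothetical names, and a plain list stands in for the SetTranslate call so the snippet runs outside Kit.

```python
import queue
import threading


class PoseBridge:
    """Hand poses from a worker thread to a per-frame callback (hypothetical helper)."""

    def __init__(self, apply_pose):
        self._poses = queue.Queue()    # thread-safe hand-off between threads
        self._apply_pose = apply_pose  # only ever called from on_update

    def push(self, pose):
        """Safe to call from any thread, e.g. a Kafka consumer loop."""
        self._poses.put(pose)

    def on_update(self, event=None):
        """Per-frame callback: apply only the most recent pose, if there is one."""
        latest = None
        while True:
            try:
                latest = self._poses.get_nowait()
            except queue.Empty:
                break
        if latest is not None:
            self._apply_pose(latest)
        return latest


applied = []  # stand-in for the USD SetTranslate call (assumption)
bridge = PoseBridge(applied.append)

# Simulated data source running off the main thread.
worker = threading.Thread(
    target=lambda: [bridge.push((x, 0.0, 0.0)) for x in (1.0, 2.0, 3.0)]
)
worker.start()
worker.join()

bridge.on_update()  # in Kit, the update event stream would drive this call
print(applied)      # [(3.0, 0.0, 0.0)]
```

In an extension, apply_pose would presumably wrap the XformCommonAPI(...).SetTranslate(...) call from the original script, and bridge.on_update would be the callback passed to create_subscription_to_pop, with the returned handle stored on self. Dropping stale poses and applying only the latest one is a design choice for this sketch; a stream where every message matters would apply each queued item instead.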

Hi @rozhang, thanks for your help! I removed the RepeatedTimer statement and added the following to my script.

def on_update(p):
    print(f'{p} ({type(p)})')

subscription_handle = omni.kit.app.get_app().get_update_event_stream().create_subscription_to_pop(on_update, name="UPDATE_SUB")
print(subscription_handle)

However, I am not seeing any output in the console tab from the on_update function. My understanding is that on_update should be called on every frame, like Update in Unity, right? Do you know why I am not seeing any output?

Where is your subscription_handle saved? You need to keep it alive for as long as you want the updates to run. In your case it looks to me like a local variable, so I think it is released as soon as it goes out of your function's scope.
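The lifetime issue described here can be reproduced in plain Python. This is only an analogy: FakeStream and FakeSubscription are made-up stand-ins that model "the subscription ends when its handle is released", not how Kit implements it, and the immediate cleanup relies on CPython's reference counting.

```python
class FakeSubscription:
    """Stand-in handle: unsubscribes when the last reference to it goes away."""

    def __init__(self, stream, fn):
        self._stream = stream
        self._fn = fn

    def __del__(self):
        if self._fn in self._stream.callbacks:
            self._stream.callbacks.remove(self._fn)


class FakeStream:
    """Stand-in event stream (hypothetical, not the Kit API)."""

    def __init__(self):
        self.callbacks = []

    def subscribe(self, fn):
        self.callbacks.append(fn)
        return FakeSubscription(self, fn)

    def pump(self):
        # Simulate one frame: call every live subscriber.
        for fn in list(self.callbacks):
            fn()


calls = []
stream = FakeStream()


def subscribe_locally():
    # The handle is a local variable, so it is released on return.
    handle = stream.subscribe(lambda: calls.append("local"))


class Holder:
    def __init__(self):
        # Stored as an attribute, so it lives as long as the Holder does.
        self.handle = stream.subscribe(lambda: calls.append("held"))


subscribe_locally()
holder = Holder()
stream.pump()
print(calls)          # ['held'] on CPython: the local subscription is already gone

holder.handle = None  # explicit release, as described in the thread
stream.pump()
print(calls)          # still ['held']
```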

You’re right! I forgot to store it as a property. Adding self.subscription_handle = ... fixed it right away. Thank you!

You’re welcome.