I want to use UDP to control the audio playback with my own program, and I found the component: C:\Users\Administrator\AppData\Local\ov\pkg\audio2face-2023.2.0\exts\omni.audio2face.ui.player\omni\audio2face\ui\player\scripts\ui.py
import math
import os
import pathlib
from functools import singledispatch
import socket
from typing import List
import numpy as np
import omni.kit.app
import omni.kit.context_menu
import omni.ui as ui
import scipy.ndimage
from omni.audio2face.common import InstanceManagerEventType
from omni.audio2face.player import (
AudioPlayerEventType,
AudioPlayerInstance,
AudioPlayerStreamingInstance,
is_valid_path,
path_join,
)
from omni.audio2face.ui.common import (
BTN_WIDTH,
VSPACING,
BoolSettingWidgetBase,
CategoricalSettingWidgetWithReset,
CollapsableWidget,
FloatRangeWidgetWithReset,
PathWidgetWithReset,
PromptOverwriteFile,
SeparatorStyle,
SimpleWidget,
StringFieldStyle,
StringSettingWidget,
)
from omni.audio2face.utility import RareUpdater, log_error, log_info, log_warn
from omni.kit.window.file_exporter import get_file_exporter
WAVEFORM_HEIGHT = 22 * 2 + VSPACING
RECORDER_BTN_WIDTH = 75
PlayBtnStyle = {"image_url": "resources/glyphs/timeline_play.svg"}
PauseBtnStyle = {"image_url": "resources/glyphs/timeline_pause.svg"}
RewindBtnStyle = {"image_url": "resources/glyphs/timeline_rewind.svg"}
LoopBtnStyle = {"image_url": "resources/glyphs/timeline_loop.svg"}
PlaybackSliderBackgroundStyle = {
"background_color": 0xFF24211F,
"margin_height": 0,
"margin_width": 0,
"border_radius": 0,
}
HandlePlaybackStyle = {"border_radius": 0, "background_color": 0xFFEEEE33}
HandleRecordingStyle = {"border_radius": 0, "background_color": 0xFF3333EE}
HandleStreamingStyle = {"border_radius": 0, "background_color": 0xFF33EE33}
TrackWaveformStyle = {"margin_height": 0, "margin_width": 0, "border_radius": 0}
RangeStartSpacerStyle = {"border_width": 0, "padding": 0, "border_radius": 0, "margin_width": 0}
RangeRectStyle = {
"background_color": 0x30BBAB58,
"padding": 0,
"margin_width": 0,
"margin_height": 0,
"border_radius": 0,
"border_color": 0x70BBAB58,
"border_width": 1,
}
RangeRectRecordingStyle = {
"background_color": 0x305858BB,
"padding": 0,
"margin_width": 0,
"margin_height": 0,
"border_radius": 0,
"border_color": 0x705858BB,
"border_width": 1,
}
RangeRectStreamingStyle = {
"background_color": 0x3058BB58,
"padding": 0,
"margin_width": 0,
"margin_height": 0,
"border_radius": 0,
"border_color": 0x7058BB58,
"border_width": 1,
}
DBMeterRectGreenStyle = {
"background_color": 0x9030AA30,
"padding": 0,
"margin_width": 0,
"margin_height": 0,
"border_radius": 0,
"border_width": 0,
}
DBMeterRectYellowStyle = {
"background_color": 0x9030AAAA,
"padding": 0,
"margin_width": 0,
"margin_height": 0,
"border_radius": 0,
"border_width": 0,
}
DBMeterRectRedStyle = {
"background_color": 0x903030AA,
"padding": 0,
"margin_width": 0,
"margin_height": 0,
"border_radius": 0,
"border_width": 0,
}
DBMeterDarkBarStyle = {
"background_color": 0x90000000,
"padding": 0,
"margin_width": 0,
"margin_height": 0,
"border_radius": 0,
"border_width": 0,
}
DBMeterBorderStyle = {
"background_color": 0x0,
"padding": 0,
"margin_width": 0,
"margin_height": 0,
"border_radius": 0,
"border_color": 0xFF292929,
"border_width": 2,
}
import threading
import socket
def udp_server(self):
def handle_message(message):
try:
if message == "start playback":
self._btn_play._on_toggled()
elif message == "pause playback":
if self._btn_play._state:
self._btn_play._on_toggled()
elif message == "reset audio":
self._on_track_changed()
elif message.startswith("set audio directory:"):
directory = message.split(":", 1)[1]
self._root_path_widget.set_value(directory)
elif message.startswith("set audio:"):
full_path = message.split(":", 1)[1]
directory, track_name = os.path.split(full_path)
self._root_path_widget.set_value(directory)
self._track_widget.set_value(track_name)
except Exception as e:
print(f"Error handling message: {e}")
def server():
udp_ip = "127.0.0.1"
udp_port = 2001
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((udp_ip, udp_port))
print(f"UDP server started at {udp_ip}:{udp_port}")
try:
while True:
data, addr = sock.recvfrom(1024)
message = data.decode('utf-8')
threading.Thread(target=handle_message, args=(message,)).start()
except OSError as e:
print(f"Socket error: {e}. Retrying...")
finally:
sock.close()
#close_port(udp_port)
thread = threading.Thread(target=server)
thread.daemon = True
thread.start()
class PlaybackSliderWidget(SimpleWidget):
def __init__(self, height, on_changed_fn=None, on_changed_from_mouse_fn=None):
super().__init__()
self._height = height
self._on_changed_fn = on_changed_fn
self._on_changed_from_mouse_fn = on_changed_from_mouse_fn
self._max_value = 0.001
self._value = 0.0
self._handle_width = 1
self._pressed = False
self._mouse_catcher = None
self._slider_placer = None
self._handle = None
self._update_sub = omni.kit.app.get_app().get_update_event_stream().create_subscription_to_pop(self._on_update)
def shutdown(self):
self._update_sub = None
self._on_changed_fn = None
self._on_changed_from_mouse_fn = None
self._max_value = 0.001
self._value = 0.0
self._pressed = False
self._mouse_catcher = None
self._slider_placer = None
self._handle = None
super().shutdown()
def set_value(self, value):
if self._pressed:
return # pressed mouse overrides external change of the value
self._value = value
if self._value < 0.0:
self._value = 0.0
elif self._value > self._max_value:
self._value = self._max_value
if self._on_changed_fn is not None:
self._on_changed_fn(self._value)
if self._max_value > 0:
rel_x_perc = self._value / self._max_value
self._set_slider_position(rel_x_perc)
elif self._max_value == 0:
self._set_slider_position(0)
def get_value(self):
return self._value
def set_max(self, max_value):
if max_value < 0:
raise ValueError("Playback Slider max value can't be less than zero")
self._max_value = max_value if max_value > 0 else 0.001
def set_handle_style(self, style):
if self._handle is not None:
self._handle.set_style(style)
def _set_slider_position(self, rel_x_perc):
if self._slider_placer is not None:
self._slider_placer.offset_x = ui.Percent(rel_x_perc * 100.0)
def _on_mouse_moved(self, x, y, _, btn):
if btn == True:
self._update_from_mouse(x)
def _on_mouse_pressed(self, x, y, btn, *args):
if btn == 0:
self._pressed = True
self._update_from_mouse(x)
def _on_mouse_released(self, x, y, btn, *args):
if btn == 0:
self._pressed = False
def _update_from_mouse(self, x):
if self._mouse_catcher is not None and self._slider_placer is not None:
rel_x = x - self._mouse_catcher.screen_position_x
if rel_x < 0:
rel_x = 0
elif rel_x >= self._mouse_catcher.computed_width:
rel_x = self._mouse_catcher.computed_width
rel_x_perc = rel_x / self._mouse_catcher.computed_width
self._set_slider_position(rel_x_perc)
self._value = self._max_value * rel_x_perc
if self._on_changed_fn is not None:
self._on_changed_fn(self._value)
def _build_content(self):
with ui.ZStack():
self._mouse_catcher = ui.Rectangle(
height=self._height,
style={
"background_color": 0x0,
"padding": 0,
"margin_width": 0,
"margin_height": 0,
"border_radius": 0,
"border_color": 0x0,
"border_width": 0,
},
mouse_moved_fn=self._on_mouse_moved,
mouse_pressed_fn=self._on_mouse_pressed,
mouse_released_fn=self._on_mouse_released,
)
with ui.HStack():
self._slider_placer = ui.Placer(draggable=False, stable_size=True)
with self._slider_placer:
with ui.HStack():
self._handle = ui.Rectangle(
width=self._handle_width, height=self._height, style=HandlePlaybackStyle
)
ui.Spacer()
def _on_update(self, *_):
if self._pressed:
if self._on_changed_from_mouse_fn is not None:
self._on_changed_from_mouse_fn(self._value)
class WaveformWidget(SimpleWidget):
def __init__(self, height):
super().__init__()
self._height = height
self._waveform_image_provider = None
self._waveform_image = None
self._canvas = None
self._canvas_width = 1024
self._canvas_height = WAVEFORM_HEIGHT
def shutdown(self):
self._waveform_image_provider = None
self._waveform_image = None
self._canvas = None
super().shutdown()
def update_track_waveform(self, track):
num_samples = track.get_num_samples()
width, height = self._canvas_width, self._canvas_height
ex_factor = 1
width_ex = width * ex_factor
shrink_factor = max(num_samples // width_ex, 1)
if 0:
volume = np.abs(track.data[::shrink_factor][:width_ex])
else:
if num_samples >= shrink_factor * width_ex:
volume = track.data[: shrink_factor * width_ex].reshape(width_ex, shrink_factor)
else:
tmp = np.zeros((shrink_factor * width_ex), np.float32)
tmp[:num_samples] = track.data
volume = tmp.reshape(width_ex, shrink_factor)
volume = np.abs(np.max(volume, axis=1))
# volume /= max(np.max(volume), 1e-8)
if 0: # dB logarithmic scale
volume = np.maximum(volume, 1e-6)
volume = 20.0 * np.log10(volume / 1.0)
volume = np.maximum((volume / 50.0) + 1.0, 0.0) # [-50, 0] dB
volume *= 0.7
canvas = np.zeros((height, width_ex, 4), dtype=np.uint8)
for x in range(canvas.shape[1]):
start = int(round((1.0 - volume[x]) * float(height) / 2))
end = int(round((1.0 + volume[x]) * float(height) / 2))
canvas[start:end, x, :] = [255, 255, 255, 130]
if start == end:
canvas[start : end + 1, x, :] = [255, 255, 255, 60]
if ex_factor > 1:
canvas = scipy.ndimage.zoom(canvas.astype(np.float32), (1, 1.0 / ex_factor, 1), order=1).astype(np.uint8)
self._canvas = canvas.flatten().tolist()
if self._waveform_image_provider is not None:
self._waveform_image_provider.set_bytes_data(self._canvas, [self._canvas_width, self._canvas_height])
def _build_content(self):
self._waveform_image_provider = omni.ui.ByteImageProvider()
if self._canvas is not None:
self._waveform_image_provider.set_bytes_data(self._canvas, [self._canvas_width, self._canvas_height])
with ui.HStack():
self._waveform_image = omni.ui.ImageWithProvider(
self._waveform_image_provider,
height=self._height,
style=TrackWaveformStyle,
fill_policy=omni.ui.IwpFillPolicy.IWP_STRETCH,
)
class TimelineRangeWidget(SimpleWidget):
def __init__(self, height):
super().__init__()
self._height = height
self._rect_range_start = None
self._rect_range = None
def shutdown(self):
self._rect_range_start = None
self._rect_range = None
super().shutdown()
def set_rect_style(self, style):
if self._rect_range is not None:
self._rect_range.set_style(style)
def update_range_rect(self, range_start, range_end, track_len):
if self._rect_range_start is not None and self._rect_range is not None:
if track_len == 0:
start_perc = 0
rect_perc = 0
else:
start_perc = range_start / track_len * 100.0
rect_perc = (range_end - range_start) / track_len * 100.0
self._rect_range_start.width = ui.Percent(start_perc)
self._rect_range.width = ui.Percent(rect_perc)
def _build_content(self):
with ui.HStack(height=self._height):
self._rect_range_start = ui.Spacer(width=omni.ui.Percent(0), style=RangeStartSpacerStyle)
self._rect_range = ui.Rectangle(width=omni.ui.Percent(100), height=self._height, style=RangeRectStyle)
class TimelineWidget(SimpleWidget):
def __init__(self, instance):
super().__init__()
self._instance = instance
self._waveform_widget = WaveformWidget(height=WAVEFORM_HEIGHT)
self._timeline_range_widget = TimelineRangeWidget(height=WAVEFORM_HEIGHT)
self._playback_slider_widget = PlaybackSliderWidget(
height=WAVEFORM_HEIGHT, on_changed_fn=None, on_changed_from_mouse_fn=self._on_changed
)
self._update_sub = omni.kit.app.get_app().get_update_event_stream().create_subscription_to_pop(self._on_update)
def shutdown(self):
self._update_sub = None
self._waveform_widget.shutdown()
self._waveform_widget = None
self._timeline_range_widget.shutdown()
self._timeline_range_widget = None
self._playback_slider_widget.shutdown()
self._playback_slider_widget = None
self._instance = None
super().shutdown()
def set_style(self, style):
if style == "regular":
self._playback_slider_widget.set_handle_style(HandlePlaybackStyle)
self._timeline_range_widget.set_rect_style(RangeRectStyle)
elif style == "streaming":
self._playback_slider_widget.set_handle_style(HandleStreamingStyle)
self._timeline_range_widget.set_rect_style(RangeRectStreamingStyle)
elif style == "recording":
self._playback_slider_widget.set_handle_style(HandleRecordingStyle)
self._timeline_range_widget.set_rect_style(RangeRectRecordingStyle)
def update_track_waveform(self):
track = self._instance.get_player().get_track_ref()
self._waveform_widget.update_track_waveform(track)
def _build_content(self):
with ui.ZStack():
omni.ui.Rectangle(style=PlaybackSliderBackgroundStyle)
self._waveform_widget.build()
self._timeline_range_widget.build()
self._playback_slider_widget.build()
def _on_changed(self, t):
if self._instance.setting("time").initialized():
self._instance.set_setting_val("time", t)
def _on_update(self, *_):
if not getattr(self, '_instance'):
return
if self._instance.setting("range").initialized():
track_len = self._instance.get_player().track.get_length()
range_start, range_end = self._instance.get_setting_val("range")
self._timeline_range_widget.update_range_rect(range_start, range_end, track_len)
self._playback_slider_widget.set_max(track_len)
if self._instance.setting("time").initialized():
t = self._instance.get_setting_val("time")
self._playback_slider_widget.set_value(t)
class TimecodeWidget(SimpleWidget):
def __init__(self, instance):
super().__init__()
self._instance = instance
self._timecode_lbl = None
self._timecode_tms_lbl = None
self._timecode_max_lbl = None
self._timecode_max_tms_lbl = None
self._update_sub = omni.kit.app.get_app().get_update_event_stream().create_subscription_to_pop(self._on_update)
def shutdown(self):
self._update_sub = None
self._timecode_lbl = None
self._timecode_tms_lbl = None
self._timecode_max_lbl = None
self._timecode_max_tms_lbl = None
self._instance = None
super().shutdown()
def _build_content(self):
with ui.HStack(height=22, style={"margin_width": 0}):
ui.Spacer()
self._timecode_lbl = ui.Label("0:00", width=0)
self._timecode_tms_lbl = ui.Label(".00", width=0, style={"color": 0x50FFFFFF})
ui.Label(" | ", style={"color": 0x70FFFFFF})
self._timecode_max_lbl = ui.Label("0:00", width=0)
self._timecode_max_tms_lbl = ui.Label(".00", width=0, style={"color": 0x50FFFFFF})
ui.Spacer()
def _set_timecode(self, t, m_sec_lbl, tms_lbl):
tmss = int(round(t * 100))
secs = tmss // 100
mins = secs // 60
secs_sub = secs % 60
tmss_sub = tmss % 100
m_sec_lbl.text = "{}:{:02d}".format(mins, secs_sub)
tms_lbl.text = ".{:02d}".format(tmss_sub)
def _on_update(self, *_):
if self._timecode_lbl is not None and self._timecode_tms_lbl is not None:
if self._instance.setting("time").initialized():
t = self._instance.get_setting_val("time")
self._set_timecode(t, self._timecode_lbl, self._timecode_tms_lbl)
if self._timecode_max_lbl is not None and self._timecode_max_tms_lbl is not None:
track_len = self._instance.get_player().track.get_length()
self._set_timecode(track_len, self._timecode_max_lbl, self._timecode_max_tms_lbl)
class ButtonPlayPause(BoolSettingWidgetBase):
def __init__(self, setting):
super().__init__(setting, state=False)
self._btn = None
def shutdown(self):
self._btn = None
super().shutdown()
def _build_widget(self):
with ui.HStack(width=BTN_WIDTH, height=22):
self._btn = ui.Button(width=BTN_WIDTH, style=PlayBtnStyle, tooltip="Play/Pause (P)")
self._btn.set_clicked_fn(self._on_toggled)
def _update_from_state(self, is_playing):
if self._btn is not None:
if is_playing == True:
self._btn.set_style(PauseBtnStyle)
else:
self._btn.set_style(PlayBtnStyle)
class ButtonLooping(BoolSettingWidgetBase):
def __init__(self, setting):
super().__init__(setting, state=False)
self._btn = None
def shutdown(self):
self._btn = None
super().shutdown()
def _build_widget(self):
with ui.HStack(width=BTN_WIDTH, height=22):
self._btn = ui.Button(width=BTN_WIDTH, style=LoopBtnStyle, tooltip="Loop (L)")
self._btn.set_clicked_fn(self._on_toggled)
def _update_from_state(self, is_looping):
if self._btn is not None:
self._btn.selected = is_looping
class ButtonRecorder(BoolSettingWidgetBase):
def __init__(self, setting):
super().__init__(setting, state=False)
self._btn = None
def shutdown(self):
self._btn = None
super().shutdown()
def _build_widget(self):
with ui.ZStack(width=BTN_WIDTH):
self._btn = ui.Button(" ", width=BTN_WIDTH)
self._btn.set_clicked_fn(self._on_toggled)
with ui.HStack(width=BTN_WIDTH, height=22):
ui.Spacer(width=11)
ui.Circle(width=10, style={"background_color": 0xA03030AA})
def _update_from_state(self, recorder_enabled):
if self._btn is not None:
self._btn.selected = recorder_enabled
class ButtonMuteUnmute(BoolSettingWidgetBase):
def __init__(self, setting):
super().__init__(setting, state=False)
self._btn = None
def shutdown(self):
self._btn = None
super().shutdown()
def _build_widget(self):
self._btn = ui.Button("MUTE", width=RECORDER_BTN_WIDTH)
self._btn.set_clicked_fn(self._on_toggled)
def _update_from_state(self, muted):
if self._btn is not None:
self._btn.text = "UNMUTE" if muted else "MUTE"
class ButtonRecordPause(BoolSettingWidgetBase):
def __init__(self, setting):
super().__init__(setting, state=False)
self._btn = None
self._btn_overlay = None
def shutdown(self):
self._btn = None
self._btn_overlay = None
super().shutdown()
def _build_widget(self):
with ui.ZStack(width=RECORDER_BTN_WIDTH):
self._btn = ui.Button(" REC", width=RECORDER_BTN_WIDTH)
self._btn_overlay = ui.HStack(width=RECORDER_BTN_WIDTH)
with self._btn_overlay:
ui.Spacer(width=15)
ui.Circle(width=10, style={"background_color": 0xA03030AA})
self._btn.set_clicked_fn(self._on_toggled)
def _update_from_state(self, is_recording):
if self._btn is not None and self._btn_overlay is not None:
if is_recording:
self._btn.text = f"{ui.get_custom_glyph_code('${glyphs}/timeline_pause.svg')} PAUSE"
self._btn_overlay.visible = False
else:
self._btn.text = " REC"
self._btn_overlay.visible = True
class ButtonLiveMode(BoolSettingWidgetBase):
def __init__(self, setting):
super().__init__(setting, state=False)
self._btn = None
def shutdown(self):
self._btn = None
super().shutdown()
def _build_widget(self):
self._btn = ui.Button("LIVE", width=RECORDER_BTN_WIDTH)
self._btn.set_clicked_fn(self._on_toggled)
def _update_from_state(self, live_mode):
if self._btn is not None:
self._btn.selected = live_mode
class DBMeterWidget(SimpleWidget):
def __init__(self):
super().__init__()
self._dBFS_arr_win_size = 8
self._dBFS_min = -60.0
self._dBFS_arr = np.zeros((self._dBFS_arr_win_size), dtype=np.float32) + self._dBFS_min
self._dbmeter_rect_green = None
self._dbmeter_rect_yellow = None
self._dbmeter_rect_red = None
self._dbmeter_dark_bar_spacer = None
self._dbmeter_dark_bar = None
self._dbmeter_border = None
self._txt_dbmeter = None
def shutdown(self):
self._dBFS_arr = None
self._dbmeter_rect_green = None
self._dbmeter_rect_yellow = None
self._dbmeter_rect_red = None
self._dbmeter_dark_bar_spacer = None
self._dbmeter_dark_bar = None
self._dbmeter_border = None
self._txt_dbmeter = None
super().shutdown()
def update(self, magnitude):
max_level = 1.0
if magnitude > 1e-6:
dBFS = 20.0 * math.log10(magnitude / max_level)
else:
dBFS = self._dBFS_min - 0.001
# Smoothing
def window2val(arr):
return -math.sqrt(np.mean(arr ** 2))
if dBFS > window2val(self._dBFS_arr):
self._dBFS_arr = np.zeros((self._dBFS_arr_win_size), dtype=np.float32) + dBFS
else:
self._dBFS_arr = np.roll(self._dBFS_arr, -1)
self._dBFS_arr[-1] = dBFS
dBFS = window2val(self._dBFS_arr)
if self._txt_dbmeter is not None:
if dBFS >= self._dBFS_min:
self._txt_dbmeter.text = "{:.1f} dB".format(dBFS)
else:
self._txt_dbmeter.text = "- Inf dB"
if self._dbmeter_dark_bar is not None and self._dbmeter_dark_bar_spacer is not None:
portion = min(max(1.0 - dBFS / self._dBFS_min, 0.0), 1.0)
self._dbmeter_dark_bar_spacer.width = ui.Percent(portion * 100)
self._dbmeter_dark_bar.width = ui.Percent((1 - portion) * 100)
def _build_content(self):
with ui.HStack(height=22):
with ui.VStack():
ui.Spacer(height=7)
with ui.ZStack(height=8):
with ui.HStack(width=ui.Percent(100)):
self._dbmeter_rect_green = ui.Rectangle(
width=ui.Percent(40.0 / 60.0 * 100), style=DBMeterRectGreenStyle
)
self._dbmeter_rect_yellow = ui.Rectangle(
width=ui.Percent(11.0 / 60.0 * 100), style=DBMeterRectYellowStyle
)
self._dbmeter_rect_red = ui.Rectangle(
width=ui.Percent(9.0 / 60.0 * 100), style=DBMeterRectRedStyle
)
with ui.HStack(width=ui.Percent(100)):
self._dbmeter_dark_bar_spacer = ui.Spacer(width=ui.Percent(0))
self._dbmeter_dark_bar = ui.Rectangle(width=ui.Percent(100), style=DBMeterDarkBarStyle)
self._dbmeter_border = ui.Rectangle(width=ui.Percent(100), style=DBMeterBorderStyle)
self._txt_dbmeter = ui.Label(
"- Inf dB", width=70, style={"margin_width": 10}, alignment=omni.ui.Alignment.RIGHT_CENTER
)
class RecorderWidget(SimpleWidget):
def __init__(self, instance_widget):
super().__init__()
self._instance_widget = instance_widget
self._btn_mute = ButtonMuteUnmute(self._instance_widget._instance.setting("recorder_muted"))
self._btn_record = ButtonRecordPause(self._instance_widget._instance.setting("is_recording"))
self._btn_live_mode = ButtonLiveMode(self._instance_widget._instance.setting("recorder_live_mode"))
self._dbmeter_widget = DBMeterWidget()
self._btn_new = None
self._btn_save = None
self._field_track_name = StringSettingWidget(self._instance_widget._instance.setting("recorder_track_name"))
self._dbmeter_updater = RareUpdater(0.03)
self._waveform_updater = RareUpdater(0.3)
self._update_sub = omni.kit.app.get_app().get_update_event_stream().create_subscription_to_pop(self._on_update)
def shutdown(self):
self._update_sub = None
self._btn_mute.shutdown()
self._btn_mute = None
self._btn_record.shutdown()
self._btn_record = None
self._btn_live_mode.shutdown()
self._btn_live_mode = None
self._dbmeter_widget.shutdown()
self._dbmeter_widget = None
self._btn_new = None
self._btn_save = None
self._field_track_name.shutdown()
self._field_track_name = None
self._dbmeter_updater = None
self._waveform_updater = None
self._instance_widget = None
super().shutdown()
def _build_content(self):
with ui.VStack(spacing=VSPACING):
ui.Rectangle(height=2, style=SeparatorStyle)
with ui.HStack():
with ui.HStack(width=0):
self._btn_mute.build()
self._btn_record.build()
self._btn_live_mode.build()
ui.Spacer(width=4)
self._dbmeter_widget.build()
with ui.HStack():
with ui.HStack(width=0):
self._btn_new = ui.Button("NEW", width=RECORDER_BTN_WIDTH)
self._btn_new.set_clicked_fn(self._on_btn_new_clicked)
self._btn_save = ui.Button("SAVE AS", width=RECORDER_BTN_WIDTH)
self._btn_save.set_clicked_fn(self._on_btn_save_clicked)
self._field_track_name.build()
def _on_btn_new_clicked(self):
self._instance_widget._instance.audio_recorder_new_track()
def _on_btn_save_clicked(self):
self._instance_widget._audio_recorder_prompt_save_track()
def _on_update(self, *_):
if self._instance_widget._instance.get_recorder() is not None and self._dbmeter_widget is not None:
if self._dbmeter_updater.it_is_time():
self._dbmeter_widget.update(self._instance_widget._instance.get_recorder().get_signal_magnitude())
if self._instance_widget._instance.is_recording():
if self._waveform_updater.it_is_time():
self._instance_widget.update_track_waveform()
class PlayerInstanceWidget(CollapsableWidget):
def __init__(self, instance):
super().__init__(title="AUDIO PLAYER", default_collapsed=False)
self._instance = instance
self._root_path_widget = PathWidgetWithReset(self._instance.setting("track_root_path"))
self._track_widget = CategoricalSettingWidgetWithReset(self._instance.setting("track_name"))
self._range_widget = FloatRangeWidgetWithReset(
self._instance.setting("range"),
get_start_fn=lambda: self._instance.get_setting_val("time"),
get_end_fn=lambda: self._instance.get_setting_val("time"),
fast_change=True,
format="%.2f",
)
self._timeline_widget = TimelineWidget(self._instance)
self._timecode_widget = TimecodeWidget(self._instance)
self._recorder_widget = RecorderWidget(self)
self._btn_rewind = None
self._btn_play = ButtonPlayPause(self._instance.setting("is_playing"))
self._btn_loop = ButtonLooping(self._instance.setting("is_looping"))
self._btn_recorder = ButtonRecorder(self._instance.setting("recorder_enabled"))
#MyCode
udp_server(self)
self._subs = []
self._subs.append(
self._instance.get_event_stream().create_subscription_to_pop_by_type(
AudioPlayerEventType.TRACK_CHANGED, self._on_track_changed
)
)
self._subs.append(
self._instance.get_event_stream().create_subscription_to_pop_by_type(
AudioPlayerEventType.TRACK_RECORDER_STATE_CHANGED, self._on_recorder_visibility_changed
)
)
self._subs.append(
self._instance.get_event_stream().create_subscription_to_pop_by_type(
AudioPlayerEventType.TRACK_RECORDING_STATE_CHANGED, self._on_recodding_state_changed
)
)
self._subs.append(
self._instance.get_event_stream().create_subscription_to_pop_by_type(
AudioPlayerEventType.TRACK_WAVEFORM_CHANGED, self._on_waveform_changed
)
)
self._save_track_overwrite_popup = None
self._choose_output_fpath_dialog = None
self.update_track_waveform()
def shutdown(self):
self._subs = []
self._root_path_widget.shutdown()
self._root_path_widget = None
self._track_widget.shutdown()
self._track_widget = None
self._range_widget.shutdown()
self._range_widget = None
self._timeline_widget.shutdown()
self._timeline_widget = None
self._timecode_widget.shutdown()
self._timecode_widget = None
self._recorder_widget.shutdown()
self._recorder_widget = None
self._btn_rewind = None
self._btn_play.shutdown()
self._btn_play = None
self._btn_loop.shutdown()
self._btn_loop = None
self._btn_recorder.shutdown()
self._btn_recorder = None
self._instance = None
if self._save_track_overwrite_popup is not None:
self._save_track_overwrite_popup.shutdown()
self._save_track_overwrite_popup = None
if self._choose_output_fpath_dialog is not None:
self._choose_output_fpath_dialog.hide()
self._choose_output_fpath_dialog = None
super().shutdown()
def _audio_recorder_prompt_save_track(self):
if self._instance._audio_recorder is None:
return
if self._instance.is_recording():
self._instance.audio_recorder_pause()
self._open_choose_output_fpath_dialog()
def _open_choose_output_fpath_dialog(self):
def on_selected(filename: str, dirname: str, extension: str = '', selections: List[str] = []):
if dirname is not None and filename != "":
if os.path.splitext(filename)[1] != ".wav":
filename += ".wav"
self._instance.set_setting_val("recorder_track_name", filename)
track_fpath = path_join(dirname, filename)
log_info("Output file path is selected: {}".format(track_fpath))
if is_valid_path(track_fpath):
self._show_save_track_overwrite_popup(track_fpath)
else:
self._instance._audio_recorder_save_track_confirmed(track_fpath)
if self._choose_output_fpath_dialog is None:
self._choose_output_fpath_dialog = get_file_exporter()
self._choose_output_fpath_dialog.show_window(
title="Choose Output Audio File Path",
export_button_label="Save",
export_handler=on_selected,
show_only_folders=False,
enable_filename_input=True,
file_extension_types=[(".wav", "Audio Wave Files (*.wav)")],
filename_url=self._instance.get_setting_val("recorder_track_name"),
)
def _show_save_track_overwrite_popup(self, track_fpath):
if self._save_track_overwrite_popup is not None:
self._save_track_overwrite_popup.shutdown()
self._save_track_overwrite_popup = PromptOverwriteFile(
track_fpath, confirm_fn=lambda: self._instance._audio_recorder_save_track_confirmed(track_fpath)
)
def update_title(self, show_full_title):
title = "AUDIO PLAYER"
if self._instance.get_setting_val("dummy_player"):
title += " (DUMMY)"
if show_full_title:
title += " :: " + str(self._instance.get_name())
self.set_title(title)
def _on_waveform_changed(self, event):
self.update_track_waveform()
def _on_track_changed(self, event):
self.update_track_waveform()
def update_track_waveform(self):
self._timeline_widget.update_track_waveform()
def set_timeline_style(self, style):
self._timeline_widget.set_style(style)
def _on_recodding_state_changed(self, event):
if self._instance.is_recording():
self.set_timeline_style("recording")
else:
self.set_timeline_style("regular")
def _on_recorder_visibility_changed(self, event):
self.update_recorder_visibility()
def update_recorder_visibility(self):
if self._recorder_widget is not None:
if self._instance.get_setting_val("recorder_enabled"):
self._recorder_widget.show()
else:
self._recorder_widget.hide()
def _build_content(self):
with ui.VStack(spacing=VSPACING):
self._root_path_widget.build()
self._track_widget.build()
self._range_widget.build()
with ui.HStack(height=WAVEFORM_HEIGHT):
ui.Spacer(width=4)
self._timeline_widget.build()
self._timeline_widget.set_style("regular")
ui.Spacer(width=4)
with ui.VStack(spacing=VSPACING, width=0):
self._timecode_widget.build()
with ui.HStack(height=22):
self._btn_play.build()
self._btn_rewind = ui.Button(width=BTN_WIDTH, style=RewindBtnStyle, tooltip="Rewind (R)")
self._btn_rewind.set_clicked_fn(self._on_btn_rewind_clicked)
self._btn_loop.build()
self._btn_recorder.build()
self._recorder_widget.build()
self.update_recorder_visibility()
def _on_btn_rewind_clicked(self):
self._instance.audio_player_rewind()
class PlayerStreamingInstanceWidget(CollapsableWidget):
def __init__(self, instance):
super().__init__(title="AUDIO PLAYER STREAMING", default_collapsed=False)
self._instance = instance
self._timeline_widget = TimelineWidget(self._instance)
self._timecode_widget = TimecodeWidget(self._instance)
self._subs = []
self._subs.append(
self._instance.get_event_stream().create_subscription_to_pop_by_type(
AudioPlayerEventType.TRACK_CHANGED, self._on_track_changed
)
)
self._subs.append(
self._instance.get_event_stream().create_subscription_to_pop_by_type(
AudioPlayerEventType.TRACK_WAVEFORM_CHANGED, self._on_waveform_changed
)
)
self.update_track_waveform()
def shutdown(self):
self._subs = []
self._timeline_widget.shutdown()
self._timeline_widget = None
self._timecode_widget.shutdown()
self._timecode_widget = None
self._instance = None
super().shutdown()
def update_title(self, show_full_title):
title = "AUDIO PLAYER STREAMING"
if self._instance.get_setting_val("dummy_player"):
title += " (DUMMY)"
if show_full_title:
title += " :: " + str(self._instance.get_name())
self.set_title(title)
def _on_waveform_changed(self, event):
self.update_track_waveform()
def _on_track_changed(self, event):
self.update_track_waveform()
def update_track_waveform(self):
self._timeline_widget.update_track_waveform()
def _build_content(self):
with ui.VStack(spacing=VSPACING):
with ui.HStack(height=WAVEFORM_HEIGHT):
ui.Spacer(width=4)
self._timeline_widget.build()
self._timeline_widget.set_style("streaming")
ui.Spacer(width=4)
with ui.VStack(spacing=VSPACING, width=128):
ui.Spacer()
self._timecode_widget.build()
ui.Spacer()
self._main_wrapper.set_mouse_pressed_fn(self._on_mouse_clicked)
def _on_mouse_clicked(self, x, y, button, key_mod):
if button == 1: # Right click
context_menu = omni.kit.context_menu.get_instance()
menu_list = [
{
"name": "Send example track",
"glyph": "timeline_play.svg",
"show_fn": lambda *_: True,
"onclick_fn": lambda *_: self._instance.send_example_track(),
}
]
context_menu.show_context_menu("streaming player menu", menu_list=menu_list, objects={}, min_menu_entries=1)
@singledispatch
def create_instance_widget(instance):
raise ValueError(f"Unhandled type {type(instance)}")
@create_instance_widget.register(AudioPlayerInstance)
def create_instance_widget_player(instance):
return PlayerInstanceWidget(instance)
@create_instance_widget.register(AudioPlayerStreamingInstance)
def create_instance_widget_streaming_player(instance):
return PlayerStreamingInstanceWidget(instance)
class PlayerInstanceManagerWidget(SimpleWidget):
def __init__(self, instance_manager):
super().__init__()
self._instance_manager = instance_manager
self._sub = self._instance_manager.get_event_stream().create_subscription_to_pop(self.on_instance_created)
self._widgets = {}
def on_instance_created(self, event):
if event.type == InstanceManagerEventType.INSTANCE_CREATED:
self.refresh()
elif event.type == InstanceManagerEventType.INSTANCE_REMOVED:
name = event.payload["name"]
if name in self._widgets:
self._widgets[name].shutdown()
del self._widgets[name]
self.refresh()
def shutdown(self):
self._instance_manager = None
self._sub = None
for w in self._widgets.values():
w.shutdown()
self.widgets = {}
super().shutdown()
def _build_content(self):
with ui.VStack():
show_collapsable = len(self._instance_manager.get_instances()) > 1
show_full_title = show_collapsable
for instance in self._instance_manager.get_instances().values():
name = instance.get_name()
if name not in self._widgets:
self._widgets[name] = create_instance_widget(instance)
w = self._widgets[name]
w.update_title(show_full_title)
w.build()
My logic is to call the click event of the play button when UDP sends the message “start playback” and “pause playback”.
Now I have a few questions:
1 After sending the message, my own audio Audio2Face basically freezes, but the default template basically does not. And whether it is the default template or my audio. There is no problem when manually clicking the play button, which means it is not a performance problem.
- I know:
def _update_from_state(self, is_playing):
if self._btn is n
![preview|582x378](upload://3Wa9qJ3NW8YKWCJcCl3VuxHoWTS.png)
ot None:
if is_playing == True:
self._btn.set_style(PauseBtnStyle)
else:
self._btn.set_style(PlayBtnStyle)
but I don’t know where the code that actually executes the audio playback is.
- There is a CategoricalSettingWidgetWithReset, which can refresh the audio in the folder. I don’t know its implementation logic either.