Hi there,
sorry for the late reply:
here is an example of how one can do this with the BasicWriter:
import asyncio
import os
import carb
import omni.replicator.core as rep
# Turn on the "/app/omni.graph.scriptnode/opt_in" setting.
# NOTE(review): presumably required so the Python-function augmentation
# registered below is allowed to execute -- confirm against the Replicator docs.
SCRIPTNODE_OPT_IN_SETTING = "/app/omni.graph.scriptnode/opt_in"
settings_i = carb.settings.acquire_settings_interface()
settings_i.set_bool(SCRIPTNODE_OPT_IN_SETTING, True)
def rgba_to_bgra_np(data_in):
    """Swap the red and blue channels of an RGBA array.

    Args:
        data_in: Array supporting NumPy-style indexing whose last axis holds
            at least four channels, ordered R, G, B, A.

    Returns:
        The array obtained by reordering the last axis to B, G, R, A. Any
        leading axes are left as they are.
    """
    bgra_channel_order = [2, 1, 0, 3]
    return data_in[..., bgra_channel_order]
# Register the channel swap under a name so it can be fetched from the
# registry further down.
rep.AnnotatorRegistry.register_augmentation("rgba_to_bgra_np", rep.Augmentation.from_function(rgba_to_bgra_np))

out_dir = os.path.join(os.getcwd(), "_out_augm")
print(f" *** Output directory: {out_dir}")

# Minimal scene: one cube with a red OmniPBR material, seen by one camera
# through a 512x512 render product.
mat = rep.create.material_omnipbr(diffuse=(1, 0, 0))
rep.create.cube(material=mat)
camera = rep.create.camera(position=(0, 0, 5), look_at=(0, 0, 0))
render_product = rep.create.render_product(camera, (512, 512))

bgr_augm = rep.AnnotatorRegistry.get_augmentation("rgba_to_bgra_np")

basic_writer = rep.WriterRegistry.get("BasicWriter")
basic_writer.initialize(output_dir=out_dir, rgb=True)

# Augment the writer's own rgb annotator(s) before the writer is attached.
for annot in basic_writer.annotators:
    orig_name = annot.get_name()
    if not orig_name.startswith("rgb"):
        continue
    annot.augment(bgr_augm)
    # HACK: put back the name read before augment() by writing the private
    # `_name` attribute. NOTE(review): presumably augment() renames the
    # annotator and BasicWriter looks its data up by the original name --
    # confirm. The author flags this private access as a temporary workaround.
    annot._name = orig_name

basic_writer.attach([render_product])

rep.orchestrator.preview()
# Schedule the 5-frame run on the already running event loop; the returned
# future is intentionally not awaited here.
asyncio.ensure_future(rep.orchestrator.run_async(num_frames=5))
Here is another example using annotator/writer combinations:
import asyncio
import os
import carb
import numpy as np
import omni.kit
import omni.replicator.core as rep
import warp as wp
from PIL import Image
# Turn on the "/app/omni.graph.scriptnode/opt_in" setting.
# NOTE(review): presumably required so the Python-function augmentations
# created below are allowed to execute -- confirm against the Replicator docs.
SCRIPTNODE_OPT_IN_SETTING = "/app/omni.graph.scriptnode/opt_in"
settings_i = carb.settings.acquire_settings_interface()
settings_i.set_bool(SCRIPTNODE_OPT_IN_SETTING, True)
def write_rgba(data, path):
    """Save ``data`` as an RGBA PNG file.

    Args:
        data: Image array passed to ``Image.fromarray`` with mode "RGBA".
            Assumes a (height, width, 4) uint8 array -- TODO confirm against
            what the annotator returns.
        path: Destination without extension; ".png" is appended. Accepts a
            ``str`` or any object whose string form is the path.
    """
    rgba_img = Image.fromarray(data, mode="RGBA")
    rgba_img.save(f"{path}.png")
def write_depth(data, path):
    """Save a depth array as an 8-bit grayscale PNG file.

    The finite values of ``data`` are rescaled linearly so that the smallest
    maps to 0 and the largest to 255. Non-finite values (NaN, +inf, -inf) are
    excluded from the min/max search and written as 0, so a single invalid
    pixel no longer blanks the whole image. If every finite value is the same
    (or there is none), the image is all zeros instead of dividing by zero.

    Args:
        data: Numeric depth array. Assumes a 2-D (height, width) array, as
            needed for a mode "L" image -- TODO confirm.
        path: Destination without extension; ".png" is appended.
    """
    depth = np.asarray(data, dtype=np.float64)
    finite_mask = np.isfinite(depth)

    # Start from zeros so every pixel that is not rescaled below stays black.
    normalized_array = np.zeros(depth.shape, dtype=np.float64)
    if finite_mask.any():
        finite_values = depth[finite_mask]
        lowest = finite_values.min()
        highest = finite_values.max()
        if highest > lowest:
            normalized_array[finite_mask] = (finite_values - lowest) / (highest - lowest)

    integer_array = (normalized_array * 255).astype(np.uint8)
    depth_img = Image.fromarray(integer_array, mode="L")
    depth_img.save(f"{path}.png")
def rgba_to_bgra_np(data_in):
    """Swap the red and blue channels of an RGBA array.

    Args:
        data_in: Array supporting NumPy-style indexing whose last axis holds
            at least four channels, ordered R, G, B, A.

    Returns:
        The array obtained by reordering the last axis to B, G, R, A. Any
        leading axes are left as they are.
    """
    bgra_channel_order = [2, 1, 0, 3]
    return data_in[..., bgra_channel_order]
# Wrap the NumPy channel swap as an augmentation and register it under the
# name "rgba_to_bgra_np" so it can be fetched from the registry later.
rgba_to_bgra_augm = rep.Augmentation.from_function(rgba_to_bgra_np)
rep.AnnotatorRegistry.register_augmentation("rgba_to_bgra_np", rgba_to_bgra_augm)
# Warp kernel variant of the RGBA -> BGRA channel swap.
# Each thread handles the pixel at the (i, j) index it gets from wp.tid() and
# copies its four uint8 channels from `data_in` to `data_out` with channels 0
# and 2 exchanged.
@wp.kernel
def rgba_to_bgra_wp(data_in: wp.array(dtype=wp.uint8, ndim=3), data_out: wp.array(dtype=wp.uint8, ndim=3)):
    i, j = wp.tid()
    data_out[i, j, 0] = data_in[i, j, 2]  # first output channel <- input channel 2
    data_out[i, j, 1] = data_in[i, j, 1]  # channel 1 unchanged
    data_out[i, j, 2] = data_in[i, j, 0]  # third output channel <- input channel 0
    data_out[i, j, 3] = data_in[i, j, 3]  # channel 3 unchanged
def gaussian_noise_np(data_in, kernel_seed, sigma: float = 25.0):
    """Add seeded Gaussian noise to the first three channels, in place.

    Noise is drawn per channel, in channel order 0, 1, 2, from a private
    ``np.random.RandomState(kernel_seed)``. This gives the same samples as
    seeding the global generator with ``kernel_seed`` but leaves the global
    NumPy random state untouched. For integer buffers the noisy values are
    clipped to the dtype's range before being stored, so bright or dark
    pixels saturate instead of wrapping around.

    Args:
        data_in: NumPy array whose last axis is the channel axis with at
            least three channels. It is modified in place; any further
            channel (e.g. alpha) is left unchanged.
        kernel_seed: Seed for the noise generator. The same seed yields the
            same noise on every call.
        sigma: Standard deviation of the noise, in the units of ``data_in``.

    Returns:
        ``data_in`` itself, after modification.
    """
    rng = np.random.RandomState(kernel_seed)

    # Only integer buffers have a hard representable range to saturate at.
    clip_bounds = None
    if np.issubdtype(data_in.dtype, np.integer):
        dtype_info = np.iinfo(data_in.dtype)
        clip_bounds = (dtype_info.min, dtype_info.max)

    plane_shape = data_in.shape[:-1]
    for channel in range(3):
        noisy = data_in[..., channel] + rng.randn(*plane_shape) * sigma
        if clip_bounds is not None:
            noisy = np.clip(noisy, *clip_bounds)
        # Assignment casts back to the buffer's dtype.
        data_in[..., channel] = noisy
    return data_in
def gaussian_noise_depth_np(data_in, kernel_seed, sigma: float = 25.0):
    """Return ``data_in`` with seeded Gaussian noise added to every element.

    Noise is drawn from a private ``np.random.RandomState(kernel_seed)``.
    This gives the same samples as seeding the global generator with
    ``kernel_seed`` but leaves the global NumPy random state untouched.

    Args:
        data_in: NumPy array of any shape. It is not modified.
        kernel_seed: Seed for the noise generator. The same seed yields the
            same noise on every call.
        sigma: Standard deviation of the noise, in the units of ``data_in``.

    Returns:
        A new array with the same shape as ``data_in``. The noise is float64,
        so the result follows NumPy's type promotion with float64.
    """
    rng = np.random.RandomState(kernel_seed)
    noise = rng.randn(*data_in.shape) * sigma
    return data_in + noise
out_dir = os.path.join(os.getcwd(), "_out_augm")
os.makedirs(out_dir, exist_ok=True)
print(f" *** Output directory: {out_dir}")

# Minimal scene: one cube with a red OmniPBR material, seen by one camera
# through a 512x512 render product.
mat = rep.create.material_omnipbr(diffuse=(1, 0, 0))
rep.create.cube(material=mat)
camera = rep.create.camera(position=(0, 0, 5), look_at=(0, 0, 0))
render_product = rep.create.render_product(camera, (512, 512))

### Augmentation
bgr_augm = rep.AnnotatorRegistry.get_augmentation("rgba_to_bgra_np")
# Alternative: build the same augmentation from the Warp kernel instead.
# bgr_augm = rep.annotators.Augmentation.from_function(rgba_to_bgra_wp)
gn_augm = rep.annotators.Augmentation.from_function(gaussian_noise_np, kernel_seed=123, sigma=21.0)
gn_depth_augm = rep.annotators.Augmentation.from_function(gaussian_noise_depth_np, kernel_seed=123, sigma=221.0)

### Annot
# Stand-alone annotators, read manually after each step:
#   rgba_annot  -> channel swap only
#   rgba_annot2 -> channel swap followed by Gaussian noise
#   depth_annot -> Gaussian noise on distance_to_camera
rgba_annot = rep.AnnotatorRegistry.get_annotator("rgb")
rgba_annot.augment(bgr_augm)
rgba_annot2 = rep.AnnotatorRegistry.get_annotator("rgb")
rgba_annot2.augment_compose([bgr_augm, gn_augm])
depth_annot = rep.AnnotatorRegistry.get_annotator("distance_to_camera")
depth_annot.augment(gn_depth_augm)
for standalone_annot in (rgba_annot, rgba_annot2, depth_annot):
    standalone_annot.attach([render_product])

### Writer
basic_writer = rep.WriterRegistry.get("BasicWriter")
basic_writer.initialize(output_dir=out_dir, rgb=True, distance_to_camera=True)

# Annotator-name prefix -> augmentation for the writer's own annotators.
# Prefixes are tried in this order and the first match wins.
writer_augmentations = {
    "rgb": bgr_augm,
    "distance_to_camera": gn_depth_augm,
}
for annot in basic_writer.annotators:
    orig_name = annot.get_name()
    for name_prefix, augmentation in writer_augmentations.items():
        if orig_name.startswith(name_prefix):
            annot.augment(augmentation)
            # HACK: put back the name read before augment() by writing the
            # private `_name` attribute. NOTE(review): presumably augment()
            # renames the annotator and BasicWriter looks its data up by the
            # original name -- confirm. The author flags this private access
            # as a temporary workaround.
            annot._name = orig_name
            break

basic_writer.attach([render_product])

### Replicator
rep.orchestrator.preview()
async def step_replicator(nun_steps):
    """Step the orchestrator and save each stand-alone annotator's data.

    After every ``rep.orchestrator.step_async()`` the current data of the
    module-level ``rgba_annot``, ``rgba_annot2`` and ``depth_annot`` is
    written under the module-level ``out_dir`` as ``rgb_annot_<i>.png``,
    ``rgb_annot2_<i>.png`` and ``depth_annot_<i>.png``.

    Args:
        nun_steps: Number of orchestrator steps to run. The parameter name
            (sic) is kept unchanged for backward compatibility.
    """
    for i in range(nun_steps):
        await rep.orchestrator.step_async()
        write_rgba(rgba_annot.get_data(), f"{out_dir}/rgb_annot_{i}")
        write_rgba(rgba_annot2.get_data(), f"{out_dir}/rgb_annot2_{i}")
        write_depth(depth_annot.get_data(), f"{out_dir}/depth_annot_{i}")
        print(f" *** Step {i} done")
# Schedule five capture steps on the event loop; the returned future is
# intentionally not awaited here.
NUM_CAPTURE_STEPS = 5
asyncio.ensure_future(step_replicator(NUM_CAPTURE_STEPS))
In the next release this will be possible in a more straightforward way (e.g. avoiding accessing private members).
Cheers,
Andrei