video_path and image_context_path can both take PNG and JPG files, etc., as inputs.
Cosmos Writer specifically outputs mp4. It could be adapted to output the modalities as single images, but you’re right, it would be nice to have a CosmosImageWriter or something for scenarios where I2I is needed.
Let me see what the team thinks about releasing something official. But in the interim, here's something that's hacked together with the help of Cursor. It can hopefully point you in the right direction. It's something I put together to get cleaner Canny edges, but it's got normals and shaded views also. But not colored shading. See if you can adapt this for your purposes.
Outputs should look like this:
import asyncio
from datetime import datetime
from typing import Dict, Optional

import numpy as np
import warp as wp

import omni.replicator.core as rep
import omni.usd
from omni.replicator.core import functional as F
# Timestamped output directory so repeated runs don't overwrite each other.
# NOTE: renamed the intermediate from `time` to avoid shadowing the stdlib
# `time` module.
_now = datetime.now()
timestamp = _now.strftime("%H-%M-%S")
output_directory = "_genaiwriter_" + timestamp
@wp.kernel
def rgb_to_grey_and_blur(data_in: wp.array3d(dtype=wp.uint8), data_out: wp.array2d(dtype=wp.uint8)):
    """Convert an RGB(A) uint8 image to greyscale and apply a 3x3 Gaussian blur.

    First stage of the Canny pipeline. Interior pixels get the blurred
    greyscale value; the outermost one-pixel border gets plain greyscale
    (no full 3x3 window is available there).
    """
    i, j = wp.tid()
    height = data_in.shape[0]
    width = data_in.shape[1]
    if i > 0 and i < height - 1 and j > 0 and j < width - 1:
        # Normalized 3x3 Gaussian kernel: (1 2 1 / 2 4 2 / 1 2 1) / 16.
        kernel = wp.mat33f(1.0, 2.0, 1.0, 2.0, 4.0, 2.0, 1.0, 2.0, 1.0) / 16.0
        sum = 0.0
        for ki in range(-1, 2):
            for kj in range(-1, 2):
                # ITU-R BT.601 luma weights for the RGB -> grey conversion.
                gray_val = (
                    wp.float(data_in[i + ki, j + kj, 0]) * 0.299
                    + wp.float(data_in[i + ki, j + kj, 1]) * 0.587
                    + wp.float(data_in[i + ki, j + kj, 2]) * 0.114
                )
                sum += gray_val * kernel[ki + 1, kj + 1]
        data_out[i, j] = wp.uint8(wp.clamp(sum, 0.0, 255.0))
    else:
        # Border pixel: greyscale conversion only, no blur.
        data_out[i, j] = wp.uint8(
            wp.float(data_in[i, j, 0]) * 0.299 + wp.float(data_in[i, j, 1]) * 0.587 + wp.float(data_in[i, j, 2]) * 0.114
        )
@wp.kernel
def sobel_and_suppress(
    data_in: wp.array2d(dtype=wp.uint8),
    data_out: wp.array3d(dtype=wp.uint8),
    low_threshold: float,
    high_threshold: float,
):
    """Sobel gradient + non-maximum suppression + double thresholding.

    Middle stages of the Canny pipeline. Writes channel 0 of data_out:
    255 = strong edge, 127 = weak edge (resolved later by
    hysteresis_thresholding), 0 = suppressed / no edge. Border pixels
    are written as 0.
    """
    i, j = wp.tid()
    height = data_in.shape[0]
    width = data_in.shape[1]
    # NOTE(review): the interior test uses height-2 / width-2 here, while the
    # blur and hysteresis kernels use height-1 / width-1 — confirm the extra
    # one-pixel margin on the far edges is intentional.
    if i > 0 and i < height - 2 and j > 0 and j < width - 2:
        # 3x3 Sobel gradients at the current pixel.
        gx = (
            -1.0 * wp.float(data_in[i - 1, j - 1])
            + -2.0 * wp.float(data_in[i, j - 1])
            + -1.0 * wp.float(data_in[i + 1, j - 1])
            + 1.0 * wp.float(data_in[i - 1, j + 1])
            + 2.0 * wp.float(data_in[i, j + 1])
            + 1.0 * wp.float(data_in[i + 1, j + 1])
        )
        gy = (
            -1.0 * wp.float(data_in[i - 1, j - 1])
            + -2.0 * wp.float(data_in[i - 1, j])
            + -1.0 * wp.float(data_in[i - 1, j + 1])
            + 1.0 * wp.float(data_in[i + 1, j - 1])
            + 2.0 * wp.float(data_in[i + 1, j])
            + 1.0 * wp.float(data_in[i + 1, j + 1])
        )
        magnitude = wp.sqrt(gx * gx + gy * gy)
        # Gradient direction in degrees, folded into [0, 180).
        # NOTE(review): uses 3.14159 rather than a full-precision pi constant.
        angle = wp.atan2(gy, gx) * 180.0 / 3.14159
        if angle < 0:
            angle += 180.0
        # Magnitudes of the two neighbours along the gradient direction,
        # used for non-maximum suppression.
        g00 = wp.float(0.0)
        g01 = wp.float(0.0)
        xstep = wp.float(0.0)
        ystep = wp.float(0.0)
        # Quantize the direction to one of four sectors:
        # horizontal, diagonal, vertical, anti-diagonal.
        if angle <= 22.5 or angle > 157.5:
            xstep = wp.float(1.0)
            ystep = wp.float(0.0)
        elif angle > 22.5 and angle <= 67.5:
            xstep = wp.float(1.0)
            ystep = wp.float(1.0)
        elif angle > 67.5 and angle <= 112.5:
            xstep = wp.float(0.0)
            ystep = wp.float(1.0)
        else:
            xstep = wp.float(-1.0)
            ystep = wp.float(1.0)
        # Neighbour one step ALONG the gradient: recompute its Sobel magnitude.
        x1 = wp.float(j) + xstep
        y1 = wp.float(i) + ystep
        if x1 >= 0 and x1 < width and y1 >= 0 and y1 < height:
            x1_floor = wp.int32(x1)
            y1_floor = wp.int32(y1)
            if x1_floor >= 0 and x1_floor < width - 1 and y1_floor >= 0 and y1_floor < height - 1:
                gx1 = (
                    -1.0 * wp.float(data_in[y1_floor - 1, x1_floor - 1])
                    + -2.0 * wp.float(data_in[y1_floor, x1_floor - 1])
                    + -1.0 * wp.float(data_in[y1_floor + 1, x1_floor - 1])
                    + 1.0 * wp.float(data_in[y1_floor - 1, x1_floor + 1])
                    + 2.0 * wp.float(data_in[y1_floor, x1_floor + 1])
                    + 1.0 * wp.float(data_in[y1_floor + 1, x1_floor + 1])
                )
                gy1 = (
                    -1.0 * wp.float(data_in[y1_floor - 1, x1_floor - 1])
                    + -2.0 * wp.float(data_in[y1_floor - 1, x1_floor])
                    + -1.0 * wp.float(data_in[y1_floor - 1, x1_floor + 1])
                    + 1.0 * wp.float(data_in[y1_floor + 1, x1_floor - 1])
                    + 2.0 * wp.float(data_in[y1_floor + 1, x1_floor])
                    + 1.0 * wp.float(data_in[y1_floor + 1, x1_floor + 1])
                )
                g00 = wp.sqrt(gx1 * gx1 + gy1 * gy1)
        # Neighbour one step AGAINST the gradient.
        x2 = wp.float(j) - xstep
        y2 = wp.float(i) - ystep
        if x2 >= 0 and x2 < width and y2 >= 0 and y2 < height:
            x2_floor = wp.int32(x2)
            y2_floor = wp.int32(y2)
            if x2_floor >= 0 and x2_floor < width - 1 and y2_floor >= 0 and y2_floor < height - 1:
                gx2 = (
                    -1.0 * wp.float(data_in[y2_floor - 1, x2_floor - 1])
                    + -2.0 * wp.float(data_in[y2_floor, x2_floor - 1])
                    + -1.0 * wp.float(data_in[y2_floor + 1, x2_floor - 1])
                    + 1.0 * wp.float(data_in[y2_floor - 1, x2_floor + 1])
                    + 2.0 * wp.float(data_in[y2_floor, x2_floor + 1])
                    + 1.0 * wp.float(data_in[y2_floor + 1, x2_floor + 1])
                )
                gy2 = (
                    -1.0 * wp.float(data_in[y2_floor - 1, x2_floor - 1])
                    + -2.0 * wp.float(data_in[y2_floor - 1, x2_floor])
                    + -1.0 * wp.float(data_in[y2_floor - 1, x2_floor + 1])
                    + 1.0 * wp.float(data_in[y2_floor + 1, x2_floor - 1])
                    + 2.0 * wp.float(data_in[y2_floor + 1, x2_floor])
                    + 1.0 * wp.float(data_in[y2_floor + 1, x2_floor + 1])
                )
                g01 = wp.sqrt(gx2 * gx2 + gy2 * gy2)
        # Non-maximum suppression: keep only local maxima along the gradient,
        # then classify the surviving magnitude against the two thresholds.
        if magnitude > g00 and magnitude > g01:
            # Empirical scaling applied before thresholding — the writer's
            # low/high thresholds are calibrated against this scaled value.
            scaled_magnitude = magnitude * 3.0
            if scaled_magnitude >= high_threshold:
                data_out[i, j, 0] = wp.uint8(255)
            elif scaled_magnitude >= low_threshold:
                data_out[i, j, 0] = wp.uint8(127)
            else:
                data_out[i, j, 0] = wp.uint8(0)
        else:
            data_out[i, j, 0] = wp.uint8(0)
    else:
        # Border pixels carry no edge.
        data_out[i, j, 0] = wp.uint8(0)
@wp.kernel
def hysteresis_thresholding(data_inout: wp.array3d(dtype=wp.uint8)):
    """Resolve weak edges in place, then replicate channel 0 to channels 1 and 2.

    Final Canny stage: a weak (127) pixel is promoted to 255 if any of its
    8 neighbours is a strong (255) edge, otherwise dropped to 0.

    NOTE(review): neighbours are read from the same array other threads of
    this launch are rewriting, so weak-edge resolution depends on thread
    scheduling — acceptable for visualisation, but not a faithful serial
    Canny hysteresis. Confirm this nondeterminism is acceptable.
    """
    i, j = wp.tid()
    height = data_inout.shape[0]
    width = data_inout.shape[1]
    if i > 0 and i < height - 1 and j > 0 and j < width - 1:
        if data_inout[i, j, 0] == 127:
            # Acts as a boolean flag; accumulated as a float for Warp's benefit.
            has_strong_neighbor = float(0.0)
            for di in range(-1, 2):
                if i + di < 0 or i + di >= height:
                    continue
                for dj in range(-1, 2):
                    if j + dj < 0 or j + dj >= width:
                        continue
                    if di == 0 and dj == 0:
                        continue
                    if data_inout[i + di, j + dj, 0] == 255:
                        has_strong_neighbor += 1.0
                        break
                if has_strong_neighbor >= 1.0:
                    break
            if has_strong_neighbor >= 1.0:
                data_inout[i, j, 0] = wp.uint8(255)
            else:
                data_inout[i, j, 0] = wp.uint8(0)
    # Broadcast the resolved edge value into all three channels for ALL
    # pixels (including the border, which sobel_and_suppress zeroed).
    data_inout[i, j, 1] = data_inout[i, j, 0]
    data_inout[i, j, 2] = data_inout[i, j, 0]
def _remap_depth_for_export(depth_data, z_far: float = 100.0) -> np.ndarray:
"""Remap depth for export, matching depth_export_png.py behavior.
Clamps to z_far, inverts (near=white, far=black), outputs uint16.
Replaces NaN/inf with large value so invalid pixels render as black.
Args:
depth_data: Raw depth from distance_to_image_plane annotator
z_far: Maximum distance for scaling (default 100.0)
Returns:
uint16 depth image (H, W)
"""
if isinstance(depth_data, dict) and "data" in depth_data:
depth_data = depth_data["data"]
if hasattr(depth_data, "numpy"):
depth = depth_data.numpy()
else:
depth = np.asarray(depth_data, dtype=np.float32)
depth = np.squeeze(depth)
depth = np.nan_to_num(depth, nan=z_far, posinf=z_far, neginf=z_far)
depth = np.clip(depth, 0, z_far)
normalized = 1.0 - (depth / z_far)
return (normalized * 65535).astype(np.uint16)
def _shaded_view(normals: np.ndarray, camera_params: Dict = None) -> np.ndarray:
"""Shade pixels based on dot product of surface normals with camera forward direction.
Surfaces facing the camera appear brighter. Uses camera_params for world-space
normals; falls back to camera-space Z component if camera_params is not available.
Args:
normals: Normal map (H, W, 3) or (H, W, 4), typically in range [-1, 1]
camera_params: Optional camera parameters from annotator. If provided, uses
cameraViewTransform to compute camera forward direction in world space.
Returns:
RGBA image (H, W, 4) as uint8
"""
# Ensure we have XYZ components (handle Warp/tensor inputs)
if hasattr(normals, "numpy"):
normals = normals.numpy()
normals_xyz = np.asarray(normals)
if normals_xyz.ndim == 3 and normals_xyz.shape[2] >= 3:
normals_xyz = normals_xyz[:, :, :3].astype(np.float32)
else:
raise ValueError("Normals must be (H, W, 3) or (H, W, 4)")
if (
camera_params is not None
and isinstance(camera_params, dict)
and "cameraViewTransform" in camera_params
):
# Extract camera forward direction from view matrix (world space)
view_matrix = np.asarray(camera_params["cameraViewTransform"]).reshape(4, 4).T
# Camera looks down -Z in camera space; row 2 is camera's Z axis in world space
camera_forward = -view_matrix[2, :3]
camera_forward = camera_forward / (np.linalg.norm(camera_forward) + 1e-8)
# Dot product: surfaces facing camera have positive dot product with view direction
view_dir = -camera_forward # from surface toward camera
shade = np.dot(normals_xyz.reshape(-1, 3), view_dir).reshape(normals_xyz.shape[0], normals_xyz.shape[1])
else:
# Assume normals in camera space; Z component indicates facing camera
shade = normals_xyz[:, :, 2]
# Remap from [-1, 1] to [0, 255]
shade = np.clip(shade * 0.5 + 0.5, 0, 1)
shade_uint8 = (shade * 255).astype(np.uint8)
# Broadcast to RGBA
return np.stack([shade_uint8, shade_uint8, shade_uint8, np.full_like(shade_uint8, 255)], axis=-1)
def _colorize_instance_segmentation(instance_data) -> np.ndarray:
"""Colorize instance IDs for visualization.
Args:
instance_data: Instance segmentation - either dict with "data" key or raw array (H,W) of instance IDs.
Returns:
RGBA image (H, W, 4) as uint8
"""
import colorsys
if isinstance(instance_data, dict) and "data" in instance_data:
instance_ids = np.asarray(instance_data["data"])
else:
instance_ids = np.asarray(instance_data)
if hasattr(instance_ids, "numpy"):
instance_ids = instance_ids.numpy()
instance_ids = instance_ids.astype(np.uint32).squeeze()
unique_ids = np.unique(instance_ids)
max_id = int(np.max(instance_ids))
id_to_idx = np.full(max_id + 1, len(unique_ids), dtype=np.int32)
for idx, uid in enumerate(unique_ids):
id_to_idx[int(uid)] = idx
lut_idx = id_to_idx[instance_ids]
num_colours = len(unique_ids)
colours = np.zeros((num_colours + 1, 4), dtype=np.float32)
for idx, uid in enumerate(unique_ids):
if uid == 0:
colours[idx] = [0, 0, 0, 0]
else:
h = (uid * 0.618033988749895) % 1.0
r, g, b = colorsys.hsv_to_rgb(h, 0.8, 1.0)
colours[idx] = [r, g, b, 1.0]
colours[num_colours] = [0, 0, 0, 0]
result = colours[lut_idx]
return (result * 255).astype(np.uint8)
def _remap_segmentation(segmentation_data: np.ndarray, mapping: Dict) -> np.ndarray:
"""Remap semantic IDs to predefined colours
Args:
segmentation_image: data returned by the annotator.
Return:
Data converted to uint8 RGBA image
"""
segmentation = segmentation_data["data"]
mapping = {
int(k): mapping.get(v.get("class"), (0, 0, 0, 0))
for k, v in segmentation_data["info"]["idToLabels"].items()
}
segmentation_ids = np.unique(segmentation)
num_colours = len(segmentation_ids)
# This is to avoid generating lots of colours for semantic classes not in frame
lut = np.array([segmentation_ids, list(range(num_colours))])
new_segmentation_image = lut[1, np.searchsorted(lut[0, :], segmentation)]
colours = np.array([[0.0] * 4] * (num_colours + 1))
for idx in range(lut.shape[1]):
semantic_id, lut_idx = lut[:, idx]
colours[lut_idx] = mapping.get(semantic_id)
segmentation_image_rgba = np.array(colours[new_segmentation_image], dtype=np.uint8)
return segmentation_image_rgba
class GenAIWriter(rep.writers.Writer):
    """Replicator writer exporting per-frame conditioning maps for GenAI workflows.

    Per frame it writes PNGs for: LDR colour, colourized normals, a greyscale
    shaded view, inverted uint16 depth, semantic and instance segmentation,
    Canny edges of the colour image, and "shaded edges" (per-pixel max of the
    Canny maps of the instance segmentation and the shaded view).
    """

    def __init__(
        self,
        output_dir,
        canny_threshold_low: int = 10,
        canny_threshold_high: int = 100,
        depth_z_far: float = 100.0,
    ):
        """
        Args:
            output_dir: Directory handed to the DiskBackend; all PNGs land there.
            canny_threshold_low: Weak-edge threshold, compared against the
                3x-scaled Sobel magnitude in sobel_and_suppress.
            canny_threshold_high: Strong-edge threshold (same scaling).
            depth_z_far: Clamp distance for the uint16 depth export.
        """
        # Monotonically increasing index used in every output filename.
        self.frame_id = 0
        self._canny_threshold_low = canny_threshold_low
        self._canny_threshold_high = canny_threshold_high
        self._depth_z_far = depth_z_far
        # Reused GPU scratch buffers keyed by name (avoids per-frame allocation).
        self._cached_buffers = {}
        # Semantic class name -> RGBA colour, consumed by _remap_segmentation.
        self.class_mapping = {
            "cone": (255, 25, 171, 255),
            "background": (149, 255, 25, 255),
            "walking_worker": (25, 240, 255, 255),
            "warehouse_pallets": (255, 197, 25, 255),
            "warehouse_bin": (255, 249, 25, 255),
        }
        # Annotators this writer requests from the attached render product.
        self.annotators = [
            "LdrColor",
            "normals",
            "distance_to_image_plane",
            "semantic_segmentation",
            "instance_segmentation",
            "camera_params",
        ]
        self.backend = rep.backends.get(
            "DiskBackend", init_params={"output_dir": output_dir}
        )

    def _get_canny_edges(self, rgb_image):
        """Run the three-kernel GPU Canny pipeline; return a (H, W, 3) uint8 Warp array.

        Args:
            rgb_image: RGB(A) image array (H, W, 3+) — numpy or Warp array.
                Non-uint8 input with max <= 1 is treated as normalized [0, 1].
        """
        if hasattr(rgb_image, "numpy"):
            rgb_np = rgb_image.numpy()
        else:
            rgb_np = np.asarray(rgb_image)
        if not isinstance(rgb_np, np.ndarray):
            raise TypeError(f"_get_canny_edges expects array-like input, got {type(rgb_image)}")
        if rgb_np.dtype != np.uint8:
            # NOTE(review): values > 255 in non-normalized input wrap under the
            # plain astype cast — confirm inputs are LDR-range.
            max_val = float(np.max(rgb_np))
            rgb_np = (np.clip(rgb_np, 0, 1) * 255).astype(np.uint8) if max_val <= 1 else rgb_np.astype(np.uint8)
        height, width = rgb_np.shape[:2]
        # Drop any alpha channel; copy so the Warp upload sees contiguous memory.
        rgb_3ch = rgb_np[:, :, :3].copy()
        # NOTE(review): device is hard-coded — this path requires CUDA.
        device = "cuda"
        rgb_wp = wp.from_numpy(rgb_3ch, dtype=wp.uint8, device=device)
        # Reuse the greyscale scratch buffer while the resolution stays stable.
        if self._cached_buffers.get("greyscale") is None or self._cached_buffers["greyscale"].shape != (height, width):
            self._cached_buffers["greyscale"] = wp.empty(
                dtype=wp.uint8,
                shape=(height, width),
                device=device,
            )
        # Uninitialized output: the kernels below are expected to write every pixel.
        canny_edges_out = wp.empty(dtype=wp.uint8, shape=(height, width, 3), device=device)
        # Stage 1: greyscale conversion + Gaussian blur.
        wp.launch(
            kernel=rgb_to_grey_and_blur,
            dim=(height, width),
            inputs=[rgb_wp, self._cached_buffers["greyscale"]],
            device=device,
        )
        # Stages 2-4: Sobel gradient, non-max suppression, double thresholding.
        wp.launch(
            kernel=sobel_and_suppress,
            dim=(height, width),
            inputs=[
                self._cached_buffers["greyscale"],
                canny_edges_out,
                float(self._canny_threshold_low),
                float(self._canny_threshold_high),
            ],
            device=device,
        )
        # Stage 5: hysteresis — resolve weak edges, replicate to 3 channels.
        wp.launch(
            kernel=hysteresis_thresholding,
            dim=(height, width),
            inputs=[canny_edges_out],
            device=device,
        )
        return canny_edges_out

    def _combine_canny_maps(self, canny1, canny2):
        """Combine two Canny edge maps by taking the max pixel value at each location.

        Args:
            canny1: First edge map (H, W, 3) — Warp or numpy array.
            canny2: Second edge map (H, W, 3) — Warp or numpy array.

        Returns:
            Combined edge map (H, W, 3) as uint8.
        """
        if hasattr(canny1, "numpy"):
            c1 = canny1.numpy()
        else:
            c1 = np.asarray(canny1)
        if hasattr(canny2, "numpy"):
            c2 = canny2.numpy()
        else:
            c2 = np.asarray(canny2)
        combined = np.maximum(c1, c2)
        return combined.astype(np.uint8)

    def write(self, data):
        """Schedule all per-frame outputs on the disk backend, then advance frame_id.

        Args:
            data: Annotator payload keyed by the names listed in self.annotators.
        """
        # write colour
        self.backend.schedule(
            F.write_image, data=data["LdrColor"], path=f"ldr_color_{self.frame_id}.png"
        )
        # write normals, remapped from [-1, 1] to [0, 255]
        colourize_normals = rep.backends.Sequential(
            lambda x: ((x * 0.5 + 0.5) * 255).astype(np.uint8)
        )
        self.backend.schedule(
            F.write_image,
            data=colourize_normals(data["normals"]),
            path=f"normals_{self.frame_id}.png",
        )
        # write shaded view (normals shaded by camera forward direction)
        shaded_view_data = _shaded_view(data["normals"], data.get("camera_params"))
        self.backend.schedule(
            F.write_image,
            data=shaded_view_data,
            path=f"shaded_view_{self.frame_id}.png",
        )
        # write depth (remap with z_far scaling, matching depth_export_png.py)
        depth_uint16 = _remap_depth_for_export(
            data["distance_to_image_plane"], z_far=self._depth_z_far
        )
        self.backend.schedule(
            F.write_image,
            data=depth_uint16,
            path=f"depth_{self.frame_id}.png",
        )
        # write semantic segmentation, colourized via self.class_mapping
        remap_segmentation = rep.backends.Sequential(
            lambda x: _remap_segmentation(x, self.class_mapping)
        )
        self.backend.schedule(
            F.write_image,
            data=remap_segmentation(data["semantic_segmentation"]),
            path=f"semantic_{self.frame_id}.png",
        )
        # write instance segmentation (deterministic golden-ratio hues)
        instance_seg_colorized = _colorize_instance_segmentation(data["instance_segmentation"])
        self.backend.schedule(
            F.write_image,
            data=instance_seg_colorized,
            path=f"instance_segmentation_{self.frame_id}.png",
        )
        # write Canny edges (from LdrColor)
        canny_edges = self._get_canny_edges(data["LdrColor"])
        self.backend.schedule(
            F.write_image,
            data=canny_edges,
            path=f"canny_edges_{self.frame_id}.png",
        )
        # write shaded_edges: Canny on instance seg + Canny on shaded_view, combined by max
        canny_instance = self._get_canny_edges(instance_seg_colorized)
        canny_shaded = self._get_canny_edges(shaded_view_data)
        shaded_edges = self._combine_canny_maps(canny_instance, canny_shaded)
        self.backend.schedule(
            F.write_image,
            data=shaded_edges,
            path=f"shaded_edges_{self.frame_id}.png",
        )
        self.frame_id += 1
# BUGFIX: `import omni.replicator.core as rep` binds only `rep`, not `omni`,
# so the omni.usd calls below raised NameError without this explicit import.
import omni.usd

NUM_IMAGES = 8
CAMERA_POSITIONS = [(20, 20, 17), (10, 20, 17), (20, 10, 17)]

# Get the current USD context and make a fresh stage.
ctx = omni.usd.get_context()
ctx.new_stage()
stage = ctx.get_stage()

# Stage conventions: Z-up, 1 unit = 1 metre.
rep.settings.set_stage_up_axis("Z")
rep.settings.set_stage_meters_per_unit(1)

# Renderer: path tracing for clean normals/depth.
rep.settings.set_render_pathtraced(samples_per_pixel=16)
# rep.settings.set_render_rtx_realtime()  # faster, lower-quality alternative

# Randomization seed (unused so far; kept for reproducible extensions).
rng = np.random.RandomState(1234)

camera = F.create.camera(position=(19, 19, 17), look_at=(0, 0, 0), focus_distance=0, f_stop=8)

# Lights
distant_light = F.create.distant_light(position=(0, 0, 0), intensity=15000.0, rotation=(-30, 0, 90))
rect_light = F.create.rect_light(position=(0, 0, 5), intensity=10000, rotation=(-2.4, 37.3, 0))

# Simple scene: a semantic "cone" of interest on a floor with two walls.
cone = F.create.cone(position=(0, 0, 1.3), scale=2, semantics=[("class", "cone")])
floor = F.create.cube(position=(0, 0, 0), scale=(10, 10, 0.1), semantics=[("class", "background")])
wall1 = F.create.cube(position=(-4.50, 0, 2.50), scale=(1, 10, 5), semantics=[("class", "background")])
wall2 = F.create.cube(position=(0, -4.50, 2.50), scale=(10, 1, 5), semantics=[("class", "background")])

# Render product at a Cosmos-friendly resolution; attach the custom writer.
rp = rep.create.render_product(camera, (1280, 704))
writer = GenAIWriter(output_directory)
writer.attach(rp, trigger=None)

n = len(CAMERA_POSITIONS)


async def capture():
    """Step the orchestrator NUM_IMAGES times, cycling through camera positions."""
    for i in range(NUM_IMAGES):
        F.modify.pose(camera, position_value=CAMERA_POSITIONS[i % n])
        F.modify.look_at(camera, look_at_up_axis=(0, 0, 1), value=(0, 0, 1.25))
        await rep.orchestrator.step_async(wait_for_render=False)
        writer.schedule_write()


asyncio.ensure_future(capture())