Cosmos Transfer from pictures?

We are doing SDG mostly using scatter functions, which makes our outputs very random from frame to frame. From the documentation and samples I’ve seen, CosmosWriter and Cosmos Transfer rely heavily on video files instead of images.

Can we get an example on how to operate Cosmos Transfer without having to produce those mp4s?

Good timing on bringing this up. This was released in December under “Image2Image”.

Take a look at the inference guide here:

1 Like

Thank you for that info!

How do we leverage the CosmosWriter outputs in that model? Or should we just pursue the regular rgb path?

Seems like the depth and segmentation data would be helpful to the process.

Also @pcallender — is the image2image example you posted based on video files?

video_path and image_context_path can both take png and jpg etc as inputs.

Cosmos Writer specifically outputs mp4. It could be adapted to output the modalities as single images, but you’re right, it would be nice to have a CosmosImageWriter or something for scenarios where I2I is needed.

Let me see what the team thinks about releasing something official. In the interim, here’s something that’s hacked together with the help of Cursor; it can hopefully point you in the right direction. It’s something I put together to get cleaner canny edges, but it’s also got normals and shaded views (though not colored shading). See if you can adapt this for your purposes.

Outputs should look like this:

import asyncio
from typing import Dict
import omni.replicator.core as rep
import numpy as np
import warp as wp
from omni.replicator.core import functional as F
from datetime import datetime

# Timestamped output directory (HH-MM-SS) so repeated runs in the same session
# don't overwrite each other's frames.
# Fix: folded into one expression - the old `time` variable shadowed the stdlib
# `time` module name and neither intermediate was reused anywhere else.
output_directory = "_genaiwriter_" + datetime.now().strftime("%H-%M-%S")

# Converts an RGB uint8 image to greyscale (BT.601 luma weights) and applies a
# 3x3 Gaussian blur in the same pass. Border pixels (the outermost 1-px frame)
# get the plain greyscale value with no blur. Comments only (no docstring) to
# keep the kernel body trivial for Warp's codegen.
@wp.kernel
def rgb_to_grey_and_blur(data_in: wp.array3d(dtype=wp.uint8), data_out: wp.array2d(dtype=wp.uint8)):
    # One thread per output pixel: i = row, j = column.
    i, j = wp.tid()
    height = data_in.shape[0]
    width = data_in.shape[1]

    if i > 0 and i < height - 1 and j > 0 and j < width - 1:
        # 3x3 Gaussian kernel (1 2 1 / 2 4 2 / 1 2 1) normalized by 16.
        kernel = wp.mat33f(1.0, 2.0, 1.0, 2.0, 4.0, 2.0, 1.0, 2.0, 1.0) / 16.0
        sum = 0.0
        for ki in range(-1, 2):
            for kj in range(-1, 2):
                # BT.601 luma: 0.299 R + 0.587 G + 0.114 B.
                gray_val = (
                    wp.float(data_in[i + ki, j + kj, 0]) * 0.299
                    + wp.float(data_in[i + ki, j + kj, 1]) * 0.587
                    + wp.float(data_in[i + ki, j + kj, 2]) * 0.114
                )
                sum += gray_val * kernel[ki + 1, kj + 1]
        data_out[i, j] = wp.uint8(wp.clamp(sum, 0.0, 255.0))
    else:
        # Border pixels: greyscale only (no out-of-bounds neighbours to blur).
        data_out[i, j] = wp.uint8(
            wp.float(data_in[i, j, 0]) * 0.299 + wp.float(data_in[i, j, 1]) * 0.587 + wp.float(data_in[i, j, 2]) * 0.114
        )


# Sobel gradients + non-maximum suppression + double thresholding (the middle
# Canny stages). Writes a per-pixel edge class into channel 0 of data_out:
# 255 = strong edge, 127 = weak edge (resolved later by hysteresis), 0 = none.
# NOTE(review): the interior test uses `height - 2` / `width - 2`, one
# row/column more conservative than the blur kernel's `- 1` - confirm whether
# the asymmetry is intentional.
@wp.kernel
def sobel_and_suppress(
    data_in: wp.array2d(dtype=wp.uint8),
    data_out: wp.array3d(dtype=wp.uint8),
    low_threshold: float,
    high_threshold: float,
):
    # One thread per pixel: i = row, j = column.
    i, j = wp.tid()
    height = data_in.shape[0]
    width = data_in.shape[1]

    if i > 0 and i < height - 2 and j > 0 and j < width - 2:
        # 3x3 Sobel gradients: gx = horizontal change, gy = vertical change.
        gx = (
            -1.0 * wp.float(data_in[i - 1, j - 1])
            + -2.0 * wp.float(data_in[i, j - 1])
            + -1.0 * wp.float(data_in[i + 1, j - 1])
            + 1.0 * wp.float(data_in[i - 1, j + 1])
            + 2.0 * wp.float(data_in[i, j + 1])
            + 1.0 * wp.float(data_in[i + 1, j + 1])
        )
        gy = (
            -1.0 * wp.float(data_in[i - 1, j - 1])
            + -2.0 * wp.float(data_in[i - 1, j])
            + -1.0 * wp.float(data_in[i - 1, j + 1])
            + 1.0 * wp.float(data_in[i + 1, j - 1])
            + 2.0 * wp.float(data_in[i + 1, j])
            + 1.0 * wp.float(data_in[i + 1, j + 1])
        )
        magnitude = wp.sqrt(gx * gx + gy * gy)
        # Gradient direction in degrees, folded into [0, 180).
        angle = wp.atan2(gy, gx) * 180.0 / 3.14159
        if angle < 0:
            angle += 180.0
        # g00/g01 will hold the Sobel magnitudes of the two neighbours along
        # the gradient direction; (xstep, ystep) is that direction quantized
        # to one of four bins (0, 45, 90, 135 degrees).
        g00 = wp.float(0.0)
        g01 = wp.float(0.0)
        xstep = wp.float(0.0)
        ystep = wp.float(0.0)
        if angle <= 22.5 or angle > 157.5:
            xstep = wp.float(1.0)
            ystep = wp.float(0.0)
        elif angle > 22.5 and angle <= 67.5:
            xstep = wp.float(1.0)
            ystep = wp.float(1.0)
        elif angle > 67.5 and angle <= 112.5:
            xstep = wp.float(0.0)
            ystep = wp.float(1.0)
        else:
            xstep = wp.float(-1.0)
            ystep = wp.float(1.0)
        # Neighbour 1: one step along the gradient direction. Its magnitude is
        # recomputed from scratch (no intermediate gradient buffer is kept).
        x1 = wp.float(j) + xstep
        y1 = wp.float(i) + ystep
        if x1 >= 0 and x1 < width and y1 >= 0 and y1 < height:
            x1_floor = wp.int32(x1)
            y1_floor = wp.int32(y1)
            if x1_floor >= 0 and x1_floor < width - 1 and y1_floor >= 0 and y1_floor < height - 1:
                gx1 = (
                    -1.0 * wp.float(data_in[y1_floor - 1, x1_floor - 1])
                    + -2.0 * wp.float(data_in[y1_floor, x1_floor - 1])
                    + -1.0 * wp.float(data_in[y1_floor + 1, x1_floor - 1])
                    + 1.0 * wp.float(data_in[y1_floor - 1, x1_floor + 1])
                    + 2.0 * wp.float(data_in[y1_floor, x1_floor + 1])
                    + 1.0 * wp.float(data_in[y1_floor + 1, x1_floor + 1])
                )
                gy1 = (
                    -1.0 * wp.float(data_in[y1_floor - 1, x1_floor - 1])
                    + -2.0 * wp.float(data_in[y1_floor - 1, x1_floor])
                    + -1.0 * wp.float(data_in[y1_floor - 1, x1_floor + 1])
                    + 1.0 * wp.float(data_in[y1_floor + 1, x1_floor - 1])
                    + 2.0 * wp.float(data_in[y1_floor + 1, x1_floor])
                    + 1.0 * wp.float(data_in[y1_floor + 1, x1_floor + 1])
                )
                g00 = wp.sqrt(gx1 * gx1 + gy1 * gy1)
        # Neighbour 2: one step against the gradient direction.
        x2 = wp.float(j) - xstep
        y2 = wp.float(i) - ystep
        if x2 >= 0 and x2 < width and y2 >= 0 and y2 < height:
            x2_floor = wp.int32(x2)
            y2_floor = wp.int32(y2)
            if x2_floor >= 0 and x2_floor < width - 1 and y2_floor >= 0 and y2_floor < height - 1:
                gx2 = (
                    -1.0 * wp.float(data_in[y2_floor - 1, x2_floor - 1])
                    + -2.0 * wp.float(data_in[y2_floor, x2_floor - 1])
                    + -1.0 * wp.float(data_in[y2_floor + 1, x2_floor - 1])
                    + 1.0 * wp.float(data_in[y2_floor - 1, x2_floor + 1])
                    + 2.0 * wp.float(data_in[y2_floor, x2_floor + 1])
                    + 1.0 * wp.float(data_in[y2_floor + 1, x2_floor + 1])
                )
                gy2 = (
                    -1.0 * wp.float(data_in[y2_floor - 1, x2_floor - 1])
                    + -2.0 * wp.float(data_in[y2_floor - 1, x2_floor])
                    + -1.0 * wp.float(data_in[y2_floor - 1, x2_floor + 1])
                    + 1.0 * wp.float(data_in[y2_floor + 1, x2_floor - 1])
                    + 2.0 * wp.float(data_in[y2_floor + 1, x2_floor])
                    + 1.0 * wp.float(data_in[y2_floor + 1, x2_floor + 1])
                )
                g01 = wp.sqrt(gx2 * gx2 + gy2 * gy2)
        # Non-maximum suppression: keep the pixel only if it is a local maximum
        # along the gradient direction, then classify by the two thresholds.
        if magnitude > g00 and magnitude > g01:
            # NOTE(review): the 3.0 pre-threshold scale looks empirical -
            # confirm against the chosen low/high thresholds.
            scaled_magnitude = magnitude * 3.0
            if scaled_magnitude >= high_threshold:
                data_out[i, j, 0] = wp.uint8(255)
            elif scaled_magnitude >= low_threshold:
                data_out[i, j, 0] = wp.uint8(127)
            else:
                data_out[i, j, 0] = wp.uint8(0)
        else:
            data_out[i, j, 0] = wp.uint8(0)
    else:
        # Border pixels are never edges.
        data_out[i, j, 0] = wp.uint8(0)


# Final Canny stage: resolve weak edges by hysteresis, then expand the result
# to 3 channels. A weak pixel (127) is promoted to 255 only if at least one of
# its 8 neighbours is a strong edge (255); otherwise it is cleared to 0.
# NOTE(review): single pass only - weak edges connected to a strong edge
# solely through other weak pixels are dropped; classic Canny iterates.
# NOTE(review): the in-place update races with neighbour reads of channel 0
# (other threads may promote/clear concurrently); results can vary slightly
# between runs - confirm this is acceptable.
@wp.kernel
def hysteresis_thresholding(data_inout: wp.array3d(dtype=wp.uint8)):
    # One thread per pixel: i = row, j = column. Operates in place.
    i, j = wp.tid()
    height = data_inout.shape[0]
    width = data_inout.shape[1]

    if i > 0 and i < height - 1 and j > 0 and j < width - 1:
        if data_inout[i, j, 0] == 127:
            # Float-typed flag; >= 1.0 means "found a strong neighbour".
            has_strong_neighbor = float(0.0)
            for di in range(-1, 2):
                if i + di < 0 or i + di >= height:
                    continue
                for dj in range(-1, 2):
                    if j + dj < 0 or j + dj >= width:
                        continue
                    if di == 0 and dj == 0:
                        continue
                    if data_inout[i + di, j + dj, 0] == 255:
                        has_strong_neighbor += 1.0
                        break
                if has_strong_neighbor >= 1.0:
                    break
            if has_strong_neighbor >= 1.0:
                data_inout[i, j, 0] = wp.uint8(255)
            else:
                data_inout[i, j, 0] = wp.uint8(0)
    # Mirror channel 0 into channels 1 and 2 so the output is a 3-channel image.
    data_inout[i, j, 1] = data_inout[i, j, 0]
    data_inout[i, j, 2] = data_inout[i, j, 0]


def _remap_depth_for_export(depth_data, z_far: float = 100.0) -> np.ndarray:
    """Remap depth for export, matching depth_export_png.py behavior.

    Clamps to z_far, inverts (near=white, far=black), outputs uint16.
    Replaces NaN/inf with large value so invalid pixels render as black.

    Args:
        depth_data: Raw depth from distance_to_image_plane annotator
        z_far: Maximum distance for scaling (default 100.0)

    Returns:
        uint16 depth image (H, W)
    """
    if isinstance(depth_data, dict) and "data" in depth_data:
        depth_data = depth_data["data"]
    if hasattr(depth_data, "numpy"):
        depth = depth_data.numpy()
    else:
        depth = np.asarray(depth_data, dtype=np.float32)
    depth = np.squeeze(depth)
    depth = np.nan_to_num(depth, nan=z_far, posinf=z_far, neginf=z_far)
    depth = np.clip(depth, 0, z_far)
    normalized = 1.0 - (depth / z_far)
    return (normalized * 65535).astype(np.uint16)


def _shaded_view(normals: np.ndarray, camera_params: Dict = None) -> np.ndarray:
    """Shade pixels based on dot product of surface normals with camera forward direction.

    Surfaces facing the camera appear brighter. Uses camera_params for world-space
    normals; falls back to camera-space Z component if camera_params is not available.

    Args:
        normals: Normal map (H, W, 3) or (H, W, 4), typically in range [-1, 1]
        camera_params: Optional camera parameters from annotator. If provided, uses
            cameraViewTransform to compute camera forward direction in world space.

    Returns:
        RGBA image (H, W, 4) as uint8
    """
    # Ensure we have XYZ components (handle Warp/tensor inputs)
    if hasattr(normals, "numpy"):
        normals = normals.numpy()
    normals_xyz = np.asarray(normals)
    if normals_xyz.ndim == 3 and normals_xyz.shape[2] >= 3:
        normals_xyz = normals_xyz[:, :, :3].astype(np.float32)
    else:
        raise ValueError("Normals must be (H, W, 3) or (H, W, 4)")

    if (
        camera_params is not None
        and isinstance(camera_params, dict)
        and "cameraViewTransform" in camera_params
    ):
        # Extract camera forward direction from view matrix (world space)
        view_matrix = np.asarray(camera_params["cameraViewTransform"]).reshape(4, 4).T
        # Camera looks down -Z in camera space; row 2 is camera's Z axis in world space
        camera_forward = -view_matrix[2, :3]
        camera_forward = camera_forward / (np.linalg.norm(camera_forward) + 1e-8)
        # Dot product: surfaces facing camera have positive dot product with view direction
        view_dir = -camera_forward  # from surface toward camera
        shade = np.dot(normals_xyz.reshape(-1, 3), view_dir).reshape(normals_xyz.shape[0], normals_xyz.shape[1])
    else:
        # Assume normals in camera space; Z component indicates facing camera
        shade = normals_xyz[:, :, 2]

    # Remap from [-1, 1] to [0, 255]
    shade = np.clip(shade * 0.5 + 0.5, 0, 1)
    shade_uint8 = (shade * 255).astype(np.uint8)
    # Broadcast to RGBA
    return np.stack([shade_uint8, shade_uint8, shade_uint8, np.full_like(shade_uint8, 255)], axis=-1)


def _colorize_instance_segmentation(instance_data) -> np.ndarray:
    """Colorize instance IDs for visualization.

    Args:
        instance_data: Instance segmentation - either dict with "data" key or raw array (H,W) of instance IDs.

    Returns:
        RGBA image (H, W, 4) as uint8
    """
    import colorsys

    if isinstance(instance_data, dict) and "data" in instance_data:
        instance_ids = np.asarray(instance_data["data"])
    else:
        instance_ids = np.asarray(instance_data)

    if hasattr(instance_ids, "numpy"):
        instance_ids = instance_ids.numpy()
    instance_ids = instance_ids.astype(np.uint32).squeeze()

    unique_ids = np.unique(instance_ids)
    max_id = int(np.max(instance_ids))
    id_to_idx = np.full(max_id + 1, len(unique_ids), dtype=np.int32)
    for idx, uid in enumerate(unique_ids):
        id_to_idx[int(uid)] = idx
    lut_idx = id_to_idx[instance_ids]
    num_colours = len(unique_ids)

    colours = np.zeros((num_colours + 1, 4), dtype=np.float32)
    for idx, uid in enumerate(unique_ids):
        if uid == 0:
            colours[idx] = [0, 0, 0, 0]
        else:
            h = (uid * 0.618033988749895) % 1.0
            r, g, b = colorsys.hsv_to_rgb(h, 0.8, 1.0)
            colours[idx] = [r, g, b, 1.0]
    colours[num_colours] = [0, 0, 0, 0]

    result = colours[lut_idx]
    return (result * 255).astype(np.uint8)


def _remap_segmentation(segmentation_data: np.ndarray, mapping: Dict) -> np.ndarray:
    """Remap semantic IDs to predefined colours

    Args:
        segmentation_image: data returned by the annotator.
    Return:
        Data converted to uint8 RGBA image
    """
    segmentation = segmentation_data["data"]
    mapping = {
        int(k): mapping.get(v.get("class"), (0, 0, 0, 0))
        for k, v in segmentation_data["info"]["idToLabels"].items()
    }
    segmentation_ids = np.unique(segmentation)
    num_colours = len(segmentation_ids)

    # This is to avoid generating lots of colours for semantic classes not in frame
    lut = np.array([segmentation_ids, list(range(num_colours))])

    new_segmentation_image = lut[1, np.searchsorted(lut[0, :], segmentation)]

    colours = np.array([[0.0] * 4] * (num_colours + 1))
    for idx in range(lut.shape[1]):
        semantic_id, lut_idx = lut[:, idx]
        colours[lut_idx] = mapping.get(semantic_id)

    segmentation_image_rgba = np.array(colours[new_segmentation_image], dtype=np.uint8)
    return segmentation_image_rgba


class GenAIWriter(rep.writers.Writer):
    """Replicator writer that saves per-frame image modalities for GenAI (I2I) use.

    For every captured frame it writes PNGs of: LDR colour, colourized
    normals, a camera-facing shaded view, inverted uint16 depth, semantic and
    instance segmentation, Canny edges of the colour image, and a combined
    "shaded edges" map (pixel-wise max of Canny on the instance segmentation
    and Canny on the shaded view).
    """

    def __init__(
        self,
        output_dir,
        canny_threshold_low: int = 10,
        canny_threshold_high: int = 100,
        depth_z_far: float = 100.0,
    ):
        """Configure annotators, the disk backend, and edge/depth parameters.

        Args:
            output_dir: Directory the DiskBackend writes all images into.
            canny_threshold_low: Weak-edge threshold for the Canny pipeline.
            canny_threshold_high: Strong-edge threshold for the Canny pipeline.
            depth_z_far: Far distance used when normalizing depth for export.
        """
        self.frame_id = 0
        self._canny_threshold_low = canny_threshold_low
        self._canny_threshold_high = canny_threshold_high
        self._depth_z_far = depth_z_far
        # Reusable GPU buffers keyed by name (avoids reallocating per frame).
        self._cached_buffers = {}
        # Class-name -> RGBA colour consumed by _remap_segmentation.
        self.class_mapping = {
            "cone": (255, 25, 171, 255),
            "background": (149, 255, 25, 255),
            "walking_worker": (25, 240, 255, 255),
            "warehouse_pallets": (255, 197, 25, 255),
            "warehouse_bin": (255, 249, 25, 255),
        }
        # Annotators requested from the render product; their outputs arrive
        # in the `data` dict passed to write() under these keys.
        self.annotators = [
            "LdrColor",
            "normals",
            "distance_to_image_plane",
            "semantic_segmentation",
            "instance_segmentation",
            "camera_params",
        ]
        self.backend = rep.backends.get(
            "DiskBackend", init_params={"output_dir": output_dir}
        )

    def _get_canny_edges(self, rgb_image):
        """Run the Warp Canny pipeline (grey+blur, Sobel+NMS, hysteresis).

        Args:
            rgb_image: RGB image array (H,W,3) - numpy or Warp array

        Returns:
            (H, W, 3) uint8 Warp array on the GPU; after hysteresis, interior
            edge pixels are 255 and everything else is 0.
        """
        if hasattr(rgb_image, "numpy"):
            rgb_np = rgb_image.numpy()
        else:
            rgb_np = np.asarray(rgb_image)
        if not isinstance(rgb_np, np.ndarray):
            raise TypeError(f"_get_canny_edges expects array-like input, got {type(rgb_image)}")
        if rgb_np.dtype != np.uint8:
            # Values in [0, 1] are rescaled to 0-255; anything else is cast directly.
            max_val = float(np.max(rgb_np))
            rgb_np = (np.clip(rgb_np, 0, 1) * 255).astype(np.uint8) if max_val <= 1 else rgb_np.astype(np.uint8)
        height, width = rgb_np.shape[:2]
        # Drop any alpha channel; the grey kernel reads exactly 3 channels.
        rgb_3ch = rgb_np[:, :, :3].copy()
        # NOTE(review): device is hardcoded - this writer requires a CUDA GPU.
        device = "cuda"
        rgb_wp = wp.from_numpy(rgb_3ch, dtype=wp.uint8, device=device)

        # Reuse the greyscale scratch buffer while the resolution is unchanged.
        if self._cached_buffers.get("greyscale") is None or self._cached_buffers["greyscale"].shape != (height, width):
            self._cached_buffers["greyscale"] = wp.empty(
                dtype=wp.uint8,
                shape=(height, width),
                device=device,
            )
        # Output buffer is freshly allocated per call since callers keep the result.
        canny_edges_out = wp.empty(dtype=wp.uint8, shape=(height, width, 3), device=device)

        # Stage 1: greyscale conversion + Gaussian blur.
        wp.launch(
            kernel=rgb_to_grey_and_blur,
            dim=(height, width),
            inputs=[rgb_wp, self._cached_buffers["greyscale"]],
            device=device,
        )
        # Stage 2: Sobel gradients, non-maximum suppression, double threshold.
        wp.launch(
            kernel=sobel_and_suppress,
            dim=(height, width),
            inputs=[
                self._cached_buffers["greyscale"],
                canny_edges_out,
                float(self._canny_threshold_low),
                float(self._canny_threshold_high),
            ],
            device=device,
        )
        # Stage 3: hysteresis - resolve weak edges and expand to 3 channels.
        wp.launch(
            kernel=hysteresis_thresholding,
            dim=(height, width),
            inputs=[canny_edges_out],
            device=device,
        )
        return canny_edges_out

    def _combine_canny_maps(self, canny1, canny2):
        """Combine two Canny edge maps by taking the max pixel value at each location.

        Args:
            canny1: First edge map (H,W,3) - Warp or numpy array
            canny2: Second edge map (H,W,3) - Warp or numpy array

        Returns:
            Combined edge map (H,W,3) as uint8
        """
        if hasattr(canny1, "numpy"):
            c1 = canny1.numpy()
        else:
            c1 = np.asarray(canny1)
        if hasattr(canny2, "numpy"):
            c2 = canny2.numpy()
        else:
            c2 = np.asarray(canny2)
        combined = np.maximum(c1, c2)
        return combined.astype(np.uint8)

    def write(self, data):
        """Write every modality for one frame, then advance the frame counter.

        Args:
            data: Annotator outputs keyed by the names in self.annotators.
        """
        # write colour
        self.backend.schedule(
            F.write_image, data=data["LdrColor"], path=f"ldr_color_{self.frame_id}.png"
        )

        # write normals
        # Remap [-1, 1] normals to [0, 255] for visualization.
        # NOTE(review): rep.backends.Sequential wraps the lambda; presumably it
        # chains/defers the transform for the backend - confirm its semantics.
        colourize_normals = rep.backends.Sequential(
            lambda x: ((x * 0.5 + 0.5) * 255).astype(np.uint8)
        )
        self.backend.schedule(
            F.write_image,
            data=colourize_normals(data["normals"]),
            path=f"normals_{self.frame_id}.png",
        )

        # write shaded view (normals shaded by camera forward direction)
        shaded_view_data = _shaded_view(data["normals"], data.get("camera_params"))
        self.backend.schedule(
            F.write_image,
            data=shaded_view_data,
            path=f"shaded_view_{self.frame_id}.png",
        )

        # write depth (remap with z_far scaling, matching depth_export_png.py)
        depth_uint16 = _remap_depth_for_export(
            data["distance_to_image_plane"], z_far=self._depth_z_far
        )
        self.backend.schedule(
            F.write_image,
            data=depth_uint16,
            path=f"depth_{self.frame_id}.png",
        )

        # write segmentation
        remap_segmentation = rep.backends.Sequential(
            lambda x: _remap_segmentation(x, self.class_mapping)
        )
        self.backend.schedule(
            F.write_image,
            data=remap_segmentation(data["semantic_segmentation"]),
            path=f"semantic_{self.frame_id}.png",
        )

        # write instance segmentation
        instance_seg_colorized = _colorize_instance_segmentation(data["instance_segmentation"])
        self.backend.schedule(
            F.write_image,
            data=instance_seg_colorized,
            path=f"instance_segmentation_{self.frame_id}.png",
        )

        # write Canny edges (from LdrColor)
        canny_edges = self._get_canny_edges(data["LdrColor"])
        self.backend.schedule(
            F.write_image,
            data=canny_edges,
            path=f"canny_edges_{self.frame_id}.png",
        )

        # write shaded_edges: Canny on instance seg + Canny on shaded_view, combined by max
        canny_instance = self._get_canny_edges(instance_seg_colorized)
        canny_shaded = self._get_canny_edges(shaded_view_data)
        shaded_edges = self._combine_canny_maps(canny_instance, canny_shaded)
        self.backend.schedule(
            F.write_image,
            data=shaded_edges,
            path=f"shaded_edges_{self.frame_id}.png",
        )

        self.frame_id += 1



# --- Scene setup (flat script: statement order matters) ---

NUM_IMAGES = 8
# Camera vantage points; capture() cycles through these per frame.
CAMERA_POSITIONS = [(20,20,17), (10,20,17),(20,10,17)]

# Get the current USD context, and make a new stage
# NOTE(review): `omni.usd` is used but only `omni.replicator.core` is imported
# above; this relies on omni.usd already being loaded into the `omni`
# namespace package (true inside Omniverse/Isaac Sim) - confirm for
# standalone use.
ctx = omni.usd.get_context()
ctx.new_stage()
stage = omni.usd.get_context().get_stage()

# Set the scene UP axis and the default unit size
rep.settings.set_stage_up_axis("Z")
rep.settings.set_stage_meters_per_unit(1)

# Set the renderer (path tracing; swap to the commented call for RTX realtime)
rep.settings.set_render_pathtraced(samples_per_pixel=16)
#rep.settings.set_render_rtx_realtime()

# Randomization Seed
# NOTE(review): `rng` is never used in this script - dead unless randomizers
# are added later.
rng = np.random.RandomState(1234)

camera = F.create.camera(position=(19,19,17), look_at=(0,0,0), focus_distance=0, f_stop=8)

#Create a distant Light
distant_light = F.create.distant_light(position=(0,0,0), intensity=15000.0, rotation=(-30,0,90))
rect_light = F.create.rect_light(position=(0,0,5), intensity=10000, rotation=(-2.4,37.3,0))

# Simple proxy scene: a semantically labelled cone on a floor with two walls.
cone = F.create.cone(position=(0,0,1.3), scale=2, semantics=[("class", "cone")])
floor = F.create.cube(position=(0,0,0), scale=(10,10,0.1), semantics=[("class", "background")])
wall1 = F.create.cube(position=(-4.50,0,2.50), scale=(1,10,5), semantics=[("class", "background")])
wall2 = F.create.cube(position=(0,-4.50,2.50), scale=(10,1,5), semantics=[("class", "background")])

# Create a render product, define the use of the default Basic Writer and initialize it
rp = rep.create.render_product(camera, (1280, 704))
#writer = rep.WriterRegistry.get("BasicWriter")
#writer.initialize(output_dir="replicator_basic_spawning_example", rgb=True)
writer = GenAIWriter(output_directory)
# trigger=None: frames are driven manually via step_async in capture() below.
writer.attach(rp, trigger= None)

# Number of distinct camera positions to cycle over.
n = len(CAMERA_POSITIONS)

async def capture():
    """Render NUM_IMAGES frames, cycling the camera over CAMERA_POSITIONS.

    Each iteration repositions the camera, re-aims it at the cone, steps the
    Replicator orchestrator once, and queues the writer's output.
    """
    for frame_index in range(NUM_IMAGES):
        next_position = CAMERA_POSITIONS[frame_index % n]
        F.modify.pose(camera, position_value=next_position)
        F.modify.look_at(camera, look_at_up_axis=(0,0,1), value=(0,0,1.25))
        await rep.orchestrator.step_async(wait_for_render=False)
        writer.schedule_write()


asyncio.ensure_future(capture())
1 Like

Thanks for the reply!

I don’t get any mp4 videos from the CosmosWriter, only images. (depth, edges, rgb, etc)

My question was not around saving data (that obviously works) but rather using the Cosmos Transfer with all the images.

All the examples work with mp4 files which I don’t get from the writer.

Here’s what I have from writer - how do I use that in the Transfer?

clip_0000.zip (9.1 MB)

@pcallender ?