I wrote some python based on the linux_audio_player.py and linux_imx274_player.py examples and tried to get the linux_receiver operator to serves as an input to the udp_transmitter operator for loopback testing without success.
I can send data into the Jetson and I believe the linux_receiver is working, but not the other way.
Any help would be appreciated.
# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import logging
import time
import cuda.bindings.driver as cuda
import holoscan
import hololink as hololink_module
class AudioTransmitApp(holoscan.core.Application):
def __init__(self, hololink_ip, hololink_channel, cuda_context, camera, frame_limit):
super().__init__()
self._hololink_ip = hololink_ip
self._hololink_channel = hololink_channel
self._cuda_context = cuda_context
self._camera = camera
self._frame_limit = frame_limit
def compose(self):
# Create allocator for GPU memory
allocator = holoscan.resources.UnboundedAllocator(
self,
name="allocator",
)
# Create UDP receiver operator
if self._frame_limit:
self._count = holoscan.conditions.CountCondition(
self,
name="count",
count=self._frame_limit,
)
condition = self._count
else:
self._ok = holoscan.conditions.BooleanCondition(
self, name="ok", enable_tick=True
)
condition = self._ok
frame_size = 1024
frame_context = self._cuda_context
receiver_operator = hololink_module.operators.LinuxReceiverOperator(
self,
condition,
name="receiver",
frame_size=frame_size,
frame_context=frame_context,
hololink_channel=self._hololink_channel,
device=self._camera,
)
# Create UDP transmitter operator
udp_transmitter = hololink_module.operators.UdpTransmitterOp(
self,
ip=self._hololink_ip,
port=4791,
max_buffer_size=32768,
name="udp_transmitter",
lossy=False,
)
# Connect operators
self.add_flow(receiver_operator, udp_transmitter, {("output", "input")})
def main():
parser = argparse.ArgumentParser(description="Audio Transmission Application")
parser.add_argument(
"--frame-limit",
type=int,
default=None,
help="Exit after receiving this many frames",
)
parser.add_argument(
"--expander-configuration",
type=int,
default=0,
choices=(0, 1),
help="I2C Expander configuration",
)
parser.add_argument("--hololink", required=True, help="Hololink IP address")
args = parser.parse_args()
# Get a handle to the Hololink device
channel_metadata = hololink_module.Enumerator.find_channel(channel_ip=args.hololink)
hololink_channel = hololink_module.DataChannel(channel_metadata)
# Get a handle to the GPU
(cu_result,) = cuda.cuInit(0)
assert cu_result == cuda.CUresult.CUDA_SUCCESS
cu_device_ordinal = 0
cu_result, cu_device = cuda.cuDeviceGet(cu_device_ordinal)
assert cu_result == cuda.CUresult.CUDA_SUCCESS
cu_result, cu_context = cuda.cuDevicePrimaryCtxRetain(cu_device)
assert cu_result == cuda.CUresult.CUDA_SUCCESS
# Get a handle to the camera
camera = hololink_module.sensors.imx274.dual_imx274.Imx274Cam(
hololink_channel, expander_configuration=args.expander_configuration
)
app = AudioTransmitApp(
hololink_ip=args.hololink,
hololink_channel=hololink_channel,
cuda_context=cu_context,
camera=camera,
frame_limit=args.frame_limit
)
hololink = hololink_channel.hololink()
hololink.start()
hololink.reset()
app.run()
#hololink.stop()
if __name__ == "__main__":
main()
