Holoscan - Obtain a buffer from BlockMemoryPool and return a buffer to the pool

Hello,
I am working towards modifying a network operator to make use of BlockMemoryPool and MemoryAvailableCondition. The operator has been created with the above-mentioned pool and condition as arguments.

It is not clear to me how to get a buffer from the pool in the operator’s compute routine, or how it is later returned to the pool once I am done with it.

holoscan-sdk/examples/tensor_interop/cpp/send_tensor_gxf.hpp kind of shows something similar in class SendTensor, where it obtains an out_message using CreateTensorMap, but I couldn’t find the source for CreateTensorMap to understand how it uses the pool.

Please could somebody point me towards how this can be done.

Thank you!

Hi vangogh,

To be sure we are on the same page, I think this question is being asked for Holoscan’s C++ API?

From C++ it is possible to directly use APIs like nvidia::gxf::CreateTensorMap from the underlying GXF library. We redistribute the headers from that library along with holoscan. In holoscan’s include folder you will find a gxf subfolder containing all of the GXF headers.

The CreateTensorMap function you are asking about is declared in gxf/std/tensor.hpp:

// Creates a new entity with a collection of named tensors
Expected<Entity> CreateTensorMap(gxf_context_t context, Handle<Allocator> pool,
                                 std::initializer_list<TensorDescription> descriptions,
                                 bool activate = true);

where the TensorDescription struct is also defined in that header:

// Type to description a tensor used by 'CreateTensorMap'
struct TensorDescription {
  std::string name;
  MemoryStorageType storage_type;
  Shape shape;
  PrimitiveType element_type;
  uint64_t bytes_per_element;
  // array providing number of bytes for each slice on each rank
  Expected<Tensor::stride_array_t> strides = Unexpected{GXF_UNINITIALIZED_VALUE};
};

BlockMemoryPool is what is known as an Allocator in GXF, so it can be passed as the second argument (“pool”) of CreateTensorMap. Any tensors added to that map are allocated using that allocator. When the tensors go out of scope and are destroyed, their memory blocks are released back to the block memory pool.

The Allocator interface is defined in gxf/std/allocator.hpp, with these relevant public methods:

  // Returns true if the allocator can provide a memory block with the given size.
  bool is_available(uint64_t size);

  // Allocates a memory block with the given size.
  Expected<byte*> allocate(uint64_t size, MemoryStorageType type);

  // Frees the given memory block.
  Expected<void> free(byte* pointer);

  // Get the block size of this allocator, defaults to 1 for byte-based allocators
  uint64_t block_size() const;

Note that Holoscan provides a holoscan::TensorMap object, which is basically an unordered_map<std::string, holoscan::Tensor>. Behind the scenes it gets transmitted by zero-copy to a GXF Entity containing tensor components (exactly like what is returned by CreateTensorMap). Unfortunately, Holoscan does not expose an equally convenient C++ API for direct creation of tensors, so the underlying GXF C++ APIs are often used for that purpose. Aside from CreateTensorMap, the two APIs on nvidia::gxf::Tensor that can be used are reshapeCustom, which allocates memory using a provided Allocator, and wrapMemory, which wraps existing, previously allocated memory as a Tensor. Use of both of those GXF tensor APIs can be seen in the example PingTensorTxOp source distributed with Holoscan.

CreateTensorMap is basically just a convenience function to provide a more concise way to create a GXF Entity and add multiple Tensor components (each of which would be created by reshapeCustom using the provided allocator).

Holoscan can automatically convert between a holoscan::TensorMap and a GXF Entity containing tensor components, so behavior should be the same whether you emit a holoscan::TensorMap or a GXF Entity as returned by CreateTensorMap. In either case, a downstream operator can use receive<holoscan::TensorMap>(port_name) to receive an unordered map of tensors from the upstream operator.

I hope this is helpful.


Hi grelee,
Thank you for this detailed response. Yes, this is for Holoscan C++ API. There is a lot of valuable and helpful information embedded in your response which I am going to refer to as I make progress on the project. However, I think I created confusion in the way I asked the question. At this time I am not trying to allocate a tensor. I am trying to do this:

I have a network operator that is kind of based on the BasicNetworkOp in holohub examples. Currently, when used as a receive operator the compute routine allocates a receive buffer using new() once. I want to replace this new() based allocation with a buffer obtained from a BlockMemoryPool such that when the compute routine is invoked compute can get one from the pool.

To that end, the question I am trying to answer for myself is: how do I get a buffer from such a pool, and how is a buffer returned to the pool?

In examples such as PingTensorTxOp, a Handle to an Allocator is passed to reshapeCustom(), which I guess deals with the allocation etc. However, I could not find an example that uses an Allocator directly to get a buffer.

I have allocator_ declared as Parameter<std::shared_ptr<Allocator>> allocator_. In the compute routine of the operator I have allocator_.allocate(some_size, MemoryStorageType::kHost), but the compiler is not able to resolve allocate.

Thank you.

Oh, I see. Regarding use of the allocator, you are correct that the allocate method will obtain a buffer from the pool. When you pass the pointer returned by allocate to the free method of the allocator, the buffer is returned to the pool.
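To make the allocate/free round trip concrete, here is a minimal self-contained sketch. ToyBlockPool is a hypothetical stand-in written for this illustration, not the GXF BlockMemoryPool; its method names only loosely mirror the Allocator interface quoted earlier (no Expected<> return types and no storage type argument).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for a block pool; NOT the GXF BlockMemoryPool.
class ToyBlockPool {
 public:
  ToyBlockPool(std::size_t block_size, std::size_t num_blocks)
      : block_size_(block_size), storage_(block_size * num_blocks) {
    // Carve the backing storage into equally sized blocks.
    for (std::size_t i = 0; i < num_blocks; ++i) {
      free_list_.push_back(storage_.data() + i * block_size);
    }
  }

  // True if a block of at least `size` bytes can be handed out right now.
  bool is_available(std::size_t size) const {
    return size <= block_size_ && !free_list_.empty();
  }

  // Hands out one block, or nullptr if the request cannot be satisfied.
  std::uint8_t* allocate(std::size_t size) {
    if (!is_available(size)) return nullptr;
    std::uint8_t* block = free_list_.back();
    free_list_.pop_back();
    return block;
  }

  // Returns a block previously obtained from allocate() to the pool.
  void free(std::uint8_t* block) {
    assert(block != nullptr);
    free_list_.push_back(block);
  }

  std::size_t blocks_free() const { return free_list_.size(); }

 private:
  std::size_t block_size_;
  std::vector<std::uint8_t> storage_;
  std::vector<std::uint8_t*> free_list_;
};
```

With a pool of two blocks, two allocate calls exhaust it, and the next call fails until one of the pointers has been passed back through free.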

At the GXF level there is an nvidia::gxf::MemoryBuffer class which provides an interface to more easily manage the lifetime of such a memory buffer (gxf/std/memory_buffer.hpp). That class is fully defined in the memory_buffer.hpp header, so you can see full details there. You would declare a MemoryBuffer object, then use the resize method, which takes an allocator as an argument, to allocate memory of the requested size. The destructor of the MemoryBuffer class takes care of freeing that allocated buffer. The MemoryBuffer class is what is used internally in the implementation of nvidia::gxf::Tensor to handle the lifetime of the memory.
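The lifetime pattern described above can be sketched in a few lines. ToyPool and ToyBuffer are hypothetical stand-ins for the allocator and for nvidia::gxf::MemoryBuffer; the real class has a richer interface, so treat this only as an illustration of the idea that resize acquires memory from the allocator and the destructor gives it back.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical allocator that only counts outstanding allocations.
struct ToyPool {
  int outstanding = 0;
  std::uint8_t* allocate(std::size_t size) {
    ++outstanding;
    return new std::uint8_t[size];
  }
  void free(std::uint8_t* p) {
    --outstanding;
    delete[] p;
  }
};

// Hypothetical RAII buffer, loosely modelled on the MemoryBuffer description.
class ToyBuffer {
 public:
  ToyBuffer() = default;
  ToyBuffer(const ToyBuffer&) = delete;
  ToyBuffer& operator=(const ToyBuffer&) = delete;
  ~ToyBuffer() { freeBuffer(); }

  // Releases any memory currently held, then takes `size` bytes from `pool`.
  void resize(ToyPool& pool, std::size_t size) {
    freeBuffer();
    pool_ = &pool;
    data_ = pool.allocate(size);
    size_ = size;
  }

  // Returns the memory to the pool it came from; safe to call repeatedly.
  void freeBuffer() {
    if (data_ != nullptr) pool_->free(data_);
    data_ = nullptr;
    size_ = 0;
  }

  std::uint8_t* pointer() const { return data_; }
  std::size_t size() const { return size_; }

 private:
  ToyPool* pool_ = nullptr;
  std::uint8_t* data_ = nullptr;
  std::size_t size_ = 0;
};
```

A ToyBuffer declared in a local scope and resized once holds exactly one allocation while it is alive and none after the scope ends, without any explicit call to free.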

I have allocator_ declared as Parameter<std::shared_ptr<Allocator>> allocator_. In the compute routine of the operator I have allocator_.allocate(some_size, MemoryStorageType::kHost), but the compiler is not able to resolve allocate.

I think the compiler issue you are seeing is likely because allocator_ is of type Parameter, so you need to call the method on the contained Allocator and not on the Parameter class itself. I think that can be done with either allocator_->allocate(some_size, MemoryStorageType::kHost) or allocator_.get()->allocate(some_size, MemoryStorageType::kHost), since get() would give you the contained shared pointer rather than the allocator itself.
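The reason the call fails to resolve can be reproduced without Holoscan. In the sketch below, ToyParameter and ToyAllocator are hypothetical stand-ins; the real holoscan::Parameter has a different and larger interface, but the idea is the same: the wrapper itself has no allocate member, so the call has to go through to the wrapped object.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>

// Hypothetical allocator; it only records the last requested size.
struct ToyAllocator {
  std::size_t last_request = 0;
  bool allocate(std::size_t size) {
    last_request = size;
    return true;
  }
};

// Hypothetical parameter wrapper holding a value of type T.
template <typename T>
class ToyParameter {
 public:
  explicit ToyParameter(T value) : value_(std::move(value)) {}

  // Direct access to the wrapped value.
  T& get() { return value_; }

  // Forwards -> to the wrapped value; when T is a std::shared_ptr the
  // compiler then applies the shared_ptr's own operator-> to reach the pointee.
  T& operator->() { return value_; }

 private:
  T value_;
};
```

With a ToyParameter<std::shared_ptr<ToyAllocator>> named allocator_, the expression allocator_.allocate(16) does not compile because the wrapper has no such member, while allocator_->allocate(16) and allocator_.get()->allocate(16) both reach the wrapped allocator.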

However, the above is likely still not what you want, depending on the use case. Note that the holoscan::Allocator classes like holoscan::BlockMemoryPool provide a wrapper representing an underlying GXF component. If you intend to use the allocator with any of the underlying GXF APIs, like the Tensor-related ones described in the previous message or the MemoryBuffer described above, then you will want to retrieve the actual underlying GXF component (i.e. an nvidia::gxf::Allocator). To do that, you can reference how it is done in the provided FormatConverterOp, for example. In that operator, pool_ is declared as Parameter<std::shared_ptr<Allocator>> pool_ in the header, so it should be the same as your case.

The GXF code in the lines below takes the GXF context and the component ID and returns a GXF Handle to the allocator. The GXF Handle class is kind of like a std::shared_ptr, so you would use pool->allocate(...), etc. to call methods on the GXF component.

  // get Handle to underlying nvidia::gxf::Allocator from std::shared_ptr<holoscan::Allocator>
  auto pool =
      nvidia::gxf::Handle<nvidia::gxf::Allocator>::Create(context.context(), pool_->gxf_cid());

The start method of that same operator also creates a MemoryBuffer (the class I described above):

// line 191
device_scratch_buffer_ = std::make_unique<nvidia::gxf::MemoryBuffer>();

(the header defines a few memory buffers, such as device_scratch_buffer_ used by that operator)

The compute method uses the resize method on that buffer to allocate memory from the “pool” allocator:

     // lines 351-359
      size_t buffer_size = static_cast<size_t>(rows) * columns * in_channels * element_size;
      if (buffer_size > device_scratch_buffer_->size()) {
        device_scratch_buffer_->resize(
            pool.value(), buffer_size, nvidia::gxf::MemoryStorageType::kDevice);
        if (!device_scratch_buffer_->pointer()) {
          throw std::runtime_error(
              fmt::format("Failed to allocate device scratch buffer ({} bytes)", buffer_size));
        }
      }

Note that for the FormatConverterOp, since it manages its own buffers created during start, the stop method needs to free them.

Thank you for getting back on this. I will definitely research and experiment with each of the things that you have pointed out. Very much appreciate all the advice that you have offered up till now on this topic.

Just wanted to let you know that I was able to get things working. Thank you for pointing me in the right direction.

Hello,

A couple of questions that I have are:

  1. Suppose an upstream operator emits a message to a downstream operator. The message contains buffer information that the downstream operator makes use of. Once it has finished using the buffer, it is expected to return the buffer to the upstream operator. What is the correct approach to do this?
    (a) I have attempted passing the Allocator object as part of the message. The downstream operator performs the free, and this seems to return the buffer to the upstream operator’s pool. But the question then arises: how can the upstream operator be notified of this, so that it (or an async thread, if it has one) can pull the buffer out of the pool for re-use?
    (b) I attempted a flow from the downstream op to the upstream op. This did not quite work. Such an approach would create a loop in the graph.

  2. The downstream operator has created an input port as:
    spec.input<std::shared_ptr<op_msg>>("burst_in", IOSpec::IOSize(1024))
        .condition(ConditionType::kMessageAvailable,
                   holoscan::Arg("min_size", static_cast<uint64_t>(1)));

    The upstream operator has created an output port as:
    spec.output<std::shared_ptr<op_msg>>("burst_out", IOSpec::IOSize(16))
        .condition(ConditionType::kDownstreamMessageAffordable,
                   holoscan::Arg("min_size", static_cast<uint64_t>(1)));

When the upstream operator’s compute() is scheduled, it may find that it can emit multiple messages within that compute call. However, as soon as a second emit is performed, the following error is observed:

[warning] [double_buffer_transmitter.cpp:79] Push failed on ‘burst_out’
[error] [gxf_io_context.cpp:435] Failed to publish output message with error: GXF_EXCEEDING_PREALLOCATED_SIZE
[error] [gxf_wrapper.cpp:118] Exception occurred for operator: ‘upstream_op’ - Failed to publish output message with error: GXF_EXCEEDING_PREALLOCATED_SIZE
[error] [entity_executor.cpp:596] Failed to tick codelet upstream_op in entity: upstream_op code: GXF_FAILURE

It wasn’t clear why this error would be produced when the downstream operator’s queue was set up with a size of 1024.

Thanks,
Vangogh

Hi Vangogh,

The error seems unrelated to the downstream queue. It is coming from the upstream operator failing to push to the queue on emit (output port “burst_out” queue is full).

I’m not sure yet from just the above description why this is. Is it possible to make some kind of minimal reproducer app you could share so we can help better debug the issue? How many times are you planning to call emit in the upstream operator’s compute call?

Thanks,
Greg

Greg,
Thanks. That is a good suggestion. I will look into creating such an app.

Do you have any thoughts on the other question? The context is that in our application we register buffers and reuse them, since unregistering and re-registering is intensive and costly. That is the reason I am investigating how a downstream operator can return a buffer to an upstream operator.

Thanks

Hi Vangogh,

A MemoryBuffer object allocates its memory from a memory pool, namely the allocator passed as the first argument of the MemoryBuffer::resize() method.

Upon destruction, the MemoryBuffer instance calls the freeBuffer() method, which resets the MemoryBuffer and invokes the allocator’s free(void*) method to return the memory to the pool. Note that MemoryBuffer::resize() also internally calls freeBuffer() before reallocating memory from the pool.

If you want to explicitly return memory in a downstream operator, you can call the freeBuffer() method within that operator.
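As a rough illustration of the hand-off asked about earlier in the thread, the sketch below passes a buffer from an upstream function to a downstream one, which releases it explicitly so that the upstream side can see the pool has capacity again. CountingPool, PooledBuffer, produce and consume are hypothetical names made up for this example; only the freeBuffer() idea comes from the description above, and the real MemoryBuffer interface differs.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <utility>

// Hypothetical pool with a fixed number of equally sized blocks.
class CountingPool {
 public:
  CountingPool(std::size_t block_size, std::size_t num_blocks)
      : block_size_(block_size), free_blocks_(num_blocks) {}
  bool is_available() const { return free_blocks_ > 0; }
  std::uint8_t* allocate() {
    if (!is_available()) return nullptr;
    --free_blocks_;
    return new std::uint8_t[block_size_];
  }
  void free(std::uint8_t* p) {
    delete[] p;
    ++free_blocks_;
  }

 private:
  std::size_t block_size_;
  std::size_t free_blocks_;
};

// Hypothetical non-copyable buffer that remembers which pool it came from.
class PooledBuffer {
 public:
  explicit PooledBuffer(CountingPool& pool)
      : pool_(&pool), data_(pool.allocate()) {}
  PooledBuffer(const PooledBuffer&) = delete;
  PooledBuffer& operator=(const PooledBuffer&) = delete;
  ~PooledBuffer() { freeBuffer(); }

  // Gives the block back to the pool; a second call is a no-op.
  void freeBuffer() {
    if (data_ != nullptr) pool_->free(data_);
    data_ = nullptr;
  }
  std::uint8_t* pointer() const { return data_; }

 private:
  CountingPool* pool_;
  std::uint8_t* data_;
};

// Upstream side: take a block from the pool and hand it off in a message.
std::unique_ptr<PooledBuffer> produce(CountingPool& pool) {
  return std::make_unique<PooledBuffer>(pool);
}

// Downstream side: use the buffer, then return it to the pool explicitly.
void consume(std::unique_ptr<PooledBuffer> msg) {
  if (msg->pointer() != nullptr) msg->pointer()[0] = 42;
  msg->freeBuffer();
}
```

With a single-block pool, is_available() is false between produce and consume and true again afterwards. In the application discussed here, the MemoryAvailableCondition mentioned in the original post would presumably play the role of that availability check on the upstream operator.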

Thanks,
Gigon
