C++ pipelining APIs
The pipeline API allows you to run inferencing for a segmented model across multiple Edge TPUs.
For more information and a walkthrough of this API, read Pipeline a model with multiple Edge TPUs.
[pipelined_model_runner.h source]
- class coral::PipelinedModelRunner¶
Runs inferencing for a segmented model, using a pipeline of Edge TPUs.
This class assumes each segment has a dedicated Edge TPU, which allows all segments to run in parallel and improves throughput.
For example, if you have a pool of requests to process:
auto model_segments_interpreters =
    ModelSegmentsInterpreters(model_segments_paths);
// Caller can set custom allocators for input and output tensors with
// `input_tensor_allocator` and `output_tensor_allocator` arguments.
auto runner = PipelinedModelRunner(model_segments_interpreters);
auto* input_tensor_allocator = runner.GetInputTensorAllocator();
auto* output_tensor_allocator = runner.GetOutputTensorAllocator();

const int total_num_requests = 1000;

auto request_producer = [&runner, &total_num_requests]() {
  for (int i = 0; i < total_num_requests; ++i) {
    // Caller is responsible for allocating input tensors.
    CHECK(runner.Push(CreateInputTensors(input_tensor_allocator)).ok());
  }
};

auto result_consumer = [&runner, &total_num_requests]() {
  for (int i = 0; i < total_num_requests; ++i) {
    std::vector<Tensor> output_tensors;
    CHECK(runner.Pop(&output_tensors).ok());
    ConsumeOutputTensors(output_tensors);
    // Caller is responsible for deallocating output tensors.
    FreeTensors(output_tensor_allocator, output_tensors);
  }
};

auto producer_thread = std::thread(request_producer);
auto consumer_thread = std::thread(result_consumer);
Or, if you have a stream of requests to process:
auto model_segments_interpreters =
    ModelSegmentsInterpreters(model_segments_paths);
// Caller can set custom allocators for input and output tensors with
// `input_tensor_allocator` and `output_tensor_allocator` arguments.
auto runner = PipelinedModelRunner(model_segments_interpreters);
auto* input_tensor_allocator = runner.GetInputTensorAllocator();
auto* output_tensor_allocator = runner.GetOutputTensorAllocator();

auto request_producer = [&runner]() {
  while (true) {
    // Caller is responsible for allocating input tensors.
    CHECK(runner.Push(CreateInputTensors(input_tensor_allocator)).ok());
    if (ShouldStop()) {
      // Pushing special inputs to signal no more inputs will be pushed.
      CHECK(runner.Push({}).ok());
      break;
    }
  }
};

auto result_consumer = [&runner]() {
  std::vector<Tensor> output_tensors;
  while (runner.Pop(&output_tensors).ok() && !output_tensors.empty()) {
    ConsumeOutputTensors(output_tensors);
    // Caller is responsible for deallocating output tensors.
    FreeTensors(output_tensor_allocator, output_tensors);
  }
};

auto producer_thread = std::thread(request_producer);
auto consumer_thread = std::thread(result_consumer);
This class is thread-safe.
Public Functions
- PipelinedModelRunner(const std::vector<tflite::Interpreter*> &model_segments_interpreters, Allocator *input_tensor_allocator = nullptr, Allocator *output_tensor_allocator = nullptr)¶
Initializes the PipelinedModelRunner with model segments.
Note:
input_tensor_allocator is only used to free the input tensors, because this class assumes that input tensors are allocated by the caller.
output_tensor_allocator is only used to allocate output tensors, because this class assumes that output tensors are freed by the caller after consuming them.
- Parameters
model_segments_interpreters: A vector of pointers to tflite::Interpreter objects, each representing a model segment and a unique Edge TPU context. model_segments_interpreters[0] should be the first segment interpreter of the model, model_segments_interpreters[1] the second segment, and so on.
input_tensor_allocator: A custom Allocator for input tensors. By default (nullptr), it uses an allocator provided by this class.
output_tensor_allocator: A custom Allocator for output tensors. By default (nullptr), it uses an allocator provided by this class.
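For reference, here is a minimal sketch of constructing a runner from per-segment interpreters. MakeSegmentInterpreter() is a hypothetical placeholder for the Edge TPU interpreter-creation code shown in the pipelining walkthrough; each segment must be bound to its own Edge TPU device.

// Hypothetical sketch: one interpreter per segment, each on its own Edge TPU.
std::vector<std::string> segment_paths = {
    "model_segment_0_edgetpu.tflite", "model_segment_1_edgetpu.tflite"};
std::vector<std::unique_ptr<tflite::FlatBufferModel>> models;
std::vector<std::unique_ptr<tflite::Interpreter>> interpreters;
std::vector<tflite::Interpreter*> segment_ptrs;
for (const auto& path : segment_paths) {
  models.push_back(tflite::FlatBufferModel::BuildFromFile(path.c_str()));
  // MakeSegmentInterpreter() stands in for building a tflite::Interpreter
  // bound to a dedicated Edge TPU context (see the pipelining walkthrough).
  interpreters.push_back(MakeSegmentInterpreter(*models.back()));
  segment_ptrs.push_back(interpreters.back().get());
}
coral::PipelinedModelRunner runner(segment_ptrs);  // default tensor allocators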
- Allocator *GetInputTensorAllocator() const¶
Returns the default input tensor allocator (or the allocator given to the constructor).
- Allocator *GetOutputTensorAllocator() const¶
Returns the default output tensor allocator (or the allocator given to the constructor).
- void SetInputQueueSize(size_t size)¶
Sets the input queue size.
By default, the input queue size is unlimited.
Note: It is OK to change the queue size threshold while PipelinedModelRunner is active. If the new threshold is smaller than the current queue size, pushes to the queue will block until the current queue size drops below the new threshold.
- Parameters
size: Input queue size.
- void SetOutputQueueSize(size_t size)¶
Sets the output queue size.
By default, the output queue size is unlimited.
Note: It is OK to change the queue size threshold while PipelinedModelRunner is active. If the new threshold is smaller than the current queue size, pushes to the queue will block until the current queue size drops below the new threshold.
- Parameters
size: Output queue size.
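For example, to bound memory use you might cap both queues (a sketch; suitable sizes depend on your tensor sizes and request rate):

// Limit how many pending requests/results are buffered inside the pipeline.
// Pushes block once the input queue holds 4 unconsumed requests, and the
// pipeline stalls once 4 results are waiting to be popped.
runner.SetInputQueueSize(4);
runner.SetOutputQueueSize(4);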
- absl::Status Push(const std::vector<PipelineTensor> &input_tensors)¶
Pushes input tensors to be processed by the pipeline.
Note:
The caller is responsible for allocating memory for input tensors. By default, this class will free those tensors when they are consumed. The caller can set a custom allocator for input tensors if needed.
Pushing an empty vector {} is allowed; it signals the class that no more inputs will be added (the function returns an error if inputs are pushed after this special push). This special push allows Pop()'s consumer to properly drain unconsumed output tensors. See the example above for details.
The caller will block if the current input queue size is greater than the input queue size threshold. By default, the input queue size threshold is unlimited, i.e., calls to Push() are non-blocking.
- Parameters
input_tensors: A vector of input tensors, each wrapped as a PipelineTensor. The order must match Interpreter::inputs() of the first model segment.
- Return
absl::OkStatus if successful; absl::InternalError otherwise.
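As a small illustration (CreateInputTensors() is the caller-defined helper used in the examples above):

// Push one batch of inputs and check the returned status.
absl::Status status = runner.Push(CreateInputTensors(input_tensor_allocator));
if (!status.ok()) {
  // For example, inputs were pushed after the empty "end of stream" push.
  std::cerr << status.message() << std::endl;
}
// When no more requests are coming, signal the pipeline to drain:
CHECK(runner.Push({}).ok());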
- absl::Status Pop(std::vector<PipelineTensor> *output_tensors)¶
Gets output tensors from the pipeline.
Note:
The caller is responsible for deallocating memory for output tensors after consuming them. By default, the output tensors are allocated using the default tensor allocator. The caller can set a custom allocator for output tensors if needed.
The caller will block if no output tensors are available and no empty push has been received.
- Parameters
output_tensors: A pointer to a vector of PipelineTensor objects where outputs should be stored. The order of the returned output tensors matches Interpreter::outputs() of the last model segment.
- Return
absl::OkStatus when output is received, or when the pipeline input queue has already been stopped and is empty, in which case output_tensors will be empty. Otherwise, absl::InternalError.
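A typical consumer loop, using FreePipelineTensors() (documented below) to release each batch of results:

std::vector<coral::PipelineTensor> output_tensors;
// Pop() blocks until results arrive; it returns an empty vector once the
// empty "end of stream" push has been made and all results are consumed.
while (runner.Pop(&output_tensors).ok() && !output_tensors.empty()) {
  ConsumeOutputTensors(output_tensors);  // caller-defined, as in the examples above
  coral::FreePipelineTensors(output_tensors, runner.GetOutputTensorAllocator());
  output_tensors.clear();
}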
- std::vector<SegmentStats> GetSegmentStats() const¶
Returns performance stats for each segment.
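For example, you might report the average latency of each segment to find the bottleneck. The field names below (total_time_ns, num_inferences) are assumptions about SegmentStats; verify the exact members in pipelined_model_runner.h.

// Hedged sketch: average latency per segment.
const auto stats = runner.GetSegmentStats();
for (size_t i = 0; i < stats.size(); ++i) {
  if (stats[i].num_inferences > 0) {  // assumed field names, see note above
    double avg_ms = stats[i].total_time_ns / 1e6 / stats[i].num_inferences;
    std::cout << "Segment " << i << ": " << avg_ms << " ms/inference\n";
  }
}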
- struct coral::PipelineTensor¶
A tensor in the pipeline system.
This is a simplified version of TfLiteTensor.
Public Members
- std::string name¶
Unique tensor name.
- TfLiteType type¶
The data type specification for data stored in data. This affects which member of the data union should be used.
- size_t bytes¶
The number of bytes required to store the data of this tensor. That is: (bytes of each element) * dims[0] * ... * dims[n-1]. For example, if type is kTfLiteFloat32 and dims = {3, 2}, then bytes = sizeof(float) * 3 * 2 = 4 * 3 * 2 = 24.
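A sketch of describing an input tensor with the members listed above. How the tensor's data buffer is attached is not covered in this excerpt; see the pipelining walkthrough. The tensor name and shape below are only illustrative.

coral::PipelineTensor input;
input.name = "serving_default_images:0";  // must match the first segment's input name
input.type = kTfLiteUInt8;                // quantized 8-bit input
input.bytes = 224 * 224 * 3;              // e.g. one 224x224 RGB uint8 image
// The underlying data is allocated with the runner's input tensor allocator;
// that part of the struct is not shown in this excerpt.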
- void coral::FreePipelineTensors(const std::vector<PipelineTensor> &tensors, Allocator *allocator)¶
Deallocates the memory for the given tensors.
Use this to free output tensors each time you process the results.
- Parameters
tensors: A vector of PipelineTensor objects to release.
allocator: The Allocator originally used to allocate the tensors.
- struct coral::SegmentStats¶
Performance statistics for one segment of the model pipeline.
- class coral::Allocator¶
- class coral::Buffer¶
Public Functions
- virtual void *ptr() = 0¶
Returns the user space pointer. The returned pointer can be nullptr.
- void *MapToHost()¶
Maps the buffer to host address space and returns the pointer.
Returns nullptr if the mapping was not successful. If the buffer was mapped before or does not require mapping, this is a no-op.
Note: if the underlying buffer is backed by DMA, a DMA_BUF_IOCTL_SYNC ioctl call might be needed for cache coherence. When such memory is consumed by the Edge TPU, this ioctl call is NOT necessary because the driver only uses the pointer to identify the backing physical pages for DMA.
- bool UnmapFromHost()¶
Unmaps the buffer from host address space.
Returns true if successful; false otherwise. This should be called explicitly if MapToHost() was called before. Calling it on an already unmapped buffer is a no-op.
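A brief sketch of reading data back through a Buffer. Here, buffer is assumed to be a coral::Buffer* obtained from an output tensor; how you obtain it is outside this excerpt.

// Map the buffer into user space (a no-op if it is already mapped or
// mapping is not required), read the data, then unmap.
void* host_ptr = buffer->MapToHost();
if (host_ptr != nullptr) {
  // ... read the tensor's bytes from host_ptr ...
  CHECK(buffer->UnmapFromHost());  // needed only because MapToHost() was called
}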
- int fd()¶
Returns the file descriptor, or -1 if the buffer is NOT backed by a file descriptor.