C++ pipelining APIs
The pipeline API allows you to run inferencing for a segmented model across multiple Edge TPUs.
For more information and a walkthrough of this API, read Pipeline a model with multiple Edge TPUs.
Source code and header files are at https://github.com/google-coral/edgetpu/tree/master/src/cpp/pipeline/.
coral/pipeline/utils.h

namespace coral

Functions

std::unordered_set<std::string> GetInputTensorNames(const tflite::Interpreter &interpreter)

Returns all input tensor names for the given tflite::Interpreter.
void FreeTensors(const std::vector<PipelineTensor> &tensors, Allocator *allocator)

Deallocates the memory for the given tensors. Use this to free output
tensors each time you process the results.

Parameters:
    tensors: A vector of PipelineTensor objects to release.
    allocator: The Allocator originally used to allocate the tensors.
const TfLiteTensor *GetInputTensor(const tflite::Interpreter &interpreter, const char *name)

Returns the input tensor matching name in the given tflite::Interpreter.
Returns nullptr if no such tensor exists.
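For instance, here is a minimal sketch that uses both helpers to inspect a
segment's inputs. It assumes interpreter is an already-built
tflite::Interpreter for one segment; the include path is illustrative.

    #include <iostream>
    #include <string>

    #include "src/cpp/pipeline/utils.h"  // illustrative include path
    #include "tensorflow/lite/interpreter.h"

    // Prints the name and byte size of every input tensor in one segment.
    void PrintSegmentInputs(const tflite::Interpreter& interpreter) {
      for (const std::string& name : coral::GetInputTensorNames(interpreter)) {
        const TfLiteTensor* tensor =
            coral::GetInputTensor(interpreter, name.c_str());
        if (tensor != nullptr) {
          std::cout << name << ": " << tensor->bytes << " bytes" << std::endl;
        }
      }
    }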
class coral::Allocator
class coral::PipelinedModelRunner

Runs inferencing for a segmented model, using a pipeline of Edge TPUs.
This class assumes each segment has a dedicated Edge TPU, which allows all
segments to run in parallel and improves throughput.

For example, if you have a pool of requests to process:

    auto model_segments_interpreters =
        ModelSegmentsInterpreters(model_segments_paths);
    // Caller can set custom allocators for input and output tensors with
    // `input_tensor_allocator` and `output_tensor_allocator` arguments.
    auto runner = PipelinedModelRunner(model_segments_interpreters);
    auto* input_tensor_allocator = runner.GetInputTensorAllocator();
    auto* output_tensor_allocator = runner.GetOutputTensorAllocator();

    const int total_num_requests = 1000;

    auto request_producer = [&runner, &total_num_requests,
                             input_tensor_allocator]() {
      for (int i = 0; i < total_num_requests; ++i) {
        // Caller is responsible for allocating input tensors.
        runner.Push(CreateInputTensors(input_tensor_allocator));
      }
    };

    auto result_consumer = [&runner, &total_num_requests,
                            output_tensor_allocator]() {
      for (int i = 0; i < total_num_requests; ++i) {
        std::vector<PipelineTensor> output_tensors;
        runner.Pop(&output_tensors);
        ConsumeOutputTensors(output_tensors);
        // Caller is responsible for deallocating output tensors.
        FreeTensors(output_tensors, output_tensor_allocator);
      }
    };

    auto producer_thread = std::thread(request_producer);
    auto consumer_thread = std::thread(result_consumer);
Or, if you have a stream of requests to process:

    auto model_segments_interpreters =
        ModelSegmentsInterpreters(model_segments_paths);
    // Caller can set custom allocators for input and output tensors with
    // `input_tensor_allocator` and `output_tensor_allocator` arguments.
    auto runner = PipelinedModelRunner(model_segments_interpreters);
    auto* input_tensor_allocator = runner.GetInputTensorAllocator();
    auto* output_tensor_allocator = runner.GetOutputTensorAllocator();

    auto request_producer = [&runner, input_tensor_allocator]() {
      while (true) {
        // Caller is responsible for allocating input tensors.
        runner.Push(CreateInputTensors(input_tensor_allocator));
        if (ShouldStop()) {
          // Push an empty vector to signal that no more inputs will be
          // pushed.
          runner.Push({});
          break;
        }
      }
    };

    auto result_consumer = [&runner, output_tensor_allocator]() {
      std::vector<PipelineTensor> output_tensors;
      while (runner.Pop(&output_tensors)) {
        ConsumeOutputTensors(output_tensors);
        // Caller is responsible for deallocating output tensors.
        FreeTensors(output_tensors, output_tensor_allocator);
      }
    };

    auto producer_thread = std::thread(request_producer);
    auto consumer_thread = std::thread(result_consumer);
This class is thread-safe.
Public Functions
PipelinedModelRunner(const std::vector<tflite::Interpreter*> &model_segments_interpreters, Allocator *input_tensor_allocator = nullptr, Allocator *output_tensor_allocator = nullptr)

Initializes the PipelinedModelRunner with model segments.

Note: input_tensor_allocator is only used to free the input tensors,
because this class assumes that input tensors are allocated by the caller.
output_tensor_allocator is only used to allocate output tensors, because
this class assumes that output tensors are freed by the caller after
consuming them.
Parameters:
    model_segments_interpreters: A vector of pointers to
        tflite::Interpreter objects, each representing a model segment and
        a unique Edge TPU context. model_segments_interpreters[0] should
        be the interpreter for the first segment of the model,
        model_segments_interpreters[1] for the second segment, and so on.
    input_tensor_allocator: A custom Allocator for input tensors. By
        default (nullptr), it uses an allocator provided by this class.
    output_tensor_allocator: A custom Allocator for output tensors. By
        default (nullptr), it uses an allocator provided by this class.
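As a hedged sketch of how each segment interpreter might be built, the
following binds one interpreter to one Edge TPU context through the
edgetpu.h API; error handling is omitted, so verify the calls against your
version of the library.

    #include <memory>

    #include "edgetpu.h"
    #include "tensorflow/lite/interpreter.h"
    #include "tensorflow/lite/kernels/register.h"
    #include "tensorflow/lite/model.h"

    // Builds an interpreter for one model segment, bound to one Edge TPU.
    std::unique_ptr<tflite::Interpreter> BuildSegmentInterpreter(
        const tflite::FlatBufferModel& model,
        edgetpu::EdgeTpuContext* edgetpu_context) {
      tflite::ops::builtin::BuiltinOpResolver resolver;
      resolver.AddCustom(edgetpu::kCustomOp, edgetpu::RegisterCustomOp());
      std::unique_ptr<tflite::Interpreter> interpreter;
      tflite::InterpreterBuilder(model, resolver)(&interpreter);
      // Give this segment its own dedicated Edge TPU context.
      interpreter->SetExternalContext(kTfLiteEdgeTpuContext, edgetpu_context);
      interpreter->SetNumThreads(1);
      interpreter->AllocateTensors();
      return interpreter;
    }

Each context should come from a distinct device, opened through
edgetpu::EdgeTpuManager::GetSingleton(), one device per segment.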
Allocator *GetInputTensorAllocator() const

Returns the default input tensor allocator (or the allocator given to the
constructor).
Allocator *GetOutputTensorAllocator() const

Returns the default output tensor allocator (or the allocator given to the
constructor).
void SetInputQueueSize(size_t size)

Sets the input queue size. By default, the input queue size is unlimited.

Note: It is OK to change the queue size threshold while
PipelinedModelRunner is active. If the new threshold is smaller than the
current queue size, pushes to the queue block until the current queue size
drops below the new threshold.

Parameters:
    size: Input queue size.
void SetOutputQueueSize(size_t size)

Sets the output queue size. By default, the output queue size is
unlimited.

Note: It is OK to change the queue size threshold while
PipelinedModelRunner is active. If the new threshold is smaller than the
current queue size, pushes to the queue block until the current queue size
drops below the new threshold.

Parameters:
    size: Output queue size.
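For example, bounding both queues keeps a fast producer from running
arbitrarily far ahead of the pipeline; the value 4 here is arbitrary.

    // With a bounded input queue, Push() blocks once 4 requests are
    // waiting, so pre-allocated input tensors cannot pile up without limit.
    runner.SetInputQueueSize(4);
    // Likewise, the pipeline holds at most 4 unconsumed results when the
    // Pop() consumer falls behind.
    runner.SetOutputQueueSize(4);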
bool Push(const std::vector<PipelineTensor> &input_tensors)

Pushes input tensors to be processed by the pipeline.

Note: The caller is responsible for allocating memory for the input
tensors. By default, this class frees those tensors when they are
consumed. The caller can set a custom allocator for input tensors if
needed.

Pushing an empty vector {} is allowed; it signals the class that no more
inputs will be added (the function returns false if inputs are pushed
after this special push). This special push allows Pop()'s consumer to
properly drain unconsumed output tensors. See the examples above for
details.

The caller blocks if the current input queue size is greater than the
input queue size threshold. By default, the input queue size threshold is
unlimited, i.e., calls to Push() are non-blocking.

Returns:
    True if successful; false otherwise.

Parameters:
    input_tensors: A vector of input tensors, each wrapped as a
        PipelineTensor. The order must match Interpreter::inputs() of the
        first model segment.
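As an illustration, here is a hypothetical helper that allocates one input
tensor sized and typed to match a segment's input. It assumes the
Allocator interface exposes an alloc(size) method as in the repository's
allocator.h; verify the exact signature against the source.

    // Hypothetical helper: wraps a freshly allocated buffer as a
    // PipelineTensor whose type and size match the given model input.
    coral::PipelineTensor MakeInputTensor(const TfLiteTensor& model_input,
                                          coral::Allocator* allocator) {
      coral::PipelineTensor input;
      input.type = model_input.type;
      input.bytes = model_input.bytes;
      input.data.data = allocator->alloc(input.bytes);  // assumed signature
      // Fill input.data.data with quantized input data before pushing.
      return input;
    }

A request is then pushed as runner.Push({input}), with one such tensor per
input of the first segment, in Interpreter::inputs() order.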
bool Pop(std::vector<PipelineTensor> *output_tensors)

Gets output tensors from the pipeline.

Note: The caller is responsible for deallocating memory for the output
tensors after consuming them. By default, the output tensors are allocated
using the default tensor allocator. The caller can set a custom allocator
for output tensors if needed.

The caller blocks if no output tensors are available and no empty push has
been received.

Returns:
    True when output is received, or false when the special empty push has
    been given to Push() and no more output tensors are available.

Parameters:
    output_tensors: A pointer to a vector of PipelineTensor objects where
        outputs should be stored. The order of the returned output tensors
        matches Interpreter::outputs() of the last model segment.
std::vector<SegmentStats> GetSegmentStats() const

Returns performance stats for each segment.
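A sketch of reporting per-segment latency from these stats, assuming
runner is in scope; the member names total_time_ns and num_inferences
follow the repository's source, so verify them against
pipelined_model_runner.h.

    // Prints the average inference time per segment; a segment that is
    // much slower than the rest is the pipeline's bottleneck.
    for (const coral::SegmentStats& stats : runner.GetSegmentStats()) {
      if (stats.num_inferences > 0) {
        std::cout << "avg segment time (ns): "
                  << static_cast<double>(stats.total_time_ns) /
                         stats.num_inferences
                  << std::endl;
      }
    }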
struct coral::PipelineTensor

A tensor in the pipeline system. This is a simplified version of
TfLiteTensor.

Public Members
TfLiteType type

The data type specification for data stored in data. This affects which
member of the data union should be used.
size_t bytes

The number of bytes required to store the data of this tensor. That is:
(bytes of each element) * dims[0] * ... * dims[n-1]. For example, if the
type is kTfLiteFloat32 and dims = {3, 2}, then
bytes = sizeof(float) * 3 * 2 = 4 * 3 * 2 = 24.
struct coral::SegmentStats

Performance statistics for one segment of the model pipeline.