C++ pipelining APIs

The pipeline API allows you to run inferencing for a segmented model across multiple Edge TPUs.

For more information and a walkthrough of this API, read Pipeline a model with multiple Edge TPUs.

Source code and header files are at https://github.com/google-coral/edgetpu/tree/master/src/cpp/pipeline/.

coral/pipeline/utils.h

namespace coral

Functions

std::unordered_set<std::string> GetInputTensorNames(const tflite::Interpreter &interpreter)

Returns all input tensor names for the given tflite::Interpreter.

void FreeTensors(const std::vector<PipelineTensor> &tensors, Allocator *allocator)

Deallocates the memory for the given tensors.

Use this to free output tensors each time you process the results.

Parameters
  • tensors: A vector of PipelineTensor objects to release.

  • allocator: The Allocator originally used to allocate the tensors.

const TfLiteTensor *GetInputTensor(const tflite::Interpreter &interpreter, const char *name)

Returns the input tensor matching name in the given tflite::Interpreter.

Returns nullptr if no such tensor exists.

class coral::Allocator

Public Functions

virtual Buffer *Alloc(size_t size_bytes) = 0

Allocates size_bytes bytes of memory.

Return

A pointer to a valid Buffer object.

Parameters
  • size_bytes: The number of bytes to allocate.

virtual void Free(Buffer *buffer) = 0

Deallocates the memory for the given buffer.
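To show what implementing this interface might look like, here is a standalone sketch of a heap-backed allocator. The Buffer stand-in and its ptr() accessor are assumptions made for illustration; the real Buffer and Allocator types are defined in the coral/pipeline headers and may differ.

```cpp
#include <cstddef>
#include <cstdlib>

// Hypothetical stand-ins mirroring the documented interface. The real
// types live in the coral/pipeline headers; Buffer is assumed here to
// wrap a raw pointer exposed via ptr().
class Buffer {
 public:
  explicit Buffer(void* ptr) : ptr_(ptr) {}
  void* ptr() const { return ptr_; }

 private:
  void* ptr_;
};

class Allocator {
 public:
  virtual ~Allocator() = default;
  virtual Buffer* Alloc(size_t size_bytes) = 0;
  virtual void Free(Buffer* buffer) = 0;
};

// A trivial allocator that backs each Buffer with malloc'd memory.
class HeapAllocator : public Allocator {
 public:
  Buffer* Alloc(size_t size_bytes) override {
    return new Buffer(std::malloc(size_bytes));
  }
  void Free(Buffer* buffer) override {
    if (buffer != nullptr) {
      std::free(buffer->ptr());
      delete buffer;
    }
  }
};
```

A custom allocator like this can be passed to the PipelinedModelRunner constructor in place of the default one, for example to allocate from a pinned or shared memory pool.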

class coral::PipelinedModelRunner

Runs inferencing for a segmented model, using a pipeline of Edge TPUs.

This class assumes each segment has a dedicated Edge TPU, which allows all segments to run in parallel and improves throughput.

For example, if you have a pool of requests to process:

auto model_segments_interpreters =
    ModelSegmentsInterpreters(model_segments_paths);
// Caller can set custom allocators for input and output tensors with
// `input_tensor_allocator` and `output_tensor_allocator` arguments.
auto runner = PipelinedModelRunner(model_segments_interpreters);
auto* input_tensor_allocator = runner.GetInputTensorAllocator();
auto* output_tensor_allocator = runner.GetOutputTensorAllocator();

const int total_num_requests = 1000;

auto request_producer = [&runner, &total_num_requests,
                         input_tensor_allocator]() {
  for (int i = 0; i < total_num_requests; ++i) {
    // Caller is responsible for allocating input tensors.
    runner.Push(CreateInputTensors(input_tensor_allocator));
  }
};

auto result_consumer = [&runner, &total_num_requests,
                        output_tensor_allocator]() {
  for (int i = 0; i < total_num_requests; ++i) {
    std::vector<PipelineTensor> output_tensors;
    runner.Pop(&output_tensors);
    ConsumeOutputTensors(output_tensors);
    // Caller is responsible for deallocating output tensors.
    FreeTensors(output_tensors, output_tensor_allocator);
  }
};

auto producer_thread = std::thread(request_producer);
auto consumer_thread = std::thread(result_consumer);
producer_thread.join();
consumer_thread.join();

Or, if you have a stream of requests to process:

auto model_segments_interpreters =
    ModelSegmentsInterpreters(model_segments_paths);
// Caller can set custom allocators for input and output tensors with
// `input_tensor_allocator` and `output_tensor_allocator` arguments.
auto runner = PipelinedModelRunner(model_segments_interpreters);
auto* input_tensor_allocator = runner.GetInputTensorAllocator();
auto* output_tensor_allocator = runner.GetOutputTensorAllocator();

auto request_producer = [&runner, input_tensor_allocator]() {
  while (true) {
    // Caller is responsible for allocating input tensors.
    runner.Push(CreateInputTensors(input_tensor_allocator));
    if (ShouldStop()) {
      // Push an empty vector to signal that no more inputs will be pushed.
      runner.Push({});
      break;
    }
  }
};

auto result_consumer = [&runner, output_tensor_allocator]() {
  std::vector<PipelineTensor> output_tensors;
  while (runner.Pop(&output_tensors)) {
    ConsumeOutputTensors(output_tensors);
    // Caller is responsible for deallocating output tensors.
    FreeTensors(output_tensors, output_tensor_allocator);
  }
};

auto producer_thread = std::thread(request_producer);
auto consumer_thread = std::thread(result_consumer);
producer_thread.join();
consumer_thread.join();

This class is thread-safe.

Public Functions

PipelinedModelRunner(const std::vector<tflite::Interpreter*> &model_segments_interpreters, Allocator *input_tensor_allocator = nullptr, Allocator *output_tensor_allocator = nullptr)

Initializes the PipelinedModelRunner with model segments.

Note:

  • input_tensor_allocator is used only to free the input tensors, because this class assumes that input tensors are allocated by the caller.

  • output_tensor_allocator is used only to allocate output tensors, because this class assumes that output tensors are freed by the caller after consuming them.

Parameters
  • model_segments_interpreters: A vector of pointers to tflite::Interpreter objects, each representing a model segment with its own Edge TPU context. model_segments_interpreters[0] must be the interpreter for the first segment of the model, model_segments_interpreters[1] for the second segment, and so on.

  • input_tensor_allocator: A custom Allocator for input tensors. By default (nullptr), it uses an allocator provided by this class.

  • output_tensor_allocator: A custom Allocator for output tensors. By default (nullptr), it uses an allocator provided by this class.

Allocator *GetInputTensorAllocator() const

Returns the default input tensor allocator (or the allocator given to the constructor).

Allocator *GetOutputTensorAllocator() const

Returns the default output tensor allocator (or the allocator given to the constructor).

void SetInputQueueSize(size_t size)

Sets input queue size.

By default, the input queue size is unlimited.

Note: It is OK to change the queue size threshold while PipelinedModelRunner is active. If the new threshold is smaller than the current queue size, calls to Push() block until the current queue size drops below the new threshold.

Parameters
  • size: Input queue size.

void SetOutputQueueSize(size_t size)

Sets output queue size.

By default, the output queue size is unlimited.

Note: It is OK to change the queue size threshold while PipelinedModelRunner is active. If the new threshold is smaller than the current queue size, pushes to the queue block until the current queue size drops below the new threshold.

Parameters
  • size: Output queue size.
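The blocking behavior these two setters describe can be illustrated with a minimal bounded queue. This is a standalone sketch using standard condition variables, not the pipeline's actual queue implementation; BoundedQueue and SetCapacity are hypothetical names.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// A minimal bounded queue illustrating the documented semantics:
// Push() blocks while the queue holds `capacity` or more items, and
// the capacity can be lowered while the queue is in use.
template <typename T>
class BoundedQueue {
 public:
  explicit BoundedQueue(size_t capacity) : capacity_(capacity) {}

  void Push(T value) {
    std::unique_lock<std::mutex> lock(mu_);
    not_full_.wait(lock, [this] { return queue_.size() < capacity_; });
    queue_.push_back(std::move(value));
    not_empty_.notify_one();
  }

  T Pop() {
    std::unique_lock<std::mutex> lock(mu_);
    not_empty_.wait(lock, [this] { return !queue_.empty(); });
    T value = std::move(queue_.front());
    queue_.pop_front();
    not_full_.notify_one();
    return value;
  }

  // Lowering the capacity below the current size causes subsequent
  // Push() calls to block until enough items have been popped.
  void SetCapacity(size_t capacity) {
    std::lock_guard<std::mutex> lock(mu_);
    capacity_ = capacity;
    not_full_.notify_all();
  }

 private:
  std::mutex mu_;
  std::condition_variable not_full_, not_empty_;
  std::deque<T> queue_;
  size_t capacity_;
};
```

In the real runner, SetInputQueueSize() and SetOutputQueueSize() play the role of SetCapacity() for the input and output queues, respectively.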

bool Push(const std::vector<PipelineTensor> &input_tensors)

Pushes input tensors to be processed by the pipeline.

Note:

  • The caller is responsible for allocating memory for the input tensors. By default, this class frees those tensors once they have been consumed. The caller can set a custom allocator for input tensors if needed.

  • Pushing an empty vector {} is allowed; it signals that no more inputs will be pushed (the function returns false if inputs are pushed after this special push). This special push allows Pop()’s consumer to properly drain unconsumed output tensors. See the examples above for details.

  • The call blocks if the current input queue size is greater than the input queue size threshold. By default, the threshold is unlimited, so calls to Push() are non-blocking.

Return

True if successful; false otherwise.

Parameters
  • input_tensors: A vector of input tensors, each wrapped as a PipelineTensor. The order must match Interpreter::inputs() from the first model segment.

bool Pop(std::vector<PipelineTensor> *output_tensors)

Gets output tensors from the pipeline.

Note:

  • The caller is responsible for deallocating memory for the output tensors after consuming them. By default, the output tensors are allocated using the default tensor allocator. The caller can set a custom allocator for output tensors if needed.

  • The call blocks if no output tensors are available and the special empty push has not been received.

Return

True when output is received, or false when the special empty push has been given to Push() and no more output tensors are available.

Parameters
  • output_tensors: A pointer to a vector of PipelineTensor objects where outputs should be stored. Returned output tensors order matches Interpreter::outputs() of the last model segment.

std::vector<SegmentStats> GetSegmentStats() const

Returns performance stats for each segment.

struct coral::PipelineTensor

A tensor in the pipeline system.

This is a simplified version of TfLiteTensor.

Public Members

TfLiteType type

The data type of the tensor data stored in buffer.

Buffer *buffer

The underlying memory buffer for the tensor, allocated by an Allocator.

size_t bytes

The number of bytes required to store the data of this tensor.

That is: (bytes of each element) * dims[0] * ... * dims[n-1]. For example, if type is kTfLiteFloat32 and dims = {3, 2} then bytes = sizeof(float) * 3 * 2 = 4 * 3 * 2 = 24.
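The formula above can be written as a small helper. TensorBytes is a hypothetical name used here for illustration, not part of the API:

```cpp
#include <cstddef>
#include <vector>

// Computes the byte size of a tensor from its element size and
// dimensions: bytes = (bytes of each element) * dims[0] * ... * dims[n-1].
size_t TensorBytes(size_t element_bytes, const std::vector<int>& dims) {
  size_t bytes = element_bytes;
  for (int d : dims) bytes *= static_cast<size_t>(d);
  return bytes;
}
```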

struct coral::SegmentStats

Performance statistics for one segment of the model pipeline.

Public Members

int64_t total_time_ns = 0

Total time spent traversing this segment so far (in nanoseconds).

uint64_t num_inferences = 0

Number of inferences processed so far.
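As a usage sketch, per-segment average latency can be derived from these two fields. The SegmentStats definition below simply mirrors the documented members so the snippet is self-contained, and AverageLatencyMs is a hypothetical helper, not part of the API:

```cpp
#include <cstdint>

// Mirrors the documented SegmentStats members; the real struct is
// defined in the coral/pipeline headers.
struct SegmentStats {
  int64_t total_time_ns = 0;
  uint64_t num_inferences = 0;
};

// Average latency of a segment in milliseconds, or 0 if the segment
// has not processed any inferences yet.
double AverageLatencyMs(const SegmentStats& stats) {
  if (stats.num_inferences == 0) return 0.0;
  return static_cast<double>(stats.total_time_ns) /
         static_cast<double>(stats.num_inferences) / 1e6;
}
```

Comparing this value across the segments returned by GetSegmentStats() helps identify the slowest segment, which bounds the pipeline's overall throughput.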