Pipeline C++ API reference

The pipeline API allows you to run inferencing for a segmented model across multiple Edge TPUs.

For more information and a walkthrough of this API, read Pipeline a model with multiple Edge TPUs.

Source code and header files are at github.com/google-coral/edgetpu/tree/master/src/cpp/pipeline/.

Note: This API is in beta and may change.

Summary

Functions:

  • GetInputTensorNames()
  • FreeTensors()
  • GetInputTensor()

Classes/structs:

  • coral::Allocator
  • coral::PipelinedModelRunner
  • coral::PipelineTensor
  • coral::SegmentStats

Functions

public std::unordered_set<std::string> GetInputTensorNames(const tflite::Interpreter& interpreter)

Returns all input tensor names for the given tflite::Interpreter.
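
For example, a caller might log the expected input names before building pipeline inputs. A minimal sketch, assuming the free functions live in the coral namespace like the classes below and that the include path matches the repository layout:

#include <iostream>
#include <string>

#include "src/cpp/pipeline/utils.h"       // Illustrative include path.
#include "tensorflow/lite/interpreter.h"

void LogInputNames(const tflite::Interpreter& first_segment_interpreter) {
  // The returned set is unordered, so iteration order is unspecified.
  for (const std::string& name :
       coral::GetInputTensorNames(first_segment_interpreter)) {
    std::cout << "expected input tensor: " << name << std::endl;
  }
}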

public void FreeTensors(const std::vector<PipelineTensor>& tensors, Allocator* allocator)

Deallocates the memory for the given tensors.

Use this to free output tensors each time you process the results.

Parameters:

  • tensors A vector of PipelineTensor objects to release.

  • allocator The Allocator originally used to allocate the tensors.

public const TfLiteTensor* GetInputTensor(const tflite::Interpreter& interpreter, const char* name)

Returns the input tensor matching name in the given tflite::Interpreter.

Returns nullptr if no such tensor exists.
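
For example, a caller could confirm that a prepared buffer matches the size of the tensor it will feed. A sketch, assuming the coral namespace and an illustrative include path; the interpreter and tensor name are placeholders:

#include <cstddef>

#include "src/cpp/pipeline/utils.h"       // Illustrative include path.
#include "tensorflow/lite/interpreter.h"

bool InputBufferMatches(const tflite::Interpreter& interpreter,
                        const char* tensor_name, size_t buffer_bytes) {
  const TfLiteTensor* tensor = coral::GetInputTensor(interpreter, tensor_name);
  if (tensor == nullptr) return false;  // No input tensor with that name.
  // The caller's buffer must be exactly as large as the tensor expects.
  return tensor->bytes == buffer_bytes;
}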

class coral::Allocator

Memory allocator used by PipelinedModelRunner to allocate input and output tensors.

public Allocator() = default

public virtual ~Allocator() = default

public Allocator(const Allocator&) = delete

public Allocator& operator=(const Allocator&) = delete

public void * alloc(size_t size)

Allocates size bytes of memory.

Parameters:

  • size The number of bytes to allocate.

Returns:

A pointer to the memory, or nullptr if allocation fails.

public void free(void * p, size_t size)

Deallocates the given block of memory.

Parameters:

  • p A pointer to the memory to deallocate.

  • size The size of the block. Not used by the default allocator.
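
A custom allocator might look like the following sketch, assuming alloc and free are virtual and meant to be overridden (the virtual destructor and deleted copy operations suggest the class is designed for subclassing); the include path is illustrative:

#include <cstddef>
#include <cstdlib>

#include "src/cpp/pipeline/allocator.h"   // Illustrative include path.

// Sketch: hands out page-aligned buffers, which can be convenient when the
// memory is later passed to hardware. Not part of the API.
class PageAlignedAllocator : public coral::Allocator {
 public:
  void* alloc(size_t size) override {
    // std::aligned_alloc requires the size to be a multiple of the alignment.
    const size_t rounded = (size + kAlignment - 1) / kAlignment * kAlignment;
    return std::aligned_alloc(kAlignment, rounded);
  }

  void free(void* p, size_t size) override {
    (void)size;  // Not needed to release memory from std::aligned_alloc.
    std::free(p);
  }

 private:
  static constexpr size_t kAlignment = 4096;
};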

class coral::PipelinedModelRunner

Runs inferencing for a segmented model, using a pipeline of Edge TPUs.

This class assumes each segment has a dedicated Edge TPU, which allows all segments to run in parallel and improves throughput.

For example, if you have a pool of requests to process:

auto model_segments_interpreters =
    ModelSegmentsInterpreters(model_segments_paths);
// Caller can set custom allocators for input and output tensors with
// `input_tensor_allocator` and `output_tensor_allocator` arguments.
auto runner = PipelinedModelRunner(model_segments_interpreters);
auto* input_tensor_allocator = runner.GetInputTensorAllocator();
auto* output_tensor_allocator = runner.GetOutputTensorAllocator();

const int total_num_requests = 1000;

auto request_producer = [&runner, &total_num_requests, input_tensor_allocator]() {
  for (int i = 0; i < total_num_requests; ++i) {
    // Caller is responsible for allocating input tensors.
    runner.Push(CreateInputTensors(input_tensor_allocator));
  }
};

auto result_consumer = [&runner, &total_num_requests, output_tensor_allocator]() {
  for (int i = 0; i < total_num_requests; ++i) {
    std::vector<PipelineTensor> output_tensors;
    runner.Pop(&output_tensors);
    ConsumeOutputTensors(output_tensors);
    // Caller is responsible for deallocating output tensors.
    FreeTensors(output_tensors, output_tensor_allocator);
  }
};

auto producer_thread = std::thread(request_producer);
auto consumer_thread = std::thread(result_consumer);
producer_thread.join();
consumer_thread.join();

Or, if you have a stream of requests to process:

auto model_segments_interpreters =
    ModelSegmentsInterpreters(model_segments_paths);
// Caller can set custom allocators for input and output tensors with
// `input_tensor_allocator` and `output_tensor_allocator` arguments.
auto runner = PipelinedModelRunner(model_segments_interpreters);
auto* input_tensor_allocator = runner.GetInputTensorAllocator();
auto* output_tensor_allocator = runner.GetOutputTensorAllocator();

auto request_producer = [&runner, input_tensor_allocator]() {
  while (true) {
    // Caller is responsible for allocating input tensors.
    runner.Push(CreateInputTensors(input_tensor_allocator));
    if (ShouldStop()) {
      // Pushing special inputs to signal no more inputs will be pushed.
      runner.Push({});
      break;
    }
  }
};

auto result_consumer = [&runner, output_tensor_allocator]() {
  std::vector<PipelineTensor> output_tensors;
  while (runner.Pop(&output_tensors)) {
    ConsumeOutputTensors(output_tensors);
    // Caller is responsible for deallocating output tensors.
    FreeTensors(output_tensors, output_tensor_allocator);
  }
};

auto producer_thread = std::thread(request_producer);
auto consumer_thread = std::thread(result_consumer);
producer_thread.join();
consumer_thread.join();

This class is thread-safe.

public explicit PipelinedModelRunner(const std::vector<tflite::Interpreter*>& model_segments_interpreters, Allocator* input_tensor_allocator, Allocator* output_tensor_allocator)

Initializes the PipelinedModelRunner with model segments.

Parameters:

  • model_segments_interpreters A vector of pointers to tflite::Interpreter objects, each representing a model segment and bound to its own Edge TPU context. model_segments_interpreters[0] must be the interpreter for the first segment of the model, model_segments_interpreters[1] for the second segment, and so on.

  • input_tensor_allocator A custom Allocator for input tensors. By default (nullptr), it uses an allocator provided by this class.

  • output_tensor_allocator A custom Allocator for output tensors. By default (nullptr), it uses an allocator provided by this class.

Note:

  • input_tensor_allocator is used only to free the input tensors, as this class assumes that input tensors are allocated by the caller.

  • output_tensor_allocator is used only to allocate output tensors, as this class assumes that output tensors are freed by the caller after consuming them.
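
Putting this together, a runner might be constructed as in the following sketch. The include path, the BuildSegmentInterpreters() helper, and the PageAlignedAllocator class (the illustrative subclass from the Allocator section above) are assumptions, not part of the API:

#include <memory>
#include <vector>

#include "src/cpp/pipeline/pipelined_model_runner.h"  // Illustrative include path.
#include "tensorflow/lite/interpreter.h"

// Hypothetical helper: returns one interpreter per model segment, each already
// bound to its own Edge TPU context, ordered first segment to last.
std::vector<std::unique_ptr<tflite::Interpreter>> BuildSegmentInterpreters();

void BuildRunner() {
  auto segments = BuildSegmentInterpreters();

  // The runner takes raw pointers in segment order.
  std::vector<tflite::Interpreter*> segment_ptrs;
  for (const auto& segment : segments) segment_ptrs.push_back(segment.get());

  // Custom allocators are optional; pass nullptr to use the built-in ones.
  // They must outlive the runner.
  PageAlignedAllocator input_allocator;
  PageAlignedAllocator output_allocator;
  coral::PipelinedModelRunner runner(segment_ptrs, &input_allocator,
                                     &output_allocator);

  // Push() inputs and Pop() outputs as shown in the class example above.
}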

public ~PipelinedModelRunner()

public inline Allocator* GetInputTensorAllocator() const

Returns the default input tensor allocator (or the allocator given to the constructor).

public inline Allocator* GetOutputTensorAllocator() const

Returns the default output tensor allocator (or the allocator given to the constructor).

public bool Push(const std::vector<PipelineTensor> & input_tensors)

Pushes input tensors to be processed by the pipeline.

Parameters:

  • input_tensors A vector of input tensors, each wrapped as a PipelineTensor. The order must match Interpreter::inputs() from the first model segment.

Returns:

True if successful; false otherwise.

Note:

  • The caller is responsible for allocating memory for input tensors. By default, this class frees those tensors once they are consumed. The caller can set a custom allocator for input tensors if needed (see the sketch below).

  • Pushing an empty vector {} is allowed; it signals that no more inputs will be pushed (any Push() made after this special push returns false). The special push allows Pop()'s consumer to properly drain unconsumed output tensors. See the example above for details.
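
For instance, a helper like the CreateInputTensors() used in the class example above might be written as in the following sketch (here it also takes the first-segment interpreter and the caller's raw buffers). The quantized uint8 inputs, the raw_inputs argument, and the include path are assumptions:

#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

#include "src/cpp/pipeline/common.h"      // PipelineTensor; illustrative path.
#include "tensorflow/lite/interpreter.h"

// Builds one PipelineTensor per input of the first segment, copying from
// caller-provided uint8 buffers (one buffer per input, in Interpreter::inputs()
// order). The runner frees these tensors once they are consumed.
std::vector<coral::PipelineTensor> CreateInputTensors(
    const tflite::Interpreter& first_segment,
    const std::vector<std::vector<uint8_t>>& raw_inputs,
    coral::Allocator* input_tensor_allocator) {
  std::vector<coral::PipelineTensor> tensors;
  const std::vector<int>& input_indices = first_segment.inputs();
  for (size_t i = 0; i < input_indices.size(); ++i) {
    const TfLiteTensor* info = first_segment.tensor(input_indices[i]);
    coral::PipelineTensor tensor;
    tensor.type = info->type;   // e.g. kTfLiteUInt8 for a quantized model.
    tensor.bytes = info->bytes;
    tensor.data.uint8 =
        static_cast<uint8_t*>(input_tensor_allocator->alloc(tensor.bytes));
    std::memcpy(tensor.data.uint8, raw_inputs[i].data(), tensor.bytes);
    tensors.push_back(tensor);
  }
  return tensors;
}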

public bool Pop(std::vector<PipelineTensor> * output_tensors)

Gets output tensors from the pipeline.

Parameters:

  • output_tensors A pointer to a vector of PipelineTensor objects where outputs are stored. The returned output tensors are ordered to match Interpreter::outputs() of the last model segment.

Returns:

True when output is received, or false when the special empty push has been given to Push() and no more output tensors are available.

Note:

  • The caller is responsible for deallocating the output tensors after consuming them. By default, output tensors are allocated using the default tensor allocator. The caller can set a custom allocator for output tensors if needed.

  • This call blocks if no output tensors are available and the empty push has not yet been received.

public std::vector<SegmentStats> GetSegmentStats() const

Returns performance stats for each segment.

struct coral::PipelineTensor

A tensor in the pipeline system.

This is a simplified version of TfLiteTensor.

public TfLiteType type

The data type of the values stored in data.

This determines which member of the data union should be used.

public TfLitePtrUnion data

A union of data pointers.

Use the member that corresponds to type.

public size_t bytes

The number of bytes required to store the data of this tensor.

That is: (bytes of each element) * dims[0] * ... * dims[n-1]. For example, if type is kTfLiteFloat32 and dims = {3, 2} then bytes = sizeof(float) * 3 * 2 = 4 * 3 * 2 = 24.

struct coral::SegmentStats

Performance statistics for one segment of the model pipeline.

public int64_t total_time_ns

Total time spent traversing this segment so far (in nanoseconds).

public uint64_t num_inferences

Number of inferences processed so far.
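
For example, the average per-segment latency can be derived from these two fields, which helps identify the slowest segment in the pipeline. A sketch (illustrative include path; the runner is assumed to have already processed some requests):

#include <cstdio>
#include <vector>

#include "src/cpp/pipeline/pipelined_model_runner.h"  // Illustrative include path.

void PrintSegmentLatencies(const coral::PipelinedModelRunner& runner) {
  const std::vector<coral::SegmentStats> stats = runner.GetSegmentStats();
  for (size_t i = 0; i < stats.size(); ++i) {
    if (stats[i].num_inferences == 0) continue;  // Segment has not run yet.
    const double avg_ms = static_cast<double>(stats[i].total_time_ns) /
                          stats[i].num_inferences / 1e6;
    std::printf("segment %zu: %.2f ms/inference\n", i, avg_ms);
  }
}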