Running Async Predictions


#1

I’d like to run a number of predictions at the same time using MXNet to get a good combination of GPU usage and low latency for a use case I’m considering. That is to say I don’t want to buffer and batch my input, I’d like to submit it as soon as it’s available, and have multiple predictions running at once.

Multi-threaded MXNet inference isn’t supported, so my plan is to use an async pattern on my main thread. In principle it seems NDArray is setup to do exactly this, however I’m running into some gaps in the API that appear to force me to block my main thread to wait for results.

Does anyone have advice for how I could do this?

I’ve made a change locally as a work around, but what I’d like to do is something like this (not the can_read on the NDArray is not exposed in MXNet currently):

from time import sleep

import datetime
import mxnet as mx

last_frame = datetime.datetime.now()


def get_frame():
    global last_frame
    this_frame = datetime.datetime.now()
    if (this_frame - last_frame).microseconds > 16600:  # 60FPS
        return mx.nd.random.uniform(0, 1, (3, 640, 480))
    else:
        return None


def main():
    handler = FrameClassifier(8)
    while True:
        frame = get_frame()
        if frame is not None:
            handler.enqueue_frame(frame)
        results = handler.get_finished_results()
        print(results)
        sleep(.0001)


class FrameClassifier:

    def __init__(self, active_prediction_limit):
        self.active_prediction_limit = active_prediction_limit
        self.active_predictions = []

    def get_finished_results(self):
        finished_predictions = []

        # We need some way to process finished predictions.
        for prediction in self.active_predictions:
            if prediction.can_read():  # Not yet exposed.
                finished_predictions.append(prediction)
        [self.active_predictions.remove(p) for p in finished_predictions]
        return finished_predictions

    def enqueue_frame(self, frame: mx.ndarray):
        if len(self.active_predictions) > self.active_prediction_limit:
            raise Exception('Too many requests')
        executor = self.get_executor(frame)
        prediction = executor.forward(is_train=False)
        self.active_predictions.append(prediction[0])

    @staticmethod
    def get_executor(frame: mx.ndarray):
        # Placeholder network.
        x = mx.sym.Variable('x')
        y = mx.sym.FullyConnected(x, num_hidden=1024)
        for i in range(0, 4):
            y = x + 1
        executor = y.bind(mx.cpu(), {'x': frame})
        return executor


if __name__ == "__main__":
    main()

#2

Doesn’t sounds like an easy task as currently the mxnet dependency engine doesn’t expose such interface.

Also it’s not clear what this API means - does it return True if these no pending operations scheduled for this NDArray? What if as soon as it returns True, some other thread pushes an operator to this NDArray?


#3

Good question Haibin. So in this design the MXNet consumer is in control of their delegation thread. There would be a single thread they would submit work to, and read results from. Because they’re in control of the thread, they could guarantee that no other thread would submit work in between their check and read. The point of being able to check in a non-blocking fashion is that if they choose to they could submit more work. If they’re forced to do a blocking read then they lose that ability.