Monday, April 27, 2015

Concurrent video stream processing in Java

    Some time ago I was asked how to deal with video stream processing. The problem was quite simple. We have sequential input of video frames. Each frame must be processed, e.g. some filter must be applied. The trick is to utilize multiple cores to process these video frames. The output should contain processed video frames in correct order. I proposed one solution.

The problem can be described by means of this picture:

The video frame can be represented by:
class Frame {
 private int position;
 private byte[] data;
 
 public Frame(int position, byte[] data) {
  this.position = position;
  this.data = data;
 }
 
 public int getPosition() {
  return position;
 }
 
 public byte[] getData() {
  return data;
 }
}
The actual processor implementation:
public class FrameProcessor {
  private int threadsNumber;
    
  public FrameProcessor(int threadsNumber) {
    this.threadsNumber = threadsNumber;
  }
    
  public List<Frame> getProcessedFrames(List<Frame> framesToProcess) throws Exception {
    Function<Frame, Callable<Frame>> transformationFunction = frame -> {
      return new Callable<Frame>() {
        @Override
        public Frame call() throws Exception {
          System.out.println("Processing frame " + frame.getPosition() + " from Thread " 
                                                + Thread.currentThread().getName());
          Thread.sleep(5); // to simulate CPU intensive computations 
          return frame;
        }
      };    
    };
        
    ExecutorService exec = Executors.newFixedThreadPool(threadsNumber);
    List<Future<Frame>> processing = exec.invokeAll(framesToProcess
                                     .stream().map(transformationFunction).collect(Collectors.toList()));
    exec.shutdown();
        
    return processing.stream().map(future -> { 
      try {
        return future.get();
      } catch (Exception e) {
        throw new RuntimeException(e);
      } 
      }).sorted((frame1, frame2) -> {return frame1.getPosition() - frame2.getPosition()})
        .collect(Collectors.toList());
    }
}
and usage example:
public static void main(String[] args) throws Exception {
        FrameProcessor processor = new FrameProcessor(4);
        List<Frame> framesToProcess = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            framesToProcess.add(new Frame(i, new byte[1024]));
        }

        long start = System.currentTimeMillis();
        for (Frame processedFrame : processor.getProcessedFrames(framesToProcess)) {
            System.out.print("Frame number " + processedFrame.getPosition() + " processed.\n");
        }
        System.out.println(System.currentTimeMillis() - start);
}
This is just an initial concept of the engine. On the higher level some buffering and flushing can be added in order to avoid infamous OutOfMemoryError. The problem can be generalized as well:
1. Input -> sequential data which is ordered,
2. Concurrent processing of data chunks,
3. Output -> processed sequential data with correct order.

No comments :

Post a Comment