
/**
 * Module dependencies.
 */

var assert = require('assert');
var debug = require('debug')('stream-parser');

/**
 * Module exports.
 */

module.exports = Parser;

/**
 * Parser states.
 */

var INIT        = -1;
var BUFFERING   = 0;
var SKIPPING    = 1;
var PASSTHROUGH = 2;

/**
 * The `Parser` stream mixin works with either `Writable` or `Transform` stream
 * instances/subclasses. Provides a convenient generic "parsing" API:
 *
 *   _bytes(n, cb) - buffers "n" bytes and then calls "cb" with the "chunk"
 *   _skipBytes(n, cb) - skips "n" bytes and then calls "cb" when done
 *
 * If you extend a `Transform` stream, then the `_passthrough()` function is also
 * added:
 *
 *   _passthrough(n, cb) - passes through "n" bytes untouched and then calls "cb"
 *
 * @param {Stream} stream Transform or Writable stream instance to extend
 * @api public
 */

function Parser (stream) {
  var isTransform = stream && 'function' == typeof stream._transform;
  var isWritable = stream && 'function' == typeof stream._write;

  if (!isTransform && !isWritable) throw new Error('must pass a Writable or Transform stream in');
  debug('extending Parser into stream');

  // Transform streams and Writable streams get `_bytes()` and `_skipBytes()`
  stream._bytes = _bytes;
  stream._skipBytes = _skipBytes;

  // only Transform streams get the `_passthrough()` function
  if (isTransform) stream._passthrough = _passthrough;

  // take control of the streams2 callback functions for this stream
  if (isTransform) {
    stream._transform = transform;
  } else {
    stream._write = write;
  }
}

function init (stream) {
  debug('initializing parser stream');

  // number of bytes left to parser for the next "chunk"
  stream._parserBytesLeft = 0;

  // array of Buffer instances that make up the next "chunk"
  stream._parserBuffers = [];

  // number of bytes parsed so far for the next "chunk"
  stream._parserBuffered = 0;

  // flag that keeps track of if what the parser should do with bytes received
  stream._parserState = INIT;

  // the callback for the next "chunk"
  stream._parserCallback = null;

  // XXX: backwards compat with the old Transform API... remove at some point..
  if ('function' == typeof stream.push) {
    stream._parserOutput = stream.push.bind(stream);
  }

  stream._parserInit = true;
}

/**
 * Buffers `n` bytes and then invokes `fn` once that amount has been collected.
 *
 * @param {Number} n the number of bytes to buffer
 * @param {Function} fn callback function to invoke when `n` bytes are buffered
 * @api public
 */

function _bytes (n, fn) {
  assert(!this._parserCallback, 'there is already a "callback" set!');
  assert(isFinite(n) && n > 0, 'can only buffer a finite number of bytes > 0, got "' + n + '"');
  if (!this._parserInit) init(this);
  debug('buffering %o bytes', n);
  this._parserBytesLeft = n;
  this._parserCallback = fn;
  this._parserState = BUFFERING;
}

/**
 * Skips over the next `n` bytes, then invokes `fn` once that amount has
 * been discarded.
 *
 * @param {Number} n the number of bytes to discard
 * @param {Function} fn callback function to invoke when `n` bytes have been skipped
 * @api public
 */

function _skipBytes (n, fn) {
  assert(!this._parserCallback, 'there is already a "callback" set!');
  assert(n > 0, 'can only skip > 0 bytes, got "' + n + '"');
  if (!this._parserInit) init(this);
  debug('skipping %o bytes', n);
  this._parserBytesLeft = n;
  this._parserCallback = fn;
  this._parserState = SKIPPING;
}

/**
 * Passes through `n` bytes to the readable side of this stream untouched,
 * then invokes `fn` once that amount has been passed through.
 *
 * @param {Number} n the number of bytes to pass through
 * @param {Function} fn callback function to invoke when `n` bytes have passed through
 * @api public
 */

function _passthrough (n, fn) {
  assert(!this._parserCallback, 'There is already a "callback" set!');
  assert(n > 0, 'can only pass through > 0 bytes, got "' + n + '"');
  if (!this._parserInit) init(this);
  debug('passing through %o bytes', n);
  this._parserBytesLeft = n;
  this._parserCallback = fn;
  this._parserState = PASSTHROUGH;
}

/**
 * The `_write()` callback function implementation.
 *
 * @api private
 */

function write (chunk, encoding, fn) {
  if (!this._parserInit) init(this);
  debug('write(%o bytes)', chunk.length);

  // XXX: old Writable stream API compat... remove at some point...
  if ('function' == typeof encoding) fn = encoding;

  data(this, chunk, null, fn);
}

/**
 * The `_transform()` callback function implementation.
 *
 * @api private
 */


function transform (chunk, output, fn) {
  if (!this._parserInit) init(this);
  debug('transform(%o bytes)', chunk.length);

  // XXX: old Transform stream API compat... remove at some point...
  if ('function' != typeof output) {
    output = this._parserOutput;
  }

  data(this, chunk, output, fn);
}

/**
 * The internal buffering/passthrough logic...
 *
 * This `_data` function get's "trampolined" to prevent stack overflows for tight
 * loops. This technique requires us to return a "thunk" function for any
 * synchronous action. Async stuff breaks the trampoline, but that's ok since it's
 * working with a new stack at that point anyway.
 *
 * @api private
 */

function _data (stream, chunk, output, fn) {
  if (stream._parserBytesLeft <= 0) {
    return fn(new Error('got data but not currently parsing anything'));
  }

  if (chunk.length <= stream._parserBytesLeft) {
    // small buffer fits within the "_parserBytesLeft" window
    return function () {
      return process(stream, chunk, output, fn);
    };
  } else {
    // large buffer needs to be sliced on "_parserBytesLeft" and processed
    return function () {
      var b = chunk.slice(0, stream._parserBytesLeft);
      return process(stream, b, output, function (err) {
        if (err) return fn(err);
        if (chunk.length > b.length) {
          return function () {
            return _data(stream, chunk.slice(b.length), output, fn);
          };
        }
      });
    };
  }
}

/**
 * The internal `process` function gets called by the `data` function when
 * something "interesting" happens. This function takes care of buffering the
 * bytes when buffering, passing through the bytes when doing that, and invoking
 * the user callback when the number of bytes has been reached.
 *
 * @api private
 */

function process (stream, chunk, output, fn) {
  stream._parserBytesLeft -= chunk.length;
  debug('%o bytes left for stream piece', stream._parserBytesLeft);

  if (stream._parserState === BUFFERING) {
    // buffer
    stream._parserBuffers.push(chunk);
    stream._parserBuffered += chunk.length;
  } else if (stream._parserState === PASSTHROUGH) {
    // passthrough
    output(chunk);
  }
  // don't need to do anything for the SKIPPING case

  if (0 === stream._parserBytesLeft) {
    // done with stream "piece", invoke the callback
    var cb = stream._parserCallback;
    if (cb && stream._parserState === BUFFERING && stream._parserBuffers.length > 1) {
      chunk = Buffer.concat(stream._parserBuffers, stream._parserBuffered);
    }
    if (stream._parserState !== BUFFERING) {
      chunk = null;
    }
    stream._parserCallback = null;
    stream._parserBuffered = 0;
    stream._parserState = INIT;
    stream._parserBuffers.splice(0); // empty

    if (cb) {
      var args = [];
      if (chunk) {
        // buffered
        args.push(chunk);
      } else {
        // passthrough
      }
      if (output) {
        // on a Transform stream, has "output" function
        args.push(output);
      }
      var async = cb.length > args.length;
      if (async) {
        args.push(trampoline(fn));
      }
      // invoke cb
      var rtn = cb.apply(stream, args);
      if (!async || fn === rtn) return fn;
    }
  } else {
    // need more bytes
    return fn;
  }
}

var data = trampoline(_data);

/**
 * Generic thunk-based "trampoline" helper function.
 *
 * @param {Function} input function
 * @return {Function} "trampolined" function
 * @api private
 */

function trampoline (fn) {
  return function () {
    var result = fn.apply(this, arguments);

    while ('function' == typeof result) {
      result = result();
    }

    return result;
  };
}
