cscg24-guacamole

CSCG 2024 Challenge 'Guacamole Mashup'
git clone https://git.sinitax.com/sinitax/cscg24-guacamole
Log | Files | Refs | sfeed.txt

WebSocket.js (27620B)


      1'use strict';
      2
      3/*!
      4 * ws: a node.js websocket client
      5 * Copyright(c) 2011 Einar Otto Stangvik <einaros@gmail.com>
      6 * MIT Licensed
      7 */
      8
      9var url = require('url')
     10  , util = require('util')
     11  , http = require('http')
     12  , https = require('https')
     13  , crypto = require('crypto')
     14  , stream = require('stream')
     15  , Ultron = require('ultron')
     16  , Options = require('options')
     17  , Sender = require('./Sender')
     18  , Receiver = require('./Receiver')
     19  , SenderHixie = require('./Sender.hixie')
     20  , ReceiverHixie = require('./Receiver.hixie')
     21  , Extensions = require('./Extensions')
     22  , PerMessageDeflate = require('./PerMessageDeflate')
     23  , EventEmitter = require('events').EventEmitter;
     24
     25/**
     26 * Constants
     27 */
     28
     29// Default protocol version
     30
     31var protocolVersion = 13;
     32
     33// Close timeout
     34
     35var closeTimeout = 30 * 1000; // Allow 30 seconds to terminate the connection cleanly
     36
     37/**
     38 * WebSocket implementation
     39 *
     40 * @constructor
     41 * @param {String} address Connection address.
     42 * @param {String|Array} protocols WebSocket protocols.
     43 * @param {Object} options Additional connection options.
     44 * @api public
     45 */
     46function WebSocket(address, protocols, options) {
     47  if (this instanceof WebSocket === false) {
     48    return new WebSocket(address, protocols, options);
     49  }
     50
     51  EventEmitter.call(this);
     52
     53  if (protocols && !Array.isArray(protocols) && 'object' === typeof protocols) {
     54    // accept the "options" Object as the 2nd argument
     55    options = protocols;
     56    protocols = null;
     57  }
     58
     59  if ('string' === typeof protocols) {
     60    protocols = [ protocols ];
     61  }
     62
     63  if (!Array.isArray(protocols)) {
     64    protocols = [];
     65  }
     66
     67  this._socket = null;
     68  this._ultron = null;
     69  this._closeReceived = false;
     70  this.bytesReceived = 0;
     71  this.readyState = null;
     72  this.supports = {};
     73  this.extensions = {};
     74  this._binaryType = 'nodebuffer';
     75
     76  if (Array.isArray(address)) {
     77    initAsServerClient.apply(this, address.concat(options));
     78  } else {
     79    initAsClient.apply(this, [address, protocols, options]);
     80  }
     81}
     82
     83/**
     84 * Inherits from EventEmitter.
     85 */
     86util.inherits(WebSocket, EventEmitter);
     87
     88/**
     89 * Ready States
     90 */
     91["CONNECTING", "OPEN", "CLOSING", "CLOSED"].forEach(function each(state, index) {
     92    WebSocket.prototype[state] = WebSocket[state] = index;
     93});
     94
     95/**
     96 * Gracefully closes the connection, after sending a description message to the server
     97 *
     98 * @param {Object} data to be sent to the server
     99 * @api public
    100 */
    101WebSocket.prototype.close = function close(code, data) {
    102  if (this.readyState === WebSocket.CLOSED) return;
    103
    104  if (this.readyState === WebSocket.CONNECTING) {
    105    this.readyState = WebSocket.CLOSED;
    106    return;
    107  }
    108
    109  if (this.readyState === WebSocket.CLOSING) {
    110    if (this._closeReceived && this._isServer) {
    111      this.terminate();
    112    }
    113    return;
    114  }
    115
    116  var self = this;
    117  try {
    118    this.readyState = WebSocket.CLOSING;
    119    this._closeCode = code;
    120    this._closeMessage = data;
    121    var mask = !this._isServer;
    122    this._sender.close(code, data, mask, function(err) {
    123      if (err) self.emit('error', err);
    124
    125      if (self._closeReceived && self._isServer) {
    126        self.terminate();
    127      } else {
    128        // ensure that the connection is cleaned up even when no response of closing handshake.
    129        clearTimeout(self._closeTimer);
    130        self._closeTimer = setTimeout(cleanupWebsocketResources.bind(self, true), closeTimeout);
    131      }
    132    });
    133  } catch (e) {
    134    this.emit('error', e);
    135  }
    136};
    137
    138/**
    139 * Pause the client stream
    140 *
    141 * @api public
    142 */
    143WebSocket.prototype.pause = function pauser() {
    144  if (this.readyState !== WebSocket.OPEN) throw new Error('not opened');
    145
    146  return this._socket.pause();
    147};
    148
    149/**
    150 * Sends a ping
    151 *
    152 * @param {Object} data to be sent to the server
    153 * @param {Object} Members - mask: boolean, binary: boolean
    154 * @param {boolean} dontFailWhenClosed indicates whether or not to throw if the connection isnt open
    155 * @api public
    156 */
    157WebSocket.prototype.ping = function ping(data, options, dontFailWhenClosed) {
    158  if (this.readyState !== WebSocket.OPEN) {
    159    if (dontFailWhenClosed === true) return;
    160    throw new Error('not opened');
    161  }
    162
    163  options = options || {};
    164
    165  if (typeof options.mask === 'undefined') options.mask = !this._isServer;
    166
    167  this._sender.ping(data, options);
    168};
    169
    170/**
    171 * Sends a pong
    172 *
    173 * @param {Object} data to be sent to the server
    174 * @param {Object} Members - mask: boolean, binary: boolean
    175 * @param {boolean} dontFailWhenClosed indicates whether or not to throw if the connection isnt open
    176 * @api public
    177 */
    178WebSocket.prototype.pong = function(data, options, dontFailWhenClosed) {
    179  if (this.readyState !== WebSocket.OPEN) {
    180    if (dontFailWhenClosed === true) return;
    181    throw new Error('not opened');
    182  }
    183
    184  options = options || {};
    185
    186  if (typeof options.mask === 'undefined') options.mask = !this._isServer;
    187
    188  this._sender.pong(data, options);
    189};
    190
    191/**
    192 * Resume the client stream
    193 *
    194 * @api public
    195 */
    196WebSocket.prototype.resume = function resume() {
    197  if (this.readyState !== WebSocket.OPEN) throw new Error('not opened');
    198
    199  return this._socket.resume();
    200};
    201
    202/**
    203 * Sends a piece of data
    204 *
    205 * @param {Object} data to be sent to the server
    206 * @param {Object} Members - mask: boolean, binary: boolean, compress: boolean
    207 * @param {function} Optional callback which is executed after the send completes
    208 * @api public
    209 */
    210
    211WebSocket.prototype.send = function send(data, options, cb) {
    212  if (typeof options === 'function') {
    213    cb = options;
    214    options = {};
    215  }
    216
    217  if (this.readyState !== WebSocket.OPEN) {
    218    if (typeof cb === 'function') cb(new Error('not opened'));
    219    else throw new Error('not opened');
    220    return;
    221  }
    222
    223  if (!data) data = '';
    224  if (this._queue) {
    225    var self = this;
    226    this._queue.push(function() { self.send(data, options, cb); });
    227    return;
    228  }
    229
    230  options = options || {};
    231  options.fin = true;
    232
    233  if (typeof options.binary === 'undefined') {
    234    options.binary = (data instanceof ArrayBuffer || data instanceof Buffer ||
    235      data instanceof Uint8Array ||
    236      data instanceof Uint16Array ||
    237      data instanceof Uint32Array ||
    238      data instanceof Int8Array ||
    239      data instanceof Int16Array ||
    240      data instanceof Int32Array ||
    241      data instanceof Float32Array ||
    242      data instanceof Float64Array);
    243  }
    244
    245  if (typeof options.mask === 'undefined') options.mask = !this._isServer;
    246  if (typeof options.compress === 'undefined') options.compress = true;
    247  if (!this.extensions[PerMessageDeflate.extensionName]) {
    248    options.compress = false;
    249  }
    250
    251  var readable = typeof stream.Readable === 'function'
    252    ? stream.Readable
    253    : stream.Stream;
    254
    255  if (data instanceof readable) {
    256    startQueue(this);
    257    var self = this;
    258
    259    sendStream(this, data, options, function send(error) {
    260      process.nextTick(function tock() {
    261        executeQueueSends(self);
    262      });
    263
    264      if (typeof cb === 'function') cb(error);
    265    });
    266  } else {
    267    this._sender.send(data, options, cb);
    268  }
    269};
    270
    271/**
    272 * Streams data through calls to a user supplied function
    273 *
    274 * @param {Object} Members - mask: boolean, binary: boolean, compress: boolean
    275 * @param {function} 'function (error, send)' which is executed on successive ticks of which send is 'function (data, final)'.
    276 * @api public
    277 */
    278WebSocket.prototype.stream = function stream(options, cb) {
    279  if (typeof options === 'function') {
    280    cb = options;
    281    options = {};
    282  }
    283
    284  var self = this;
    285
    286  if (typeof cb !== 'function') throw new Error('callback must be provided');
    287
    288  if (this.readyState !== WebSocket.OPEN) {
    289    if (typeof cb === 'function') cb(new Error('not opened'));
    290    else throw new Error('not opened');
    291    return;
    292  }
    293
    294  if (this._queue) {
    295    this._queue.push(function () { self.stream(options, cb); });
    296    return;
    297  }
    298
    299  options = options || {};
    300
    301  if (typeof options.mask === 'undefined') options.mask = !this._isServer;
    302  if (typeof options.compress === 'undefined') options.compress = true;
    303  if (!this.extensions[PerMessageDeflate.extensionName]) {
    304    options.compress = false;
    305  }
    306
    307  startQueue(this);
    308
    309  function send(data, final) {
    310    try {
    311      if (self.readyState !== WebSocket.OPEN) throw new Error('not opened');
    312      options.fin = final === true;
    313      self._sender.send(data, options);
    314      if (!final) process.nextTick(cb.bind(null, null, send));
    315      else executeQueueSends(self);
    316    } catch (e) {
    317      if (typeof cb === 'function') cb(e);
    318      else {
    319        delete self._queue;
    320        self.emit('error', e);
    321      }
    322    }
    323  }
    324
    325  process.nextTick(cb.bind(null, null, send));
    326};
    327
    328/**
    329 * Immediately shuts down the connection
    330 *
    331 * @api public
    332 */
    333WebSocket.prototype.terminate = function terminate() {
    334  if (this.readyState === WebSocket.CLOSED) return;
    335
    336  if (this._socket) {
    337    this.readyState = WebSocket.CLOSING;
    338
    339    // End the connection
    340    try { this._socket.end(); }
    341    catch (e) {
    342      // Socket error during end() call, so just destroy it right now
    343      cleanupWebsocketResources.call(this, true);
    344      return;
    345    }
    346
    347    // Add a timeout to ensure that the connection is completely
    348    // cleaned up within 30 seconds, even if the clean close procedure
    349    // fails for whatever reason
    350    // First cleanup any pre-existing timeout from an earlier "terminate" call,
    351    // if one exists.  Otherwise terminate calls in quick succession will leak timeouts
    352    // and hold the program open for `closeTimout` time.
    353    if (this._closeTimer) { clearTimeout(this._closeTimer); }
    354    this._closeTimer = setTimeout(cleanupWebsocketResources.bind(this, true), closeTimeout);
    355  } else if (this.readyState === WebSocket.CONNECTING) {
    356    cleanupWebsocketResources.call(this, true);
    357  }
    358};
    359
    360/**
    361 * Expose bufferedAmount
    362 *
    363 * @api public
    364 */
    365Object.defineProperty(WebSocket.prototype, 'bufferedAmount', {
    366  get: function get() {
    367    var amount = 0;
    368    if (this._socket) {
    369      amount = this._socket.bufferSize || 0;
    370    }
    371    return amount;
    372  }
    373});
    374
    375/**
    376 * Expose binaryType
    377 *
    378 * This deviates from the W3C interface since ws doesn't support the required
    379 * default "blob" type (instead we define a custom "nodebuffer" type).
    380 *
    381 * @see http://dev.w3.org/html5/websockets/#the-websocket-interface
    382 * @api public
    383 */
    384Object.defineProperty(WebSocket.prototype, 'binaryType', {
    385  get: function get() {
    386    return this._binaryType;
    387  },
    388  set: function set(type) {
    389    if (type === 'arraybuffer' || type === 'nodebuffer')
    390      this._binaryType = type;
    391    else
    392      throw new SyntaxError('unsupported binaryType: must be either "nodebuffer" or "arraybuffer"');
    393  }
    394});
    395
    396/**
    397 * Emulates the W3C Browser based WebSocket interface using function members.
    398 *
    399 * @see http://dev.w3.org/html5/websockets/#the-websocket-interface
    400 * @api public
    401 */
    402['open', 'error', 'close', 'message'].forEach(function(method) {
    403  Object.defineProperty(WebSocket.prototype, 'on' + method, {
    404    /**
    405     * Returns the current listener
    406     *
    407     * @returns {Mixed} the set function or undefined
    408     * @api public
    409     */
    410    get: function get() {
    411      var listener = this.listeners(method)[0];
    412      return listener ? (listener._listener ? listener._listener : listener) : undefined;
    413    },
    414
    415    /**
    416     * Start listening for events
    417     *
    418     * @param {Function} listener the listener
    419     * @returns {Mixed} the set function or undefined
    420     * @api public
    421     */
    422    set: function set(listener) {
    423      this.removeAllListeners(method);
    424      this.addEventListener(method, listener);
    425    }
    426  });
    427});
    428
    429/**
    430 * Emulates the W3C Browser based WebSocket interface using addEventListener.
    431 *
    432 * @see https://developer.mozilla.org/en/DOM/element.addEventListener
    433 * @see http://dev.w3.org/html5/websockets/#the-websocket-interface
    434 * @api public
    435 */
    436WebSocket.prototype.addEventListener = function(method, listener) {
    437  var target = this;
    438
    439  function onMessage (data, flags) {
    440    if (flags.binary && this.binaryType === 'arraybuffer')
    441        data = new Uint8Array(data).buffer;
    442    listener.call(target, new MessageEvent(data, !!flags.binary, target));
    443  }
    444
    445  function onClose (code, message) {
    446    listener.call(target, new CloseEvent(code, message, target));
    447  }
    448
    449  function onError (event) {
    450    event.type = 'error';
    451    event.target = target;
    452    listener.call(target, event);
    453  }
    454
    455  function onOpen () {
    456    listener.call(target, new OpenEvent(target));
    457  }
    458
    459  if (typeof listener === 'function') {
    460    if (method === 'message') {
    461      // store a reference so we can return the original function from the
    462      // addEventListener hook
    463      onMessage._listener = listener;
    464      this.on(method, onMessage);
    465    } else if (method === 'close') {
    466      // store a reference so we can return the original function from the
    467      // addEventListener hook
    468      onClose._listener = listener;
    469      this.on(method, onClose);
    470    } else if (method === 'error') {
    471      // store a reference so we can return the original function from the
    472      // addEventListener hook
    473      onError._listener = listener;
    474      this.on(method, onError);
    475    } else if (method === 'open') {
    476      // store a reference so we can return the original function from the
    477      // addEventListener hook
    478      onOpen._listener = listener;
    479      this.on(method, onOpen);
    480    } else {
    481      this.on(method, listener);
    482    }
    483  }
    484};
    485
    486module.exports = WebSocket;
    487module.exports.buildHostHeader = buildHostHeader
    488
    489/**
    490 * W3C MessageEvent
    491 *
    492 * @see http://www.w3.org/TR/html5/comms.html
    493 * @constructor
    494 * @api private
    495 */
    496function MessageEvent(dataArg, isBinary, target) {
    497  this.type = 'message';
    498  this.data = dataArg;
    499  this.target = target;
    500  this.binary = isBinary; // non-standard.
    501}
    502
    503/**
    504 * W3C CloseEvent
    505 *
    506 * @see http://www.w3.org/TR/html5/comms.html
    507 * @constructor
    508 * @api private
    509 */
    510function CloseEvent(code, reason, target) {
    511  this.type = 'close';
    512  this.wasClean = (typeof code === 'undefined' || code === 1000);
    513  this.code = code;
    514  this.reason = reason;
    515  this.target = target;
    516}
    517
    518/**
    519 * W3C OpenEvent
    520 *
    521 * @see http://www.w3.org/TR/html5/comms.html
    522 * @constructor
    523 * @api private
    524 */
    525function OpenEvent(target) {
    526  this.type = 'open';
    527  this.target = target;
    528}
    529
    530// Append port number to Host header, only if specified in the url
    531// and non-default
    532function buildHostHeader(isSecure, hostname, port) {
    533  var headerHost = hostname;
    534  if (hostname) {
    535    if ((isSecure && (port != 443)) || (!isSecure && (port != 80))){
    536      headerHost = headerHost + ':' + port;
    537    }
    538  }
    539  return headerHost;
    540}
    541
    542/**
    543 * Entirely private apis,
 * which may or may not be bound to a specific WebSocket instance.
    545 */
    546function initAsServerClient(req, socket, upgradeHead, options) {
    547  options = new Options({
    548    protocolVersion: protocolVersion,
    549    protocol: null,
    550    extensions: {},
    551    maxPayload: 0
    552  }).merge(options);
    553
    554  // expose state properties
    555  this.protocol = options.value.protocol;
    556  this.protocolVersion = options.value.protocolVersion;
    557  this.extensions = options.value.extensions;
    558  this.supports.binary = (this.protocolVersion !== 'hixie-76');
    559  this.upgradeReq = req;
    560  this.readyState = WebSocket.CONNECTING;
    561  this._isServer = true;
    562  this.maxPayload = options.value.maxPayload;
    563  // establish connection
    564  if (options.value.protocolVersion === 'hixie-76') {
    565    establishConnection.call(this, ReceiverHixie, SenderHixie, socket, upgradeHead);
    566  } else {
    567    establishConnection.call(this, Receiver, Sender, socket, upgradeHead);
    568  }
    569}
    570
    571function initAsClient(address, protocols, options) {
    572  options = new Options({
    573    origin: null,
    574    protocolVersion: protocolVersion,
    575    host: null,
    576    headers: null,
    577    protocol: protocols.join(','),
    578    agent: null,
    579
    580    // ssl-related options
    581    pfx: null,
    582    key: null,
    583    passphrase: null,
    584    cert: null,
    585    ca: null,
    586    ciphers: null,
    587    rejectUnauthorized: null,
    588    perMessageDeflate: true,
    589    localAddress: null
    590  }).merge(options);
    591
    592  if (options.value.protocolVersion !== 8 && options.value.protocolVersion !== 13) {
    593    throw new Error('unsupported protocol version');
    594  }
    595
    596  // verify URL and establish http class
    597  var serverUrl = url.parse(address);
    598  var isUnixSocket = serverUrl.protocol === 'ws+unix:';
    599  if (!serverUrl.host && !isUnixSocket) throw new Error('invalid url');
    600  var isSecure = serverUrl.protocol === 'wss:' || serverUrl.protocol === 'https:';
    601  var httpObj = isSecure ? https : http;
    602  var port = serverUrl.port || (isSecure ? 443 : 80);
    603  var auth = serverUrl.auth;
    604
    605  // prepare extensions
    606  var extensionsOffer = {};
    607  var perMessageDeflate;
    608  if (options.value.perMessageDeflate) {
    609    perMessageDeflate = new PerMessageDeflate(typeof options.value.perMessageDeflate !== true ? options.value.perMessageDeflate : {}, false);
    610    extensionsOffer[PerMessageDeflate.extensionName] = perMessageDeflate.offer();
    611  }
    612
    613  // expose state properties
    614  this._isServer = false;
    615  this.url = address;
    616  this.protocolVersion = options.value.protocolVersion;
    617  this.supports.binary = (this.protocolVersion !== 'hixie-76');
    618
    619  // begin handshake
    620  var key = new Buffer(options.value.protocolVersion + '-' + Date.now()).toString('base64');
    621  var shasum = crypto.createHash('sha1');
    622  shasum.update(key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11');
    623  var expectedServerKey = shasum.digest('base64');
    624
    625  var agent = options.value.agent;
    626
    627  var headerHost = buildHostHeader(isSecure, serverUrl.hostname, port)
    628
    629  var requestOptions = {
    630    port: port,
    631    host: serverUrl.hostname,
    632    headers: {
    633      'Connection': 'Upgrade',
    634      'Upgrade': 'websocket',
    635      'Host': headerHost,
    636      'Sec-WebSocket-Version': options.value.protocolVersion,
    637      'Sec-WebSocket-Key': key
    638    }
    639  };
    640
    641  // If we have basic auth.
    642  if (auth) {
    643    requestOptions.headers.Authorization = 'Basic ' + new Buffer(auth).toString('base64');
    644  }
    645
    646  if (options.value.protocol) {
    647    requestOptions.headers['Sec-WebSocket-Protocol'] = options.value.protocol;
    648  }
    649
    650  if (options.value.host) {
    651    requestOptions.headers.Host = options.value.host;
    652  }
    653
    654  if (options.value.headers) {
    655    for (var header in options.value.headers) {
    656       if (options.value.headers.hasOwnProperty(header)) {
    657        requestOptions.headers[header] = options.value.headers[header];
    658       }
    659    }
    660  }
    661
    662  if (Object.keys(extensionsOffer).length) {
    663    requestOptions.headers['Sec-WebSocket-Extensions'] = Extensions.format(extensionsOffer);
    664  }
    665
    666  if (options.isDefinedAndNonNull('pfx')
    667   || options.isDefinedAndNonNull('key')
    668   || options.isDefinedAndNonNull('passphrase')
    669   || options.isDefinedAndNonNull('cert')
    670   || options.isDefinedAndNonNull('ca')
    671   || options.isDefinedAndNonNull('ciphers')
    672   || options.isDefinedAndNonNull('rejectUnauthorized')) {
    673
    674    if (options.isDefinedAndNonNull('pfx')) requestOptions.pfx = options.value.pfx;
    675    if (options.isDefinedAndNonNull('key')) requestOptions.key = options.value.key;
    676    if (options.isDefinedAndNonNull('passphrase')) requestOptions.passphrase = options.value.passphrase;
    677    if (options.isDefinedAndNonNull('cert')) requestOptions.cert = options.value.cert;
    678    if (options.isDefinedAndNonNull('ca')) requestOptions.ca = options.value.ca;
    679    if (options.isDefinedAndNonNull('ciphers')) requestOptions.ciphers = options.value.ciphers;
    680    if (options.isDefinedAndNonNull('rejectUnauthorized')) requestOptions.rejectUnauthorized = options.value.rejectUnauthorized;
    681
    682    if (!agent) {
    683        // global agent ignores client side certificates
    684        agent = new httpObj.Agent(requestOptions);
    685    }
    686  }
    687
    688  requestOptions.path = serverUrl.path || '/';
    689
    690  if (agent) {
    691    requestOptions.agent = agent;
    692  }
    693
    694  if (isUnixSocket) {
    695    requestOptions.socketPath = serverUrl.pathname;
    696  }
    697
    698  if (options.value.localAddress) {
    699    requestOptions.localAddress = options.value.localAddress;
    700  }
    701
    702  if (options.value.origin) {
    703    if (options.value.protocolVersion < 13) requestOptions.headers['Sec-WebSocket-Origin'] = options.value.origin;
    704    else requestOptions.headers.Origin = options.value.origin;
    705  }
    706
    707  var self = this;
    708  var req = httpObj.request(requestOptions);
    709
    710  req.on('error', function onerror(error) {
    711    self.emit('error', error);
    712    cleanupWebsocketResources.call(self, error);
    713  });
    714
    715  req.once('response', function response(res) {
    716    var error;
    717
    718    if (!self.emit('unexpected-response', req, res)) {
    719      error = new Error('unexpected server response (' + res.statusCode + ')');
    720      req.abort();
    721      self.emit('error', error);
    722    }
    723
    724    cleanupWebsocketResources.call(self, error);
    725  });
    726
    727  req.once('upgrade', function upgrade(res, socket, upgradeHead) {
    728    if (self.readyState === WebSocket.CLOSED) {
    729      // client closed before server accepted connection
    730      self.emit('close');
    731      self.removeAllListeners();
    732      socket.end();
    733      return;
    734    }
    735
    736    var serverKey = res.headers['sec-websocket-accept'];
    737    if (typeof serverKey === 'undefined' || serverKey !== expectedServerKey) {
    738      self.emit('error', 'invalid server key');
    739      self.removeAllListeners();
    740      socket.end();
    741      return;
    742    }
    743
    744    var serverProt = res.headers['sec-websocket-protocol'];
    745    var protList = (options.value.protocol || "").split(/, */);
    746    var protError = null;
    747
    748    if (!options.value.protocol && serverProt) {
    749      protError = 'server sent a subprotocol even though none requested';
    750    } else if (options.value.protocol && !serverProt) {
    751      protError = 'server sent no subprotocol even though requested';
    752    } else if (serverProt && protList.indexOf(serverProt) === -1) {
    753      protError = 'server responded with an invalid protocol';
    754    }
    755
    756    if (protError) {
    757      self.emit('error', protError);
    758      self.removeAllListeners();
    759      socket.end();
    760      return;
    761    } else if (serverProt) {
    762      self.protocol = serverProt;
    763    }
    764
    765    var serverExtensions = Extensions.parse(res.headers['sec-websocket-extensions']);
    766    if (perMessageDeflate && serverExtensions[PerMessageDeflate.extensionName]) {
    767      try {
    768        perMessageDeflate.accept(serverExtensions[PerMessageDeflate.extensionName]);
    769      } catch (err) {
    770        self.emit('error', 'invalid extension parameter');
    771        self.removeAllListeners();
    772        socket.end();
    773        return;
    774      }
    775      self.extensions[PerMessageDeflate.extensionName] = perMessageDeflate;
    776    }
    777
    778    establishConnection.call(self, Receiver, Sender, socket, upgradeHead);
    779
    780    // perform cleanup on http resources
    781    req.removeAllListeners();
    782    req = null;
    783    agent = null;
    784  });
    785
    786  req.end();
    787  this.readyState = WebSocket.CONNECTING;
    788}
    789
    790function establishConnection(ReceiverClass, SenderClass, socket, upgradeHead) {
    791  var ultron = this._ultron = new Ultron(socket)
    792    , called = false
    793    , self = this;
    794
    795  socket.setTimeout(0);
    796  socket.setNoDelay(true);
    797
    798  this._receiver = new ReceiverClass(this.extensions,this.maxPayload);
    799  this._socket = socket;
    800
    801  // socket cleanup handlers
    802  ultron.on('end', cleanupWebsocketResources.bind(this));
    803  ultron.on('close', cleanupWebsocketResources.bind(this));
    804  ultron.on('error', cleanupWebsocketResources.bind(this));
    805
    806  // ensure that the upgradeHead is added to the receiver
    807  function firstHandler(data) {
    808    if (called || self.readyState === WebSocket.CLOSED) return;
    809
    810    called = true;
    811    socket.removeListener('data', firstHandler);
    812    ultron.on('data', realHandler);
    813
    814    if (upgradeHead && upgradeHead.length > 0) {
    815      realHandler(upgradeHead);
    816      upgradeHead = null;
    817    }
    818
    819    if (data) realHandler(data);
    820  }
    821
    822  // subsequent packets are pushed straight to the receiver
    823  function realHandler(data) {
    824    self.bytesReceived += data.length;
    825    self._receiver.add(data);
    826  }
    827
    828  ultron.on('data', firstHandler);
    829
    830  // if data was passed along with the http upgrade,
    831  // this will schedule a push of that on to the receiver.
    832  // this has to be done on next tick, since the caller
    833  // hasn't had a chance to set event handlers on this client
    834  // object yet.
    835  process.nextTick(firstHandler);
    836
    837  // receiver event handlers
    838  self._receiver.ontext = function ontext(data, flags) {
    839    flags = flags || {};
    840
    841    self.emit('message', data, flags);
    842  };
    843
    844  self._receiver.onbinary = function onbinary(data, flags) {
    845    flags = flags || {};
    846
    847    flags.binary = true;
    848    self.emit('message', data, flags);
    849  };
    850
    851  self._receiver.onping = function onping(data, flags) {
    852    flags = flags || {};
    853
    854    self.pong(data, {
    855      mask: !self._isServer,
    856      binary: flags.binary === true
    857    }, true);
    858
    859    self.emit('ping', data, flags);
    860  };
    861
    862  self._receiver.onpong = function onpong(data, flags) {
    863    self.emit('pong', data, flags || {});
    864  };
    865
    866  self._receiver.onclose = function onclose(code, data, flags) {
    867    flags = flags || {};
    868
    869    self._closeReceived = true;
    870    self.close(code, data);
    871  };
    872
    873  self._receiver.onerror = function onerror(reason, errorCode) {
    874    // close the connection when the receiver reports a HyBi error code
    875    self.close(typeof errorCode !== 'undefined' ? errorCode : 1002, '');
    876    self.emit('error', (reason instanceof Error) ? reason : (new Error(reason)));
    877  };
    878
    879  // finalize the client
    880  this._sender = new SenderClass(socket, this.extensions);
    881  this._sender.on('error', function onerror(error) {
    882    self.close(1002, '');
    883    self.emit('error', error);
    884  });
    885
    886  this.readyState = WebSocket.OPEN;
    887  this.emit('open');
    888}
    889
    890function startQueue(instance) {
    891  instance._queue = instance._queue || [];
    892}
    893
    894function executeQueueSends(instance) {
    895  var queue = instance._queue;
    896  if (typeof queue === 'undefined') return;
    897
    898  delete instance._queue;
    899  for (var i = 0, l = queue.length; i < l; ++i) {
    900    queue[i]();
    901  }
    902}
    903
    904function sendStream(instance, stream, options, cb) {
    905  stream.on('data', function incoming(data) {
    906    if (instance.readyState !== WebSocket.OPEN) {
    907      if (typeof cb === 'function') cb(new Error('not opened'));
    908      else {
    909        delete instance._queue;
    910        instance.emit('error', new Error('not opened'));
    911      }
    912      return;
    913    }
    914
    915    options.fin = false;
    916    instance._sender.send(data, options);
    917  });
    918
    919  stream.on('end', function end() {
    920    if (instance.readyState !== WebSocket.OPEN) {
    921      if (typeof cb === 'function') cb(new Error('not opened'));
    922      else {
    923        delete instance._queue;
    924        instance.emit('error', new Error('not opened'));
    925      }
    926      return;
    927    }
    928
    929    options.fin = true;
    930    instance._sender.send(null, options);
    931
    932    if (typeof cb === 'function') cb(null);
    933  });
    934}
    935
    936function cleanupWebsocketResources(error) {
    937  if (this.readyState === WebSocket.CLOSED) return;
    938
    939  this.readyState = WebSocket.CLOSED;
    940
    941  clearTimeout(this._closeTimer);
    942  this._closeTimer = null;
    943
    944  // If the connection was closed abnormally (with an error), or if
    945  // the close control frame was not received then the close code
    946  // must default to 1006.
    947  if (error || !this._closeReceived) {
    948    this._closeCode = 1006;
    949  }
    950  this.emit('close', this._closeCode || 1000, this._closeMessage || '');
    951
    952  if (this._socket) {
    953    if (this._ultron) this._ultron.destroy();
    954    this._socket.on('error', function onerror() {
    955      try { this.destroy(); }
    956      catch (e) {}
    957    });
    958
    959    try {
    960      if (!error) this._socket.end();
    961      else this._socket.destroy();
    962    } catch (e) { /* Ignore termination errors */ }
    963
    964    this._socket = null;
    965    this._ultron = null;
    966  }
    967
    968  if (this._sender) {
    969    this._sender.removeAllListeners();
    970    this._sender = null;
    971  }
    972
    973  if (this._receiver) {
    974    this._receiver.cleanup();
    975    this._receiver = null;
    976  }
    977
    978  if (this.extensions[PerMessageDeflate.extensionName]) {
    979    this.extensions[PerMessageDeflate.extensionName].cleanup();
    980  }
    981
    982  this.extensions = null;
    983
    984  this.removeAllListeners();
    985  this.on('error', function onerror() {}); // catch all errors after this
    986  delete this._queue;
    987}