WebSocket.js (27620B)
1'use strict'; 2 3/*! 4 * ws: a node.js websocket client 5 * Copyright(c) 2011 Einar Otto Stangvik <einaros@gmail.com> 6 * MIT Licensed 7 */ 8 9var url = require('url') 10 , util = require('util') 11 , http = require('http') 12 , https = require('https') 13 , crypto = require('crypto') 14 , stream = require('stream') 15 , Ultron = require('ultron') 16 , Options = require('options') 17 , Sender = require('./Sender') 18 , Receiver = require('./Receiver') 19 , SenderHixie = require('./Sender.hixie') 20 , ReceiverHixie = require('./Receiver.hixie') 21 , Extensions = require('./Extensions') 22 , PerMessageDeflate = require('./PerMessageDeflate') 23 , EventEmitter = require('events').EventEmitter; 24 25/** 26 * Constants 27 */ 28 29// Default protocol version 30 31var protocolVersion = 13; 32 33// Close timeout 34 35var closeTimeout = 30 * 1000; // Allow 30 seconds to terminate the connection cleanly 36 37/** 38 * WebSocket implementation 39 * 40 * @constructor 41 * @param {String} address Connection address. 42 * @param {String|Array} protocols WebSocket protocols. 43 * @param {Object} options Additional connection options. 44 * @api public 45 */ 46function WebSocket(address, protocols, options) { 47 if (this instanceof WebSocket === false) { 48 return new WebSocket(address, protocols, options); 49 } 50 51 EventEmitter.call(this); 52 53 if (protocols && !Array.isArray(protocols) && 'object' === typeof protocols) { 54 // accept the "options" Object as the 2nd argument 55 options = protocols; 56 protocols = null; 57 } 58 59 if ('string' === typeof protocols) { 60 protocols = [ protocols ]; 61 } 62 63 if (!Array.isArray(protocols)) { 64 protocols = []; 65 } 66 67 this._socket = null; 68 this._ultron = null; 69 this._closeReceived = false; 70 this.bytesReceived = 0; 71 this.readyState = null; 72 this.supports = {}; 73 this.extensions = {}; 74 this._binaryType = 'nodebuffer'; 75 76 if (Array.isArray(address)) { 77 initAsServerClient.apply(this, address.concat(options)); 78 } else { 79 initAsClient.apply(this, [address, protocols, options]); 80 } 81} 82 83/** 84 * Inherits from EventEmitter. 85 */ 86util.inherits(WebSocket, EventEmitter); 87 88/** 89 * Ready States 90 */ 91["CONNECTING", "OPEN", "CLOSING", "CLOSED"].forEach(function each(state, index) { 92 WebSocket.prototype[state] = WebSocket[state] = index; 93}); 94 95/** 96 * Gracefully closes the connection, after sending a description message to the server 97 * 98 * @param {Object} data to be sent to the server 99 * @api public 100 */ 101WebSocket.prototype.close = function close(code, data) { 102 if (this.readyState === WebSocket.CLOSED) return; 103 104 if (this.readyState === WebSocket.CONNECTING) { 105 this.readyState = WebSocket.CLOSED; 106 return; 107 } 108 109 if (this.readyState === WebSocket.CLOSING) { 110 if (this._closeReceived && this._isServer) { 111 this.terminate(); 112 } 113 return; 114 } 115 116 var self = this; 117 try { 118 this.readyState = WebSocket.CLOSING; 119 this._closeCode = code; 120 this._closeMessage = data; 121 var mask = !this._isServer; 122 this._sender.close(code, data, mask, function(err) { 123 if (err) self.emit('error', err); 124 125 if (self._closeReceived && self._isServer) { 126 self.terminate(); 127 } else { 128 // ensure that the connection is cleaned up even when no response of closing handshake. 129 clearTimeout(self._closeTimer); 130 self._closeTimer = setTimeout(cleanupWebsocketResources.bind(self, true), closeTimeout); 131 } 132 }); 133 } catch (e) { 134 this.emit('error', e); 135 } 136}; 137 138/** 139 * Pause the client stream 140 * 141 * @api public 142 */ 143WebSocket.prototype.pause = function pauser() { 144 if (this.readyState !== WebSocket.OPEN) throw new Error('not opened'); 145 146 return this._socket.pause(); 147}; 148 149/** 150 * Sends a ping 151 * 152 * @param {Object} data to be sent to the server 153 * @param {Object} Members - mask: boolean, binary: boolean 154 * @param {boolean} dontFailWhenClosed indicates whether or not to throw if the connection isnt open 155 * @api public 156 */ 157WebSocket.prototype.ping = function ping(data, options, dontFailWhenClosed) { 158 if (this.readyState !== WebSocket.OPEN) { 159 if (dontFailWhenClosed === true) return; 160 throw new Error('not opened'); 161 } 162 163 options = options || {}; 164 165 if (typeof options.mask === 'undefined') options.mask = !this._isServer; 166 167 this._sender.ping(data, options); 168}; 169 170/** 171 * Sends a pong 172 * 173 * @param {Object} data to be sent to the server 174 * @param {Object} Members - mask: boolean, binary: boolean 175 * @param {boolean} dontFailWhenClosed indicates whether or not to throw if the connection isnt open 176 * @api public 177 */ 178WebSocket.prototype.pong = function(data, options, dontFailWhenClosed) { 179 if (this.readyState !== WebSocket.OPEN) { 180 if (dontFailWhenClosed === true) return; 181 throw new Error('not opened'); 182 } 183 184 options = options || {}; 185 186 if (typeof options.mask === 'undefined') options.mask = !this._isServer; 187 188 this._sender.pong(data, options); 189}; 190 191/** 192 * Resume the client stream 193 * 194 * @api public 195 */ 196WebSocket.prototype.resume = function resume() { 197 if (this.readyState !== WebSocket.OPEN) throw new Error('not opened'); 198 199 return this._socket.resume(); 200}; 201 202/** 203 * Sends a piece of data 204 * 205 * @param {Object} data to be sent to the server 206 * @param {Object} Members - mask: boolean, binary: boolean, compress: boolean 207 * @param {function} Optional callback which is executed after the send completes 208 * @api public 209 */ 210 211WebSocket.prototype.send = function send(data, options, cb) { 212 if (typeof options === 'function') { 213 cb = options; 214 options = {}; 215 } 216 217 if (this.readyState !== WebSocket.OPEN) { 218 if (typeof cb === 'function') cb(new Error('not opened')); 219 else throw new Error('not opened'); 220 return; 221 } 222 223 if (!data) data = ''; 224 if (this._queue) { 225 var self = this; 226 this._queue.push(function() { self.send(data, options, cb); }); 227 return; 228 } 229 230 options = options || {}; 231 options.fin = true; 232 233 if (typeof options.binary === 'undefined') { 234 options.binary = (data instanceof ArrayBuffer || data instanceof Buffer || 235 data instanceof Uint8Array || 236 data instanceof Uint16Array || 237 data instanceof Uint32Array || 238 data instanceof Int8Array || 239 data instanceof Int16Array || 240 data instanceof Int32Array || 241 data instanceof Float32Array || 242 data instanceof Float64Array); 243 } 244 245 if (typeof options.mask === 'undefined') options.mask = !this._isServer; 246 if (typeof options.compress === 'undefined') options.compress = true; 247 if (!this.extensions[PerMessageDeflate.extensionName]) { 248 options.compress = false; 249 } 250 251 var readable = typeof stream.Readable === 'function' 252 ? stream.Readable 253 : stream.Stream; 254 255 if (data instanceof readable) { 256 startQueue(this); 257 var self = this; 258 259 sendStream(this, data, options, function send(error) { 260 process.nextTick(function tock() { 261 executeQueueSends(self); 262 }); 263 264 if (typeof cb === 'function') cb(error); 265 }); 266 } else { 267 this._sender.send(data, options, cb); 268 } 269}; 270 271/** 272 * Streams data through calls to a user supplied function 273 * 274 * @param {Object} Members - mask: boolean, binary: boolean, compress: boolean 275 * @param {function} 'function (error, send)' which is executed on successive ticks of which send is 'function (data, final)'. 276 * @api public 277 */ 278WebSocket.prototype.stream = function stream(options, cb) { 279 if (typeof options === 'function') { 280 cb = options; 281 options = {}; 282 } 283 284 var self = this; 285 286 if (typeof cb !== 'function') throw new Error('callback must be provided'); 287 288 if (this.readyState !== WebSocket.OPEN) { 289 if (typeof cb === 'function') cb(new Error('not opened')); 290 else throw new Error('not opened'); 291 return; 292 } 293 294 if (this._queue) { 295 this._queue.push(function () { self.stream(options, cb); }); 296 return; 297 } 298 299 options = options || {}; 300 301 if (typeof options.mask === 'undefined') options.mask = !this._isServer; 302 if (typeof options.compress === 'undefined') options.compress = true; 303 if (!this.extensions[PerMessageDeflate.extensionName]) { 304 options.compress = false; 305 } 306 307 startQueue(this); 308 309 function send(data, final) { 310 try { 311 if (self.readyState !== WebSocket.OPEN) throw new Error('not opened'); 312 options.fin = final === true; 313 self._sender.send(data, options); 314 if (!final) process.nextTick(cb.bind(null, null, send)); 315 else executeQueueSends(self); 316 } catch (e) { 317 if (typeof cb === 'function') cb(e); 318 else { 319 delete self._queue; 320 self.emit('error', e); 321 } 322 } 323 } 324 325 process.nextTick(cb.bind(null, null, send)); 326}; 327 328/** 329 * Immediately shuts down the connection 330 * 331 * @api public 332 */ 333WebSocket.prototype.terminate = function terminate() { 334 if (this.readyState === WebSocket.CLOSED) return; 335 336 if (this._socket) { 337 this.readyState = WebSocket.CLOSING; 338 339 // End the connection 340 try { this._socket.end(); } 341 catch (e) { 342 // Socket error during end() call, so just destroy it right now 343 cleanupWebsocketResources.call(this, true); 344 return; 345 } 346 347 // Add a timeout to ensure that the connection is completely 348 // cleaned up within 30 seconds, even if the clean close procedure 349 // fails for whatever reason 350 // First cleanup any pre-existing timeout from an earlier "terminate" call, 351 // if one exists. Otherwise terminate calls in quick succession will leak timeouts 352 // and hold the program open for `closeTimout` time. 353 if (this._closeTimer) { clearTimeout(this._closeTimer); } 354 this._closeTimer = setTimeout(cleanupWebsocketResources.bind(this, true), closeTimeout); 355 } else if (this.readyState === WebSocket.CONNECTING) { 356 cleanupWebsocketResources.call(this, true); 357 } 358}; 359 360/** 361 * Expose bufferedAmount 362 * 363 * @api public 364 */ 365Object.defineProperty(WebSocket.prototype, 'bufferedAmount', { 366 get: function get() { 367 var amount = 0; 368 if (this._socket) { 369 amount = this._socket.bufferSize || 0; 370 } 371 return amount; 372 } 373}); 374 375/** 376 * Expose binaryType 377 * 378 * This deviates from the W3C interface since ws doesn't support the required 379 * default "blob" type (instead we define a custom "nodebuffer" type). 380 * 381 * @see http://dev.w3.org/html5/websockets/#the-websocket-interface 382 * @api public 383 */ 384Object.defineProperty(WebSocket.prototype, 'binaryType', { 385 get: function get() { 386 return this._binaryType; 387 }, 388 set: function set(type) { 389 if (type === 'arraybuffer' || type === 'nodebuffer') 390 this._binaryType = type; 391 else 392 throw new SyntaxError('unsupported binaryType: must be either "nodebuffer" or "arraybuffer"'); 393 } 394}); 395 396/** 397 * Emulates the W3C Browser based WebSocket interface using function members. 398 * 399 * @see http://dev.w3.org/html5/websockets/#the-websocket-interface 400 * @api public 401 */ 402['open', 'error', 'close', 'message'].forEach(function(method) { 403 Object.defineProperty(WebSocket.prototype, 'on' + method, { 404 /** 405 * Returns the current listener 406 * 407 * @returns {Mixed} the set function or undefined 408 * @api public 409 */ 410 get: function get() { 411 var listener = this.listeners(method)[0]; 412 return listener ? (listener._listener ? listener._listener : listener) : undefined; 413 }, 414 415 /** 416 * Start listening for events 417 * 418 * @param {Function} listener the listener 419 * @returns {Mixed} the set function or undefined 420 * @api public 421 */ 422 set: function set(listener) { 423 this.removeAllListeners(method); 424 this.addEventListener(method, listener); 425 } 426 }); 427}); 428 429/** 430 * Emulates the W3C Browser based WebSocket interface using addEventListener. 431 * 432 * @see https://developer.mozilla.org/en/DOM/element.addEventListener 433 * @see http://dev.w3.org/html5/websockets/#the-websocket-interface 434 * @api public 435 */ 436WebSocket.prototype.addEventListener = function(method, listener) { 437 var target = this; 438 439 function onMessage (data, flags) { 440 if (flags.binary && this.binaryType === 'arraybuffer') 441 data = new Uint8Array(data).buffer; 442 listener.call(target, new MessageEvent(data, !!flags.binary, target)); 443 } 444 445 function onClose (code, message) { 446 listener.call(target, new CloseEvent(code, message, target)); 447 } 448 449 function onError (event) { 450 event.type = 'error'; 451 event.target = target; 452 listener.call(target, event); 453 } 454 455 function onOpen () { 456 listener.call(target, new OpenEvent(target)); 457 } 458 459 if (typeof listener === 'function') { 460 if (method === 'message') { 461 // store a reference so we can return the original function from the 462 // addEventListener hook 463 onMessage._listener = listener; 464 this.on(method, onMessage); 465 } else if (method === 'close') { 466 // store a reference so we can return the original function from the 467 // addEventListener hook 468 onClose._listener = listener; 469 this.on(method, onClose); 470 } else if (method === 'error') { 471 // store a reference so we can return the original function from the 472 // addEventListener hook 473 onError._listener = listener; 474 this.on(method, onError); 475 } else if (method === 'open') { 476 // store a reference so we can return the original function from the 477 // addEventListener hook 478 onOpen._listener = listener; 479 this.on(method, onOpen); 480 } else { 481 this.on(method, listener); 482 } 483 } 484}; 485 486module.exports = WebSocket; 487module.exports.buildHostHeader = buildHostHeader 488 489/** 490 * W3C MessageEvent 491 * 492 * @see http://www.w3.org/TR/html5/comms.html 493 * @constructor 494 * @api private 495 */ 496function MessageEvent(dataArg, isBinary, target) { 497 this.type = 'message'; 498 this.data = dataArg; 499 this.target = target; 500 this.binary = isBinary; // non-standard. 501} 502 503/** 504 * W3C CloseEvent 505 * 506 * @see http://www.w3.org/TR/html5/comms.html 507 * @constructor 508 * @api private 509 */ 510function CloseEvent(code, reason, target) { 511 this.type = 'close'; 512 this.wasClean = (typeof code === 'undefined' || code === 1000); 513 this.code = code; 514 this.reason = reason; 515 this.target = target; 516} 517 518/** 519 * W3C OpenEvent 520 * 521 * @see http://www.w3.org/TR/html5/comms.html 522 * @constructor 523 * @api private 524 */ 525function OpenEvent(target) { 526 this.type = 'open'; 527 this.target = target; 528} 529 530// Append port number to Host header, only if specified in the url 531// and non-default 532function buildHostHeader(isSecure, hostname, port) { 533 var headerHost = hostname; 534 if (hostname) { 535 if ((isSecure && (port != 443)) || (!isSecure && (port != 80))){ 536 headerHost = headerHost + ':' + port; 537 } 538 } 539 return headerHost; 540} 541 542/** 543 * Entirely private apis, 544 * which may or may not be bound to a sepcific WebSocket instance. 545 */ 546function initAsServerClient(req, socket, upgradeHead, options) { 547 options = new Options({ 548 protocolVersion: protocolVersion, 549 protocol: null, 550 extensions: {}, 551 maxPayload: 0 552 }).merge(options); 553 554 // expose state properties 555 this.protocol = options.value.protocol; 556 this.protocolVersion = options.value.protocolVersion; 557 this.extensions = options.value.extensions; 558 this.supports.binary = (this.protocolVersion !== 'hixie-76'); 559 this.upgradeReq = req; 560 this.readyState = WebSocket.CONNECTING; 561 this._isServer = true; 562 this.maxPayload = options.value.maxPayload; 563 // establish connection 564 if (options.value.protocolVersion === 'hixie-76') { 565 establishConnection.call(this, ReceiverHixie, SenderHixie, socket, upgradeHead); 566 } else { 567 establishConnection.call(this, Receiver, Sender, socket, upgradeHead); 568 } 569} 570 571function initAsClient(address, protocols, options) { 572 options = new Options({ 573 origin: null, 574 protocolVersion: protocolVersion, 575 host: null, 576 headers: null, 577 protocol: protocols.join(','), 578 agent: null, 579 580 // ssl-related options 581 pfx: null, 582 key: null, 583 passphrase: null, 584 cert: null, 585 ca: null, 586 ciphers: null, 587 rejectUnauthorized: null, 588 perMessageDeflate: true, 589 localAddress: null 590 }).merge(options); 591 592 if (options.value.protocolVersion !== 8 && options.value.protocolVersion !== 13) { 593 throw new Error('unsupported protocol version'); 594 } 595 596 // verify URL and establish http class 597 var serverUrl = url.parse(address); 598 var isUnixSocket = serverUrl.protocol === 'ws+unix:'; 599 if (!serverUrl.host && !isUnixSocket) throw new Error('invalid url'); 600 var isSecure = serverUrl.protocol === 'wss:' || serverUrl.protocol === 'https:'; 601 var httpObj = isSecure ? https : http; 602 var port = serverUrl.port || (isSecure ? 443 : 80); 603 var auth = serverUrl.auth; 604 605 // prepare extensions 606 var extensionsOffer = {}; 607 var perMessageDeflate; 608 if (options.value.perMessageDeflate) { 609 perMessageDeflate = new PerMessageDeflate(typeof options.value.perMessageDeflate !== true ? options.value.perMessageDeflate : {}, false); 610 extensionsOffer[PerMessageDeflate.extensionName] = perMessageDeflate.offer(); 611 } 612 613 // expose state properties 614 this._isServer = false; 615 this.url = address; 616 this.protocolVersion = options.value.protocolVersion; 617 this.supports.binary = (this.protocolVersion !== 'hixie-76'); 618 619 // begin handshake 620 var key = new Buffer(options.value.protocolVersion + '-' + Date.now()).toString('base64'); 621 var shasum = crypto.createHash('sha1'); 622 shasum.update(key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'); 623 var expectedServerKey = shasum.digest('base64'); 624 625 var agent = options.value.agent; 626 627 var headerHost = buildHostHeader(isSecure, serverUrl.hostname, port) 628 629 var requestOptions = { 630 port: port, 631 host: serverUrl.hostname, 632 headers: { 633 'Connection': 'Upgrade', 634 'Upgrade': 'websocket', 635 'Host': headerHost, 636 'Sec-WebSocket-Version': options.value.protocolVersion, 637 'Sec-WebSocket-Key': key 638 } 639 }; 640 641 // If we have basic auth. 642 if (auth) { 643 requestOptions.headers.Authorization = 'Basic ' + new Buffer(auth).toString('base64'); 644 } 645 646 if (options.value.protocol) { 647 requestOptions.headers['Sec-WebSocket-Protocol'] = options.value.protocol; 648 } 649 650 if (options.value.host) { 651 requestOptions.headers.Host = options.value.host; 652 } 653 654 if (options.value.headers) { 655 for (var header in options.value.headers) { 656 if (options.value.headers.hasOwnProperty(header)) { 657 requestOptions.headers[header] = options.value.headers[header]; 658 } 659 } 660 } 661 662 if (Object.keys(extensionsOffer).length) { 663 requestOptions.headers['Sec-WebSocket-Extensions'] = Extensions.format(extensionsOffer); 664 } 665 666 if (options.isDefinedAndNonNull('pfx') 667 || options.isDefinedAndNonNull('key') 668 || options.isDefinedAndNonNull('passphrase') 669 || options.isDefinedAndNonNull('cert') 670 || options.isDefinedAndNonNull('ca') 671 || options.isDefinedAndNonNull('ciphers') 672 || options.isDefinedAndNonNull('rejectUnauthorized')) { 673 674 if (options.isDefinedAndNonNull('pfx')) requestOptions.pfx = options.value.pfx; 675 if (options.isDefinedAndNonNull('key')) requestOptions.key = options.value.key; 676 if (options.isDefinedAndNonNull('passphrase')) requestOptions.passphrase = options.value.passphrase; 677 if (options.isDefinedAndNonNull('cert')) requestOptions.cert = options.value.cert; 678 if (options.isDefinedAndNonNull('ca')) requestOptions.ca = options.value.ca; 679 if (options.isDefinedAndNonNull('ciphers')) requestOptions.ciphers = options.value.ciphers; 680 if (options.isDefinedAndNonNull('rejectUnauthorized')) requestOptions.rejectUnauthorized = options.value.rejectUnauthorized; 681 682 if (!agent) { 683 // global agent ignores client side certificates 684 agent = new httpObj.Agent(requestOptions); 685 } 686 } 687 688 requestOptions.path = serverUrl.path || '/'; 689 690 if (agent) { 691 requestOptions.agent = agent; 692 } 693 694 if (isUnixSocket) { 695 requestOptions.socketPath = serverUrl.pathname; 696 } 697 698 if (options.value.localAddress) { 699 requestOptions.localAddress = options.value.localAddress; 700 } 701 702 if (options.value.origin) { 703 if (options.value.protocolVersion < 13) requestOptions.headers['Sec-WebSocket-Origin'] = options.value.origin; 704 else requestOptions.headers.Origin = options.value.origin; 705 } 706 707 var self = this; 708 var req = httpObj.request(requestOptions); 709 710 req.on('error', function onerror(error) { 711 self.emit('error', error); 712 cleanupWebsocketResources.call(self, error); 713 }); 714 715 req.once('response', function response(res) { 716 var error; 717 718 if (!self.emit('unexpected-response', req, res)) { 719 error = new Error('unexpected server response (' + res.statusCode + ')'); 720 req.abort(); 721 self.emit('error', error); 722 } 723 724 cleanupWebsocketResources.call(self, error); 725 }); 726 727 req.once('upgrade', function upgrade(res, socket, upgradeHead) { 728 if (self.readyState === WebSocket.CLOSED) { 729 // client closed before server accepted connection 730 self.emit('close'); 731 self.removeAllListeners(); 732 socket.end(); 733 return; 734 } 735 736 var serverKey = res.headers['sec-websocket-accept']; 737 if (typeof serverKey === 'undefined' || serverKey !== expectedServerKey) { 738 self.emit('error', 'invalid server key'); 739 self.removeAllListeners(); 740 socket.end(); 741 return; 742 } 743 744 var serverProt = res.headers['sec-websocket-protocol']; 745 var protList = (options.value.protocol || "").split(/, */); 746 var protError = null; 747 748 if (!options.value.protocol && serverProt) { 749 protError = 'server sent a subprotocol even though none requested'; 750 } else if (options.value.protocol && !serverProt) { 751 protError = 'server sent no subprotocol even though requested'; 752 } else if (serverProt && protList.indexOf(serverProt) === -1) { 753 protError = 'server responded with an invalid protocol'; 754 } 755 756 if (protError) { 757 self.emit('error', protError); 758 self.removeAllListeners(); 759 socket.end(); 760 return; 761 } else if (serverProt) { 762 self.protocol = serverProt; 763 } 764 765 var serverExtensions = Extensions.parse(res.headers['sec-websocket-extensions']); 766 if (perMessageDeflate && serverExtensions[PerMessageDeflate.extensionName]) { 767 try { 768 perMessageDeflate.accept(serverExtensions[PerMessageDeflate.extensionName]); 769 } catch (err) { 770 self.emit('error', 'invalid extension parameter'); 771 self.removeAllListeners(); 772 socket.end(); 773 return; 774 } 775 self.extensions[PerMessageDeflate.extensionName] = perMessageDeflate; 776 } 777 778 establishConnection.call(self, Receiver, Sender, socket, upgradeHead); 779 780 // perform cleanup on http resources 781 req.removeAllListeners(); 782 req = null; 783 agent = null; 784 }); 785 786 req.end(); 787 this.readyState = WebSocket.CONNECTING; 788} 789 790function establishConnection(ReceiverClass, SenderClass, socket, upgradeHead) { 791 var ultron = this._ultron = new Ultron(socket) 792 , called = false 793 , self = this; 794 795 socket.setTimeout(0); 796 socket.setNoDelay(true); 797 798 this._receiver = new ReceiverClass(this.extensions,this.maxPayload); 799 this._socket = socket; 800 801 // socket cleanup handlers 802 ultron.on('end', cleanupWebsocketResources.bind(this)); 803 ultron.on('close', cleanupWebsocketResources.bind(this)); 804 ultron.on('error', cleanupWebsocketResources.bind(this)); 805 806 // ensure that the upgradeHead is added to the receiver 807 function firstHandler(data) { 808 if (called || self.readyState === WebSocket.CLOSED) return; 809 810 called = true; 811 socket.removeListener('data', firstHandler); 812 ultron.on('data', realHandler); 813 814 if (upgradeHead && upgradeHead.length > 0) { 815 realHandler(upgradeHead); 816 upgradeHead = null; 817 } 818 819 if (data) realHandler(data); 820 } 821 822 // subsequent packets are pushed straight to the receiver 823 function realHandler(data) { 824 self.bytesReceived += data.length; 825 self._receiver.add(data); 826 } 827 828 ultron.on('data', firstHandler); 829 830 // if data was passed along with the http upgrade, 831 // this will schedule a push of that on to the receiver. 832 // this has to be done on next tick, since the caller 833 // hasn't had a chance to set event handlers on this client 834 // object yet. 835 process.nextTick(firstHandler); 836 837 // receiver event handlers 838 self._receiver.ontext = function ontext(data, flags) { 839 flags = flags || {}; 840 841 self.emit('message', data, flags); 842 }; 843 844 self._receiver.onbinary = function onbinary(data, flags) { 845 flags = flags || {}; 846 847 flags.binary = true; 848 self.emit('message', data, flags); 849 }; 850 851 self._receiver.onping = function onping(data, flags) { 852 flags = flags || {}; 853 854 self.pong(data, { 855 mask: !self._isServer, 856 binary: flags.binary === true 857 }, true); 858 859 self.emit('ping', data, flags); 860 }; 861 862 self._receiver.onpong = function onpong(data, flags) { 863 self.emit('pong', data, flags || {}); 864 }; 865 866 self._receiver.onclose = function onclose(code, data, flags) { 867 flags = flags || {}; 868 869 self._closeReceived = true; 870 self.close(code, data); 871 }; 872 873 self._receiver.onerror = function onerror(reason, errorCode) { 874 // close the connection when the receiver reports a HyBi error code 875 self.close(typeof errorCode !== 'undefined' ? errorCode : 1002, ''); 876 self.emit('error', (reason instanceof Error) ? reason : (new Error(reason))); 877 }; 878 879 // finalize the client 880 this._sender = new SenderClass(socket, this.extensions); 881 this._sender.on('error', function onerror(error) { 882 self.close(1002, ''); 883 self.emit('error', error); 884 }); 885 886 this.readyState = WebSocket.OPEN; 887 this.emit('open'); 888} 889 890function startQueue(instance) { 891 instance._queue = instance._queue || []; 892} 893 894function executeQueueSends(instance) { 895 var queue = instance._queue; 896 if (typeof queue === 'undefined') return; 897 898 delete instance._queue; 899 for (var i = 0, l = queue.length; i < l; ++i) { 900 queue[i](); 901 } 902} 903 904function sendStream(instance, stream, options, cb) { 905 stream.on('data', function incoming(data) { 906 if (instance.readyState !== WebSocket.OPEN) { 907 if (typeof cb === 'function') cb(new Error('not opened')); 908 else { 909 delete instance._queue; 910 instance.emit('error', new Error('not opened')); 911 } 912 return; 913 } 914 915 options.fin = false; 916 instance._sender.send(data, options); 917 }); 918 919 stream.on('end', function end() { 920 if (instance.readyState !== WebSocket.OPEN) { 921 if (typeof cb === 'function') cb(new Error('not opened')); 922 else { 923 delete instance._queue; 924 instance.emit('error', new Error('not opened')); 925 } 926 return; 927 } 928 929 options.fin = true; 930 instance._sender.send(null, options); 931 932 if (typeof cb === 'function') cb(null); 933 }); 934} 935 936function cleanupWebsocketResources(error) { 937 if (this.readyState === WebSocket.CLOSED) return; 938 939 this.readyState = WebSocket.CLOSED; 940 941 clearTimeout(this._closeTimer); 942 this._closeTimer = null; 943 944 // If the connection was closed abnormally (with an error), or if 945 // the close control frame was not received then the close code 946 // must default to 1006. 947 if (error || !this._closeReceived) { 948 this._closeCode = 1006; 949 } 950 this.emit('close', this._closeCode || 1000, this._closeMessage || ''); 951 952 if (this._socket) { 953 if (this._ultron) this._ultron.destroy(); 954 this._socket.on('error', function onerror() { 955 try { this.destroy(); } 956 catch (e) {} 957 }); 958 959 try { 960 if (!error) this._socket.end(); 961 else this._socket.destroy(); 962 } catch (e) { /* Ignore termination errors */ } 963 964 this._socket = null; 965 this._ultron = null; 966 } 967 968 if (this._sender) { 969 this._sender.removeAllListeners(); 970 this._sender = null; 971 } 972 973 if (this._receiver) { 974 this._receiver.cleanup(); 975 this._receiver = null; 976 } 977 978 if (this.extensions[PerMessageDeflate.extensionName]) { 979 this.extensions[PerMessageDeflate.extensionName].cleanup(); 980 } 981 982 this.extensions = null; 983 984 this.removeAllListeners(); 985 this.on('error', function onerror() {}); // catch all errors after this 986 delete this._queue; 987}