WebSocketServer.js (16935B)
1/*! 2 * ws: a node.js websocket client 3 * Copyright(c) 2011 Einar Otto Stangvik <einaros@gmail.com> 4 * MIT Licensed 5 */ 6 7var util = require('util') 8 , events = require('events') 9 , http = require('http') 10 , crypto = require('crypto') 11 , Options = require('options') 12 , WebSocket = require('./WebSocket') 13 , Extensions = require('./Extensions') 14 , PerMessageDeflate = require('./PerMessageDeflate') 15 , tls = require('tls') 16 , url = require('url'); 17 18/** 19 * WebSocket Server implementation 20 */ 21 22function WebSocketServer(options, callback) { 23 if (this instanceof WebSocketServer === false) { 24 return new WebSocketServer(options, callback); 25 } 26 27 events.EventEmitter.call(this); 28 29 options = new Options({ 30 host: '0.0.0.0', 31 port: null, 32 server: null, 33 verifyClient: null, 34 handleProtocols: null, 35 path: null, 36 noServer: false, 37 disableHixie: false, 38 clientTracking: true, 39 perMessageDeflate: true, 40 maxPayload: 100 * 1024 * 1024 41 }).merge(options); 42 43 if (!options.isDefinedAndNonNull('port') && !options.isDefinedAndNonNull('server') && !options.value.noServer) { 44 throw new TypeError('`port` or a `server` must be provided'); 45 } 46 47 var self = this; 48 49 if (options.isDefinedAndNonNull('port')) { 50 this._server = http.createServer(function (req, res) { 51 var body = http.STATUS_CODES[426]; 52 res.writeHead(426, { 53 'Content-Length': body.length, 54 'Content-Type': 'text/plain' 55 }); 56 res.end(body); 57 }); 58 this._server.allowHalfOpen = false; 59 this._server.listen(options.value.port, options.value.host, callback); 60 this._closeServer = function() { if (self._server) self._server.close(); }; 61 } 62 else if (options.value.server) { 63 this._server = options.value.server; 64 if (options.value.path) { 65 // take note of the path, to avoid collisions when multiple websocket servers are 66 // listening on the same http server 67 if (this._server._webSocketPaths && options.value.server._webSocketPaths[options.value.path]) { 68 throw new Error('two instances of WebSocketServer cannot listen on the same http server path'); 69 } 70 if (typeof this._server._webSocketPaths !== 'object') { 71 this._server._webSocketPaths = {}; 72 } 73 this._server._webSocketPaths[options.value.path] = 1; 74 } 75 } 76 if (this._server) { 77 this._onceServerListening = function() { self.emit('listening'); }; 78 this._server.once('listening', this._onceServerListening); 79 } 80 81 if (typeof this._server != 'undefined') { 82 this._onServerError = function(error) { self.emit('error', error) }; 83 this._server.on('error', this._onServerError); 84 this._onServerUpgrade = function(req, socket, upgradeHead) { 85 //copy upgradeHead to avoid retention of large slab buffers used in node core 86 var head = new Buffer(upgradeHead.length); 87 upgradeHead.copy(head); 88 89 self.handleUpgrade(req, socket, head, function(client) { 90 self.emit('connection'+req.url, client); 91 self.emit('connection', client); 92 }); 93 }; 94 this._server.on('upgrade', this._onServerUpgrade); 95 } 96 97 this.options = options.value; 98 this.path = options.value.path; 99 this.clients = []; 100} 101 102/** 103 * Inherits from EventEmitter. 104 */ 105 106util.inherits(WebSocketServer, events.EventEmitter); 107 108/** 109 * Immediately shuts down the connection. 110 * 111 * @api public 112 */ 113 114WebSocketServer.prototype.close = function(callback) { 115 // terminate all associated clients 116 var error = null; 117 try { 118 for (var i = 0, l = this.clients.length; i < l; ++i) { 119 this.clients[i].terminate(); 120 } 121 } 122 catch (e) { 123 error = e; 124 } 125 126 // remove path descriptor, if any 127 if (this.path && this._server._webSocketPaths) { 128 delete this._server._webSocketPaths[this.path]; 129 if (Object.keys(this._server._webSocketPaths).length == 0) { 130 delete this._server._webSocketPaths; 131 } 132 } 133 134 // close the http server if it was internally created 135 try { 136 if (typeof this._closeServer !== 'undefined') { 137 this._closeServer(); 138 } 139 } 140 finally { 141 if (this._server) { 142 this._server.removeListener('listening', this._onceServerListening); 143 this._server.removeListener('error', this._onServerError); 144 this._server.removeListener('upgrade', this._onServerUpgrade); 145 } 146 delete this._server; 147 } 148 if(callback) 149 callback(error); 150 else if(error) 151 throw error; 152} 153 154/** 155 * Handle a HTTP Upgrade request. 156 * 157 * @api public 158 */ 159 160WebSocketServer.prototype.handleUpgrade = function(req, socket, upgradeHead, cb) { 161 // check for wrong path 162 if (this.options.path) { 163 var u = url.parse(req.url); 164 if (u && u.pathname !== this.options.path) return; 165 } 166 167 if (typeof req.headers.upgrade === 'undefined' || req.headers.upgrade.toLowerCase() !== 'websocket') { 168 abortConnection(socket, 400, 'Bad Request'); 169 return; 170 } 171 172 if (req.headers['sec-websocket-key1']) handleHixieUpgrade.apply(this, arguments); 173 else handleHybiUpgrade.apply(this, arguments); 174} 175 176module.exports = WebSocketServer; 177 178/** 179 * Entirely private apis, 180 * which may or may not be bound to a sepcific WebSocket instance. 181 */ 182 183function handleHybiUpgrade(req, socket, upgradeHead, cb) { 184 // handle premature socket errors 185 var errorHandler = function() { 186 try { socket.destroy(); } catch (e) {} 187 } 188 socket.on('error', errorHandler); 189 190 // verify key presence 191 if (!req.headers['sec-websocket-key']) { 192 abortConnection(socket, 400, 'Bad Request'); 193 return; 194 } 195 196 // verify version 197 var version = parseInt(req.headers['sec-websocket-version']); 198 if ([8, 13].indexOf(version) === -1) { 199 abortConnection(socket, 400, 'Bad Request'); 200 return; 201 } 202 203 // verify protocol 204 var protocols = req.headers['sec-websocket-protocol']; 205 206 // verify client 207 var origin = version < 13 ? 208 req.headers['sec-websocket-origin'] : 209 req.headers['origin']; 210 211 // handle extensions offer 212 var extensionsOffer = Extensions.parse(req.headers['sec-websocket-extensions']); 213 214 // handler to call when the connection sequence completes 215 var self = this; 216 var completeHybiUpgrade2 = function(protocol) { 217 218 // calc key 219 var key = req.headers['sec-websocket-key']; 220 var shasum = crypto.createHash('sha1'); 221 shasum.update(key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"); 222 key = shasum.digest('base64'); 223 224 var headers = [ 225 'HTTP/1.1 101 Switching Protocols' 226 , 'Upgrade: websocket' 227 , 'Connection: Upgrade' 228 , 'Sec-WebSocket-Accept: ' + key 229 ]; 230 231 if (typeof protocol != 'undefined') { 232 headers.push('Sec-WebSocket-Protocol: ' + protocol); 233 } 234 235 var extensions = {}; 236 try { 237 extensions = acceptExtensions.call(self, extensionsOffer); 238 } catch (err) { 239 abortConnection(socket, 400, 'Bad Request'); 240 return; 241 } 242 243 if (Object.keys(extensions).length) { 244 var serverExtensions = {}; 245 Object.keys(extensions).forEach(function(token) { 246 serverExtensions[token] = [extensions[token].params] 247 }); 248 headers.push('Sec-WebSocket-Extensions: ' + Extensions.format(serverExtensions)); 249 } 250 251 // allows external modification/inspection of handshake headers 252 self.emit('headers', headers); 253 254 socket.setTimeout(0); 255 socket.setNoDelay(true); 256 try { 257 socket.write(headers.concat('', '').join('\r\n')); 258 } 259 catch (e) { 260 // if the upgrade write fails, shut the connection down hard 261 try { socket.destroy(); } catch (e) {} 262 return; 263 } 264 265 var client = new WebSocket([req, socket, upgradeHead], { 266 protocolVersion: version, 267 protocol: protocol, 268 extensions: extensions, 269 maxPayload: self.options.maxPayload 270 }); 271 272 if (self.options.clientTracking) { 273 self.clients.push(client); 274 client.on('close', function() { 275 var index = self.clients.indexOf(client); 276 if (index != -1) { 277 self.clients.splice(index, 1); 278 } 279 }); 280 } 281 282 // signal upgrade complete 283 socket.removeListener('error', errorHandler); 284 cb(client); 285 } 286 287 // optionally call external protocol selection handler before 288 // calling completeHybiUpgrade2 289 var completeHybiUpgrade1 = function() { 290 // choose from the sub-protocols 291 if (typeof self.options.handleProtocols == 'function') { 292 var protList = (protocols || "").split(/, */); 293 var callbackCalled = false; 294 var res = self.options.handleProtocols(protList, function(result, protocol) { 295 callbackCalled = true; 296 if (!result) abortConnection(socket, 401, 'Unauthorized'); 297 else completeHybiUpgrade2(protocol); 298 }); 299 if (!callbackCalled) { 300 // the handleProtocols handler never called our callback 301 abortConnection(socket, 501, 'Could not process protocols'); 302 } 303 return; 304 } else { 305 if (typeof protocols !== 'undefined') { 306 completeHybiUpgrade2(protocols.split(/, */)[0]); 307 } 308 else { 309 completeHybiUpgrade2(); 310 } 311 } 312 } 313 314 // optionally call external client verification handler 315 if (typeof this.options.verifyClient == 'function') { 316 var info = { 317 origin: origin, 318 secure: typeof req.connection.authorized !== 'undefined' || typeof req.connection.encrypted !== 'undefined', 319 req: req 320 }; 321 if (this.options.verifyClient.length == 2) { 322 this.options.verifyClient(info, function(result, code, name) { 323 if (typeof code === 'undefined') code = 401; 324 if (typeof name === 'undefined') name = http.STATUS_CODES[code]; 325 326 if (!result) abortConnection(socket, code, name); 327 else completeHybiUpgrade1(); 328 }); 329 return; 330 } 331 else if (!this.options.verifyClient(info)) { 332 abortConnection(socket, 401, 'Unauthorized'); 333 return; 334 } 335 } 336 337 completeHybiUpgrade1(); 338} 339 340function handleHixieUpgrade(req, socket, upgradeHead, cb) { 341 // handle premature socket errors 342 var errorHandler = function() { 343 try { socket.destroy(); } catch (e) {} 344 } 345 socket.on('error', errorHandler); 346 347 // bail if options prevent hixie 348 if (this.options.disableHixie) { 349 abortConnection(socket, 401, 'Hixie support disabled'); 350 return; 351 } 352 353 // verify key presence 354 if (!req.headers['sec-websocket-key2']) { 355 abortConnection(socket, 400, 'Bad Request'); 356 return; 357 } 358 359 var origin = req.headers['origin'] 360 , self = this; 361 362 // setup handshake completion to run after client has been verified 363 var onClientVerified = function() { 364 var wshost; 365 if (!req.headers['x-forwarded-host']) 366 wshost = req.headers.host; 367 else 368 wshost = req.headers['x-forwarded-host']; 369 var location = ((req.headers['x-forwarded-proto'] === 'https' || socket.encrypted) ? 'wss' : 'ws') + '://' + wshost + req.url 370 , protocol = req.headers['sec-websocket-protocol']; 371 372 // build the response header and return a Buffer 373 var buildResponseHeader = function() { 374 var headers = [ 375 'HTTP/1.1 101 Switching Protocols' 376 , 'Upgrade: WebSocket' 377 , 'Connection: Upgrade' 378 , 'Sec-WebSocket-Location: ' + location 379 ]; 380 if (typeof protocol != 'undefined') headers.push('Sec-WebSocket-Protocol: ' + protocol); 381 if (typeof origin != 'undefined') headers.push('Sec-WebSocket-Origin: ' + origin); 382 383 return new Buffer(headers.concat('', '').join('\r\n')); 384 }; 385 386 // send handshake response before receiving the nonce 387 var handshakeResponse = function() { 388 389 socket.setTimeout(0); 390 socket.setNoDelay(true); 391 392 var headerBuffer = buildResponseHeader(); 393 394 try { 395 socket.write(headerBuffer, 'binary', function(err) { 396 // remove listener if there was an error 397 if (err) socket.removeListener('data', handler); 398 return; 399 }); 400 } catch (e) { 401 try { socket.destroy(); } catch (e) {} 402 return; 403 }; 404 }; 405 406 // handshake completion code to run once nonce has been successfully retrieved 407 var completeHandshake = function(nonce, rest, headerBuffer) { 408 // calculate key 409 var k1 = req.headers['sec-websocket-key1'] 410 , k2 = req.headers['sec-websocket-key2'] 411 , md5 = crypto.createHash('md5'); 412 413 [k1, k2].forEach(function (k) { 414 var n = parseInt(k.replace(/[^\d]/g, '')) 415 , spaces = k.replace(/[^ ]/g, '').length; 416 if (spaces === 0 || n % spaces !== 0){ 417 abortConnection(socket, 400, 'Bad Request'); 418 return; 419 } 420 n /= spaces; 421 md5.update(String.fromCharCode( 422 n >> 24 & 0xFF, 423 n >> 16 & 0xFF, 424 n >> 8 & 0xFF, 425 n & 0xFF)); 426 }); 427 md5.update(nonce.toString('binary')); 428 429 socket.setTimeout(0); 430 socket.setNoDelay(true); 431 432 try { 433 var hashBuffer = new Buffer(md5.digest('binary'), 'binary'); 434 var handshakeBuffer = new Buffer(headerBuffer.length + hashBuffer.length); 435 headerBuffer.copy(handshakeBuffer, 0); 436 hashBuffer.copy(handshakeBuffer, headerBuffer.length); 437 438 // do a single write, which - upon success - causes a new client websocket to be setup 439 socket.write(handshakeBuffer, 'binary', function(err) { 440 if (err) return; // do not create client if an error happens 441 var client = new WebSocket([req, socket, rest], { 442 protocolVersion: 'hixie-76', 443 protocol: protocol 444 }); 445 if (self.options.clientTracking) { 446 self.clients.push(client); 447 client.on('close', function() { 448 var index = self.clients.indexOf(client); 449 if (index != -1) { 450 self.clients.splice(index, 1); 451 } 452 }); 453 } 454 455 // signal upgrade complete 456 socket.removeListener('error', errorHandler); 457 cb(client); 458 }); 459 } 460 catch (e) { 461 try { socket.destroy(); } catch (e) {} 462 return; 463 } 464 } 465 466 // retrieve nonce 467 var nonceLength = 8; 468 if (upgradeHead && upgradeHead.length >= nonceLength) { 469 var nonce = upgradeHead.slice(0, nonceLength); 470 var rest = upgradeHead.length > nonceLength ? upgradeHead.slice(nonceLength) : null; 471 completeHandshake.call(self, nonce, rest, buildResponseHeader()); 472 } 473 else { 474 // nonce not present in upgradeHead 475 var nonce = new Buffer(nonceLength); 476 upgradeHead.copy(nonce, 0); 477 var received = upgradeHead.length; 478 var rest = null; 479 var handler = function (data) { 480 var toRead = Math.min(data.length, nonceLength - received); 481 if (toRead === 0) return; 482 data.copy(nonce, received, 0, toRead); 483 received += toRead; 484 if (received == nonceLength) { 485 socket.removeListener('data', handler); 486 if (toRead < data.length) rest = data.slice(toRead); 487 488 // complete the handshake but send empty buffer for headers since they have already been sent 489 completeHandshake.call(self, nonce, rest, new Buffer(0)); 490 } 491 } 492 493 // handle additional data as we receive it 494 socket.on('data', handler); 495 496 // send header response before we have the nonce to fix haproxy buffering 497 handshakeResponse(); 498 } 499 } 500 501 // verify client 502 if (typeof this.options.verifyClient == 'function') { 503 var info = { 504 origin: origin, 505 secure: typeof req.connection.authorized !== 'undefined' || typeof req.connection.encrypted !== 'undefined', 506 req: req 507 }; 508 if (this.options.verifyClient.length == 2) { 509 var self = this; 510 this.options.verifyClient(info, function(result, code, name) { 511 if (typeof code === 'undefined') code = 401; 512 if (typeof name === 'undefined') name = http.STATUS_CODES[code]; 513 514 if (!result) abortConnection(socket, code, name); 515 else onClientVerified.apply(self); 516 }); 517 return; 518 } 519 else if (!this.options.verifyClient(info)) { 520 abortConnection(socket, 401, 'Unauthorized'); 521 return; 522 } 523 } 524 525 // no client verification required 526 onClientVerified(); 527} 528 529function acceptExtensions(offer) { 530 var extensions = {}; 531 var options = this.options.perMessageDeflate; 532 var maxPayload = this.options.maxPayload; 533 if (options && offer[PerMessageDeflate.extensionName]) { 534 var perMessageDeflate = new PerMessageDeflate(options !== true ? options : {}, true, maxPayload); 535 perMessageDeflate.accept(offer[PerMessageDeflate.extensionName]); 536 extensions[PerMessageDeflate.extensionName] = perMessageDeflate; 537 } 538 return extensions; 539} 540 541function abortConnection(socket, code, name) { 542 try { 543 var response = [ 544 'HTTP/1.1 ' + code + ' ' + name, 545 'Content-type: text/html' 546 ]; 547 socket.write(response.concat('', '').join('\r\n')); 548 } 549 catch (e) { /* ignore errors - we've aborted this connection */ } 550 finally { 551 // ensure that an early aborted connection is shut down completely 552 try { socket.destroy(); } catch (e) {} 553 } 554}