Sender.js (7884B)
1/*! 2 * ws: a node.js websocket client 3 * Copyright(c) 2011 Einar Otto Stangvik <einaros@gmail.com> 4 * MIT Licensed 5 */ 6 7var events = require('events') 8 , util = require('util') 9 , crypto = require('crypto') 10 , EventEmitter = events.EventEmitter 11 , ErrorCodes = require('./ErrorCodes') 12 , bufferUtil = require('./BufferUtil') 13 , PerMessageDeflate = require('./PerMessageDeflate'); 14 15/** 16 * HyBi Sender implementation 17 */ 18 19function Sender(socket, extensions) { 20 if (this instanceof Sender === false) { 21 throw new TypeError("Classes can't be function-called"); 22 } 23 24 events.EventEmitter.call(this); 25 26 this._socket = socket; 27 this.extensions = extensions || {}; 28 this.firstFragment = true; 29 this.compress = false; 30 this.messageHandlers = []; 31 this.processing = false; 32} 33 34/** 35 * Inherits from EventEmitter. 36 */ 37 38util.inherits(Sender, events.EventEmitter); 39 40/** 41 * Sends a close instruction to the remote party. 42 * 43 * @api public 44 */ 45 46Sender.prototype.close = function(code, data, mask, cb) { 47 if (typeof code !== 'undefined') { 48 if (typeof code !== 'number' || 49 !ErrorCodes.isValidErrorCode(code)) throw new Error('first argument must be a valid error code number'); 50 } 51 code = code || 1000; 52 var dataBuffer = new Buffer(2 + (data ? Buffer.byteLength(data) : 0)); 53 writeUInt16BE.call(dataBuffer, code, 0); 54 if (dataBuffer.length > 2) dataBuffer.write(data, 2); 55 56 var self = this; 57 this.messageHandlers.push(function() { 58 self.frameAndSend(0x8, dataBuffer, true, mask); 59 if (typeof cb == 'function') cb(); 60 }); 61 this.flush(); 62}; 63 64/** 65 * Sends a ping message to the remote party. 66 * 67 * @api public 68 */ 69 70Sender.prototype.ping = function(data, options) { 71 var mask = options && options.mask; 72 var self = this; 73 this.messageHandlers.push(function() { 74 self.frameAndSend(0x9, data || '', true, mask); 75 }); 76 this.flush(); 77}; 78 79/** 80 * Sends a pong message to the remote party. 81 * 82 * @api public 83 */ 84 85Sender.prototype.pong = function(data, options) { 86 var mask = options && options.mask; 87 var self = this; 88 this.messageHandlers.push(function() { 89 self.frameAndSend(0xa, data || '', true, mask); 90 }); 91 this.flush(); 92}; 93 94/** 95 * Sends text or binary data to the remote party. 96 * 97 * @api public 98 */ 99 100Sender.prototype.send = function(data, options, cb) { 101 var finalFragment = options && options.fin === false ? false : true; 102 var mask = options && options.mask; 103 var compress = options && options.compress; 104 var opcode = options && options.binary ? 2 : 1; 105 if (this.firstFragment === false) { 106 opcode = 0; 107 compress = false; 108 } else { 109 this.firstFragment = false; 110 this.compress = compress; 111 } 112 if (finalFragment) this.firstFragment = true 113 114 var compressFragment = this.compress; 115 116 var self = this; 117 this.messageHandlers.push(function() { 118 if (!data || !compressFragment) { 119 self.frameAndSend(opcode, data, finalFragment, mask, compress, cb); 120 return; 121 } 122 123 self.processing = true; 124 self.applyExtensions(data, finalFragment, compressFragment, function(err, data) { 125 if (err) { 126 if (typeof cb == 'function') cb(err); 127 else self.emit('error', err); 128 return; 129 } 130 self.frameAndSend(opcode, data, finalFragment, mask, compress, cb); 131 self.processing = false; 132 self.flush(); 133 }); 134 }); 135 this.flush(); 136}; 137 138/** 139 * Frames and sends a piece of data according to the HyBi WebSocket protocol. 140 * 141 * @api private 142 */ 143 144Sender.prototype.frameAndSend = function(opcode, data, finalFragment, maskData, compressed, cb) { 145 var canModifyData = false; 146 147 if (!data) { 148 try { 149 this._socket.write(new Buffer([opcode | (finalFragment ? 0x80 : 0), 0 | (maskData ? 0x80 : 0)].concat(maskData ? [0, 0, 0, 0] : [])), 'binary', cb); 150 } 151 catch (e) { 152 if (typeof cb == 'function') cb(e); 153 else this.emit('error', e); 154 } 155 return; 156 } 157 158 if (!Buffer.isBuffer(data)) { 159 canModifyData = true; 160 if (data && (typeof data.byteLength !== 'undefined' || typeof data.buffer !== 'undefined')) { 161 data = getArrayBuffer(data); 162 } else { 163 // 164 // If people want to send a number, this would allocate the number in 165 // bytes as memory size instead of storing the number as buffer value. So 166 // we need to transform it to string in order to prevent possible 167 // vulnerabilities / memory attacks. 168 // 169 if (typeof data === 'number') data = data.toString(); 170 171 data = new Buffer(data); 172 } 173 } 174 175 var dataLength = data.length 176 , dataOffset = maskData ? 6 : 2 177 , secondByte = dataLength; 178 179 if (dataLength >= 65536) { 180 dataOffset += 8; 181 secondByte = 127; 182 } 183 else if (dataLength > 125) { 184 dataOffset += 2; 185 secondByte = 126; 186 } 187 188 var mergeBuffers = dataLength < 32768 || (maskData && !canModifyData); 189 var totalLength = mergeBuffers ? dataLength + dataOffset : dataOffset; 190 var outputBuffer = new Buffer(totalLength); 191 outputBuffer[0] = finalFragment ? opcode | 0x80 : opcode; 192 if (compressed) outputBuffer[0] |= 0x40; 193 194 switch (secondByte) { 195 case 126: 196 writeUInt16BE.call(outputBuffer, dataLength, 2); 197 break; 198 case 127: 199 writeUInt32BE.call(outputBuffer, 0, 2); 200 writeUInt32BE.call(outputBuffer, dataLength, 6); 201 } 202 203 if (maskData) { 204 outputBuffer[1] = secondByte | 0x80; 205 var mask = getRandomMask(); 206 outputBuffer[dataOffset - 4] = mask[0]; 207 outputBuffer[dataOffset - 3] = mask[1]; 208 outputBuffer[dataOffset - 2] = mask[2]; 209 outputBuffer[dataOffset - 1] = mask[3]; 210 if (mergeBuffers) { 211 bufferUtil.mask(data, mask, outputBuffer, dataOffset, dataLength); 212 try { 213 this._socket.write(outputBuffer, 'binary', cb); 214 } 215 catch (e) { 216 if (typeof cb == 'function') cb(e); 217 else this.emit('error', e); 218 } 219 } 220 else { 221 bufferUtil.mask(data, mask, data, 0, dataLength); 222 try { 223 this._socket.write(outputBuffer, 'binary'); 224 this._socket.write(data, 'binary', cb); 225 } 226 catch (e) { 227 if (typeof cb == 'function') cb(e); 228 else this.emit('error', e); 229 } 230 } 231 } 232 else { 233 outputBuffer[1] = secondByte; 234 if (mergeBuffers) { 235 data.copy(outputBuffer, dataOffset); 236 try { 237 this._socket.write(outputBuffer, 'binary', cb); 238 } 239 catch (e) { 240 if (typeof cb == 'function') cb(e); 241 else this.emit('error', e); 242 } 243 } 244 else { 245 try { 246 this._socket.write(outputBuffer, 'binary'); 247 this._socket.write(data, 'binary', cb); 248 } 249 catch (e) { 250 if (typeof cb == 'function') cb(e); 251 else this.emit('error', e); 252 } 253 } 254 } 255}; 256 257/** 258 * Execute message handler buffers 259 * 260 * @api private 261 */ 262 263Sender.prototype.flush = function() { 264 while (!this.processing && this.messageHandlers.length) { 265 this.messageHandlers.shift()(); 266 } 267}; 268 269/** 270 * Apply extensions to message 271 * 272 * @api private 273 */ 274 275Sender.prototype.applyExtensions = function(data, fin, compress, callback) { 276 if ((data.buffer || data) instanceof ArrayBuffer) { 277 data = getArrayBuffer(data); 278 } 279 this.extensions[PerMessageDeflate.extensionName].compress(data, fin, callback); 280}; 281 282module.exports = Sender; 283 284function writeUInt16BE(value, offset) { 285 this[offset] = (value & 0xff00)>>8; 286 this[offset+1] = value & 0xff; 287} 288 289function writeUInt32BE(value, offset) { 290 this[offset] = (value & 0xff000000)>>24; 291 this[offset+1] = (value & 0xff0000)>>16; 292 this[offset+2] = (value & 0xff00)>>8; 293 this[offset+3] = value & 0xff; 294} 295 296function getArrayBuffer(data) { 297 // data is either an ArrayBuffer or ArrayBufferView. 298 var array = new Uint8Array(data.buffer || data) 299 , l = data.byteLength || data.length 300 , o = data.byteOffset || 0 301 , buffer = new Buffer(l); 302 for (var i = 0; i < l; ++i) { 303 buffer[i] = array[o+i]; 304 } 305 return buffer; 306} 307 308function getRandomMask() { 309 return crypto.randomBytes(4); 310}