'use strict'

const { Buffer } = require('buffer')
const WS = require('ws')
const debug = require('debug')('mqttjs:ws')
const duplexify = require('duplexify')
const Transform = require('readable-stream').Transform

// Top-level connection options that are copied into `opts.wsOptions` for
// `wss` connections outside the browser (see setDefaultOpts).
const WSS_OPTIONS = [
  'rejectUnauthorized',
  'ca',
  'cert',
  'key',
  'pfx',
  'passphrase'
]
// eslint-disable-next-line camelcase
const IS_BROWSER = (typeof process !== 'undefined' && process.title === 'browser') || typeof __webpack_require__ === 'function'

/**
 * Build the WebSocket URL from the connection options.
 *
 * @param {object} opts - options providing `protocol`, `hostname`, `port`
 *   and `path`; may provide a `transformWsUrl(url, opts, client)` function.
 * @param {object} client - passed through to `transformWsUrl` untouched.
 * @returns {string} the URL, after `transformWsUrl` has been applied if it
 *   is a function.
 */
function buildUrl (opts, client) {
  let url = opts.protocol + '://' + opts.hostname + ':' + opts.port + opts.path
  if (typeof (opts.transformWsUrl) === 'function') {
    url = opts.transformWsUrl(url, opts, client)
  }
  return url
}

/**
 * Fill in missing connection options with defaults.
 *
 * NOTE: `opts` is mutated in place and the very same object is returned;
 * browserStreamBuilder relies on this aliasing.
 *
 * @param {object} opts - connection options.
 * @returns {object} `opts`, with `hostname`, `port`, `path` and `wsOptions`
 *   guaranteed to be set.
 */
function setDefaultOpts (opts) {
  const options = opts
  if (!opts.hostname) {
    options.hostname = 'localhost'
  }
  if (!opts.port) {
    if (opts.protocol === 'wss') {
      options.port = 443
    } else {
      options.port = 80
    }
  }
  if (!opts.path) {
    options.path = '/'
  }

  if (!opts.wsOptions) {
    options.wsOptions = {}
  }
  if (!IS_BROWSER && opts.protocol === 'wss') {
    // Add cert/key/ca etc options
    WSS_OPTIONS.forEach(function (prop) {
      // An explicit value already present in wsOptions wins over the
      // top-level option of the same name.
      if (Object.prototype.hasOwnProperty.call(opts, prop) && !Object.prototype.hasOwnProperty.call(opts.wsOptions, prop)) {
        options.wsOptions[prop] = opts[prop]
      }
    })
  }

  return options
}

/**
 * Apply the generic defaults plus the browser-specific ones (`objectMode`).
 *
 * @param {object} opts - connection options (mutated in place).
 * @returns {object} the same `opts` object with defaults applied.
 * @throws {Error} if no hostname is known and `document` is undefined.
 */
function setDefaultBrowserOpts (opts) {
  const options = setDefaultOpts(opts)

  // NOTE(review): setDefaultOpts() above has already set `hostname` to
  // 'localhost' when it was missing, so the two fallbacks below (use `host`,
  // then derive from document.URL) cannot currently trigger. Left as-is to
  // keep behavior unchanged - confirm whether the host fallback was meant to
  // run before the defaults are applied.
  if (!options.hostname) {
    options.hostname = options.host
  }

  if (!options.hostname) {
    // Throwing an error in a Web Worker if no `hostname` is given, because we
    // can not determine the `hostname` automatically. If connecting to
    // localhost, please supply the `hostname` as an argument.
    if (typeof (document) === 'undefined') {
      throw new Error('Could not determine host. Specify host manually.')
    }
    const parsed = new URL(document.URL)
    options.hostname = parsed.hostname

    if (!options.port) {
      options.port = parsed.port
    }
  }

  // objectMode should be defined for logic
  if (options.objectMode === undefined) {
    // Binary transport (explicit `binary: true` or unset) means byte mode.
    options.objectMode = !(options.binary === true || options.binary === undefined)
  }

  return options
}

/**
 * Create a `ws` WebSocket for the given URL.
 *
 * @param {object} client - unused here; kept for signature compatibility.
 * @param {string} url - URL to connect to.
 * @param {object} opts - options providing `protocolId`, `protocolVersion`
 *   and `wsOptions`.
 * @returns {WS} the newly created socket.
 */
function createWebSocket (client, url, opts) {
  debug('createWebSocket')
  debug('protocol: ' + opts.protocolId + ' ' + opts.protocolVersion)
  // 'MQIsdp' with protocol version 3 selects the 'mqttv3.1' sub-protocol;
  // everything else uses 'mqtt'.
  const websocketSubProtocol =
    (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
      ? 'mqttv3.1'
      : 'mqtt'

  debug('creating new Websocket for url: ' + url + ' and protocol: ' + websocketSubProtocol)
  const socket = new WS(url, [websocketSubProtocol], opts.wsOptions)
  return socket
}

/**
 * Create a browser-native WebSocket configured to deliver ArrayBuffers.
 *
 * @param {object} client - forwarded to buildUrl.
 * @param {object} opts - connection options (see buildUrl); `protocolId` and
 *   `protocolVersion` select the sub-protocol.
 * @returns {WebSocket} the newly created socket.
 */
function createBrowserWebSocket (client, opts) {
  const websocketSubProtocol =
    (opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
      ? 'mqttv3.1'
      : 'mqtt'

  const url = buildUrl(opts, client)
  /* global WebSocket */
  const socket = new WebSocket(url, [websocketSubProtocol])
  socket.binaryType = 'arraybuffer'
  return socket
}

/**
 * Stream builder for non-browser environments: wraps a `ws` socket in the
 * duplex stream returned by `WS.createWebSocketStream`.
 *
 * @param {object} client - forwarded to buildUrl / createWebSocket.
 * @param {object} opts - connection options (mutated by setDefaultOpts).
 * @returns {object} the stream, with the connection URL attached as `url`.
 */
function streamBuilder (client, opts) {
  debug('streamBuilder')
  const options = setDefaultOpts(opts)
  const url = buildUrl(options, client)
  const socket = createWebSocket(client, url, options)
  const webSocketStream = WS.createWebSocketStream(socket, options.wsOptions)
  webSocketStream.url = url
  // Tear the stream down as soon as the underlying socket closes.
  socket.on('close', () => { webSocketStream.destroy() })
  return webSocketStream
}

/**
 * Stream builder for browsers: bridges a native WebSocket to a stream.
 *
 * If the socket is not yet open, a duplexify stream is returned and wired to
 * the internal proxy once the socket's `open` event fires, at which point the
 * stream emits `connect`.
 *
 * @param {object} client - forwarded to createBrowserWebSocket.
 * @param {object} opts - connection options (mutated by
 *   setDefaultBrowserOpts). `browserBufferSize` (default 512 * 1024) is the
 *   `bufferedAmount` threshold above which writes are delayed;
 *   `browserBufferTimeout` (default 1000, passed to setTimeout) is the delay
 *   before a throttled write is retried.
 * @returns {object} the stream, with the WebSocket attached as `socket`.
 */
function browserStreamBuilder (client, opts) {
  debug('browserStreamBuilder')
  let stream
  // `options` and `opts` refer to the same (mutated) object.
  const options = setDefaultBrowserOpts(opts)
  // sets the maximum socket buffer size before throttling
  const bufferSize = options.browserBufferSize || 1024 * 512

  const bufferTimeout = opts.browserBufferTimeout || 1000

  const coerceToBuffer = !opts.objectMode

  const socket = createBrowserWebSocket(client, opts)

  const proxy = buildProxy(opts, socketWriteBrowser, socketEndBrowser)

  if (!opts.objectMode) {
    proxy._writev = writev
  }
  proxy.on('close', () => { socket.close() })

  const eventListenerSupport = (typeof socket.addEventListener !== 'undefined')

  // was already open when passed in
  if (socket.readyState === socket.OPEN) {
    stream = proxy
  } else {
    // Hand out a placeholder duplex now; onopen() plugs the proxy into it.
    stream = duplexify(undefined, undefined, opts)
    if (!opts.objectMode) {
      stream._writev = writev
    }

    if (eventListenerSupport) {
      socket.addEventListener('open', onopen)
    } else {
      socket.onopen = onopen
    }
  }

  stream.socket = socket

  if (eventListenerSupport) {
    socket.addEventListener('close', onclose)
    socket.addEventListener('error', onerror)
    socket.addEventListener('message', onmessage)
  } else {
    socket.onclose = onclose
    socket.onerror = onerror
    socket.onmessage = onmessage
  }

  // methods for browserStreamBuilder

  // Create the Transform that sits between the stream and the socket:
  // writes go to the socket, incoming messages are pushed to its readable side.
  function buildProxy (options, socketWrite, socketEnd) {
    const proxy = new Transform({
      // Fixed: the option key was misspelled (`objectModeMode`), so the
      // requested object mode was never applied to the Transform.
      objectMode: options.objectMode
    })

    proxy._write = socketWrite
    proxy._flush = socketEnd

    return proxy
  }

  // Socket opened: connect the placeholder duplex to the proxy.
  function onopen () {
    stream.setReadable(proxy)
    stream.setWritable(proxy)
    stream.emit('connect')
  }

  // Socket closed: end and destroy the stream.
  function onclose () {
    stream.end()
    stream.destroy()
  }

  // Socket error: destroy the stream with that error.
  function onerror (err) {
    stream.destroy(err)
  }

  // Incoming message: normalize the payload to a Buffer and push it downstream.
  function onmessage (event) {
    let data = event.data

    if (data instanceof ArrayBuffer) data = Buffer.from(data)
    else data = Buffer.from(data, 'utf8')
    proxy.push(data)
  }

  // this is to be enabled only if objectMode is false
  // Concatenates all pending chunks and writes them with a single _write call.
  function writev (chunks, cb) {
    const buffers = new Array(chunks.length)
    for (let i = 0; i < chunks.length; i++) {
      const chunk = chunks[i].chunk
      // Fixed: convert the string payload (`.chunk`), not the wrapper
      // `{ chunk, encoding }` entry, which Buffer.from() rejects.
      buffers[i] = typeof chunk === 'string' ? Buffer.from(chunk, 'utf8') : chunk
    }

    this._write(Buffer.concat(buffers), 'binary', cb)
  }

  // Send one chunk over the socket, delaying while the socket's send buffer
  // is above `bufferSize`.
  function socketWriteBrowser (chunk, enc, next) {
    if (socket.bufferedAmount > bufferSize) {
      // throttle data until buffered amount is reduced.
      setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next)
      // Fixed: stop here. Without this return the chunk was sent right away
      // and then again by the retry, invoking `next` twice.
      return
    }

    if (coerceToBuffer && typeof chunk === 'string') {
      chunk = Buffer.from(chunk, 'utf8')
    }

    try {
      socket.send(chunk)
    } catch (err) {
      return next(err)
    }

    next()
  }

  // Flush hook of the proxy: close the socket once writing has finished.
  function socketEndBrowser (done) {
    socket.close()
    done()
  }

  // end methods for browserStreamBuilder

  return stream
}

if (IS_BROWSER) {
  module.exports = browserStreamBuilder
} else {
  module.exports = streamBuilder
}