258 lines
6.4 KiB
JavaScript
258 lines
6.4 KiB
JavaScript
'use strict'
|
|
|
|
const { Buffer } = require('buffer')
|
|
const WS = require('ws')
|
|
const debug = require('debug')('mqttjs:ws')
|
|
const duplexify = require('duplexify')
|
|
const Transform = require('readable-stream').Transform
|
|
|
|
const WSS_OPTIONS = [
|
|
'rejectUnauthorized',
|
|
'ca',
|
|
'cert',
|
|
'key',
|
|
'pfx',
|
|
'passphrase'
|
|
]
|
|
// eslint-disable-next-line camelcase
|
|
const IS_BROWSER = (typeof process !== 'undefined' && process.title === 'browser') || typeof __webpack_require__ === 'function'
|
|
function buildUrl (opts, client) {
|
|
let url = opts.protocol + '://' + opts.hostname + ':' + opts.port + opts.path
|
|
if (typeof (opts.transformWsUrl) === 'function') {
|
|
url = opts.transformWsUrl(url, opts, client)
|
|
}
|
|
return url
|
|
}
|
|
|
|
function setDefaultOpts (opts) {
|
|
const options = opts
|
|
if (!opts.hostname) {
|
|
options.hostname = 'localhost'
|
|
}
|
|
if (!opts.port) {
|
|
if (opts.protocol === 'wss') {
|
|
options.port = 443
|
|
} else {
|
|
options.port = 80
|
|
}
|
|
}
|
|
if (!opts.path) {
|
|
options.path = '/'
|
|
}
|
|
|
|
if (!opts.wsOptions) {
|
|
options.wsOptions = {}
|
|
}
|
|
if (!IS_BROWSER && opts.protocol === 'wss') {
|
|
// Add cert/key/ca etc options
|
|
WSS_OPTIONS.forEach(function (prop) {
|
|
if (Object.prototype.hasOwnProperty.call(opts, prop) && !Object.prototype.hasOwnProperty.call(opts.wsOptions, prop)) {
|
|
options.wsOptions[prop] = opts[prop]
|
|
}
|
|
})
|
|
}
|
|
|
|
return options
|
|
}
|
|
|
|
function setDefaultBrowserOpts (opts) {
|
|
const options = setDefaultOpts(opts)
|
|
|
|
if (!options.hostname) {
|
|
options.hostname = options.host
|
|
}
|
|
|
|
if (!options.hostname) {
|
|
// Throwing an error in a Web Worker if no `hostname` is given, because we
|
|
// can not determine the `hostname` automatically. If connecting to
|
|
// localhost, please supply the `hostname` as an argument.
|
|
if (typeof (document) === 'undefined') {
|
|
throw new Error('Could not determine host. Specify host manually.')
|
|
}
|
|
const parsed = new URL(document.URL)
|
|
options.hostname = parsed.hostname
|
|
|
|
if (!options.port) {
|
|
options.port = parsed.port
|
|
}
|
|
}
|
|
|
|
// objectMode should be defined for logic
|
|
if (options.objectMode === undefined) {
|
|
options.objectMode = !(options.binary === true || options.binary === undefined)
|
|
}
|
|
|
|
return options
|
|
}
|
|
|
|
function createWebSocket (client, url, opts) {
|
|
debug('createWebSocket')
|
|
debug('protocol: ' + opts.protocolId + ' ' + opts.protocolVersion)
|
|
const websocketSubProtocol =
|
|
(opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
|
|
? 'mqttv3.1'
|
|
: 'mqtt'
|
|
|
|
debug('creating new Websocket for url: ' + url + ' and protocol: ' + websocketSubProtocol)
|
|
const socket = new WS(url, [websocketSubProtocol], opts.wsOptions)
|
|
return socket
|
|
}
|
|
|
|
function createBrowserWebSocket (client, opts) {
|
|
const websocketSubProtocol =
|
|
(opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
|
|
? 'mqttv3.1'
|
|
: 'mqtt'
|
|
|
|
const url = buildUrl(opts, client)
|
|
/* global WebSocket */
|
|
const socket = new WebSocket(url, [websocketSubProtocol])
|
|
socket.binaryType = 'arraybuffer'
|
|
return socket
|
|
}
|
|
|
|
function streamBuilder (client, opts) {
|
|
debug('streamBuilder')
|
|
const options = setDefaultOpts(opts)
|
|
const url = buildUrl(options, client)
|
|
const socket = createWebSocket(client, url, options)
|
|
const webSocketStream = WS.createWebSocketStream(socket, options.wsOptions)
|
|
webSocketStream.url = url
|
|
socket.on('close', () => { webSocketStream.destroy() })
|
|
return webSocketStream
|
|
}
|
|
|
|
function browserStreamBuilder (client, opts) {
|
|
debug('browserStreamBuilder')
|
|
let stream
|
|
const options = setDefaultBrowserOpts(opts)
|
|
// sets the maximum socket buffer size before throttling
|
|
const bufferSize = options.browserBufferSize || 1024 * 512
|
|
|
|
const bufferTimeout = opts.browserBufferTimeout || 1000
|
|
|
|
const coerceToBuffer = !opts.objectMode
|
|
|
|
const socket = createBrowserWebSocket(client, opts)
|
|
|
|
const proxy = buildProxy(opts, socketWriteBrowser, socketEndBrowser)
|
|
|
|
if (!opts.objectMode) {
|
|
proxy._writev = writev
|
|
}
|
|
proxy.on('close', () => { socket.close() })
|
|
|
|
const eventListenerSupport = (typeof socket.addEventListener !== 'undefined')
|
|
|
|
// was already open when passed in
|
|
if (socket.readyState === socket.OPEN) {
|
|
stream = proxy
|
|
} else {
|
|
stream = stream = duplexify(undefined, undefined, opts)
|
|
if (!opts.objectMode) {
|
|
stream._writev = writev
|
|
}
|
|
|
|
if (eventListenerSupport) {
|
|
socket.addEventListener('open', onopen)
|
|
} else {
|
|
socket.onopen = onopen
|
|
}
|
|
}
|
|
|
|
stream.socket = socket
|
|
|
|
if (eventListenerSupport) {
|
|
socket.addEventListener('close', onclose)
|
|
socket.addEventListener('error', onerror)
|
|
socket.addEventListener('message', onmessage)
|
|
} else {
|
|
socket.onclose = onclose
|
|
socket.onerror = onerror
|
|
socket.onmessage = onmessage
|
|
}
|
|
|
|
// methods for browserStreamBuilder
|
|
|
|
function buildProxy (options, socketWrite, socketEnd) {
|
|
const proxy = new Transform({
|
|
objectModeMode: options.objectMode
|
|
})
|
|
|
|
proxy._write = socketWrite
|
|
proxy._flush = socketEnd
|
|
|
|
return proxy
|
|
}
|
|
|
|
function onopen () {
|
|
stream.setReadable(proxy)
|
|
stream.setWritable(proxy)
|
|
stream.emit('connect')
|
|
}
|
|
|
|
function onclose () {
|
|
stream.end()
|
|
stream.destroy()
|
|
}
|
|
|
|
function onerror (err) {
|
|
stream.destroy(err)
|
|
}
|
|
|
|
function onmessage (event) {
|
|
let data = event.data
|
|
if (data instanceof ArrayBuffer) data = Buffer.from(data)
|
|
else data = Buffer.from(data, 'utf8')
|
|
proxy.push(data)
|
|
}
|
|
|
|
// this is to be enabled only if objectMode is false
|
|
function writev (chunks, cb) {
|
|
const buffers = new Array(chunks.length)
|
|
for (let i = 0; i < chunks.length; i++) {
|
|
if (typeof chunks[i].chunk === 'string') {
|
|
buffers[i] = Buffer.from(chunks[i], 'utf8')
|
|
} else {
|
|
buffers[i] = chunks[i].chunk
|
|
}
|
|
}
|
|
|
|
this._write(Buffer.concat(buffers), 'binary', cb)
|
|
}
|
|
|
|
function socketWriteBrowser (chunk, enc, next) {
|
|
if (socket.bufferedAmount > bufferSize) {
|
|
// throttle data until buffered amount is reduced.
|
|
setTimeout(socketWriteBrowser, bufferTimeout, chunk, enc, next)
|
|
}
|
|
|
|
if (coerceToBuffer && typeof chunk === 'string') {
|
|
chunk = Buffer.from(chunk, 'utf8')
|
|
}
|
|
|
|
try {
|
|
socket.send(chunk)
|
|
} catch (err) {
|
|
return next(err)
|
|
}
|
|
|
|
next()
|
|
}
|
|
|
|
function socketEndBrowser (done) {
|
|
socket.close()
|
|
done()
|
|
}
|
|
|
|
// end methods for browserStreamBuilder
|
|
|
|
return stream
|
|
}
|
|
|
|
if (IS_BROWSER) {
|
|
module.exports = browserStreamBuilder
|
|
} else {
|
|
module.exports = streamBuilder
|
|
}
|