TransFlow/node_modules/mqtt/lib/client.js

1898 lines
53 KiB
JavaScript

'use strict'
/**
* Module dependencies
*/
const EventEmitter = require('events').EventEmitter
const Store = require('./store')
const TopicAliasRecv = require('./topic-alias-recv')
const TopicAliasSend = require('./topic-alias-send')
const mqttPacket = require('mqtt-packet')
const DefaultMessageIdProvider = require('./default-message-id-provider')
const Writable = require('readable-stream').Writable
const inherits = require('inherits')
const reInterval = require('reinterval')
const clone = require('rfdc/default')
const validations = require('./validations')
const xtend = require('xtend')
const debug = require('debug')('mqttjs:client')
const nextTick = process ? process.nextTick : function (callback) { setTimeout(callback, 0) }
const setImmediate = global.setImmediate || function (callback) {
// works in node v0.8
nextTick(callback)
}
const defaultConnectOptions = {
keepalive: 60,
reschedulePings: true,
protocolId: 'MQTT',
protocolVersion: 4,
reconnectPeriod: 1000,
connectTimeout: 30 * 1000,
clean: true,
resubscribe: true
}
const socketErrors = [
'ECONNREFUSED',
'EADDRINUSE',
'ECONNRESET',
'ENOTFOUND'
]
// Other Socket Errors: EADDRINUSE, ECONNRESET, ENOTFOUND.
const errors = {
0: '',
1: 'Unacceptable protocol version',
2: 'Identifier rejected',
3: 'Server unavailable',
4: 'Bad username or password',
5: 'Not authorized',
16: 'No matching subscribers',
17: 'No subscription existed',
128: 'Unspecified error',
129: 'Malformed Packet',
130: 'Protocol Error',
131: 'Implementation specific error',
132: 'Unsupported Protocol Version',
133: 'Client Identifier not valid',
134: 'Bad User Name or Password',
135: 'Not authorized',
136: 'Server unavailable',
137: 'Server busy',
138: 'Banned',
139: 'Server shutting down',
140: 'Bad authentication method',
141: 'Keep Alive timeout',
142: 'Session taken over',
143: 'Topic Filter invalid',
144: 'Topic Name invalid',
145: 'Packet identifier in use',
146: 'Packet Identifier not found',
147: 'Receive Maximum exceeded',
148: 'Topic Alias invalid',
149: 'Packet too large',
150: 'Message rate too high',
151: 'Quota exceeded',
152: 'Administrative action',
153: 'Payload format invalid',
154: 'Retain not supported',
155: 'QoS not supported',
156: 'Use another server',
157: 'Server moved',
158: 'Shared Subscriptions not supported',
159: 'Connection rate exceeded',
160: 'Maximum connect time',
161: 'Subscription Identifiers not supported',
162: 'Wildcard Subscriptions not supported'
}
function defaultId () {
return 'mqttjs_' + Math.random().toString(16).substr(2, 8)
}
function applyTopicAlias (client, packet) {
if (client.options.protocolVersion === 5) {
if (packet.cmd === 'publish') {
let alias
if (packet.properties) {
alias = packet.properties.topicAlias
}
const topic = packet.topic.toString()
if (client.topicAliasSend) {
if (alias) {
if (topic.length !== 0) {
// register topic alias
debug('applyTopicAlias :: register topic: %s - alias: %d', topic, alias)
if (!client.topicAliasSend.put(topic, alias)) {
debug('applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, alias)
return new Error('Sending Topic Alias out of range')
}
}
} else {
if (topic.length !== 0) {
if (client.options.autoAssignTopicAlias) {
alias = client.topicAliasSend.getAliasByTopic(topic)
if (alias) {
packet.topic = ''
packet.properties = { ...(packet.properties), topicAlias: alias }
debug('applyTopicAlias :: auto assign(use) topic: %s - alias: %d', topic, alias)
} else {
alias = client.topicAliasSend.getLruAlias()
client.topicAliasSend.put(topic, alias)
packet.properties = { ...(packet.properties), topicAlias: alias }
debug('applyTopicAlias :: auto assign topic: %s - alias: %d', topic, alias)
}
} else if (client.options.autoUseTopicAlias) {
alias = client.topicAliasSend.getAliasByTopic(topic)
if (alias) {
packet.topic = ''
packet.properties = { ...(packet.properties), topicAlias: alias }
debug('applyTopicAlias :: auto use topic: %s - alias: %d', topic, alias)
}
}
}
}
} else if (alias) {
debug('applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, alias)
return new Error('Sending Topic Alias out of range')
}
}
}
}
function removeTopicAliasAndRecoverTopicName (client, packet) {
let alias
if (packet.properties) {
alias = packet.properties.topicAlias
}
let topic = packet.topic.toString()
if (topic.length === 0) {
// restore topic from alias
if (typeof alias === 'undefined') {
return new Error('Unregistered Topic Alias')
} else {
topic = client.topicAliasSend.getTopicByAlias(alias)
if (typeof topic === 'undefined') {
return new Error('Unregistered Topic Alias')
} else {
packet.topic = topic
}
}
}
if (alias) {
delete packet.properties.topicAlias
}
}
function sendPacket (client, packet, cb) {
debug('sendPacket :: packet: %O', packet)
debug('sendPacket :: emitting `packetsend`')
client.emit('packetsend', packet)
debug('sendPacket :: writing to stream')
const result = mqttPacket.writeToStream(packet, client.stream, client.options)
debug('sendPacket :: writeToStream result %s', result)
if (!result && cb && cb !== nop) {
debug('sendPacket :: handle events on `drain` once through callback.')
client.stream.once('drain', cb)
} else if (cb) {
debug('sendPacket :: invoking cb')
cb()
}
}
function flush (queue) {
if (queue) {
debug('flush: queue exists? %b', !!(queue))
Object.keys(queue).forEach(function (messageId) {
if (typeof queue[messageId].cb === 'function') {
queue[messageId].cb(new Error('Connection closed'))
// This is suspicious. Why do we only delete this if we have a callbck?
// If this is by-design, then adding no as callback would cause this to get deleted unintentionally.
delete queue[messageId]
}
})
}
}
function flushVolatile (queue) {
if (queue) {
debug('flushVolatile :: deleting volatile messages from the queue and setting their callbacks as error function')
Object.keys(queue).forEach(function (messageId) {
if (queue[messageId].volatile && typeof queue[messageId].cb === 'function') {
queue[messageId].cb(new Error('Connection closed'))
delete queue[messageId]
}
})
}
}
function storeAndSend (client, packet, cb, cbStorePut) {
debug('storeAndSend :: store packet with cmd %s to outgoingStore', packet.cmd)
let storePacket = packet
let err
if (storePacket.cmd === 'publish') {
// The original packet is for sending.
// The cloned storePacket is for storing to resend on reconnect.
// Topic Alias must not be used after disconnected.
storePacket = clone(packet)
err = removeTopicAliasAndRecoverTopicName(client, storePacket)
if (err) {
return cb && cb(err)
}
}
client.outgoingStore.put(storePacket, function storedPacket (err) {
if (err) {
return cb && cb(err)
}
cbStorePut()
sendPacket(client, packet, cb)
})
}
function nop (error) {
debug('nop ::', error)
}
/**
* MqttClient constructor
*
* @param {Stream} stream - stream
* @param {Object} [options] - connection options
* (see Connection#connect)
*/
function MqttClient (streamBuilder, options) {
let k
const that = this
if (!(this instanceof MqttClient)) {
return new MqttClient(streamBuilder, options)
}
this.options = options || {}
// Defaults
for (k in defaultConnectOptions) {
if (typeof this.options[k] === 'undefined') {
this.options[k] = defaultConnectOptions[k]
} else {
this.options[k] = options[k]
}
}
debug('MqttClient :: options.protocol', options.protocol)
debug('MqttClient :: options.protocolVersion', options.protocolVersion)
debug('MqttClient :: options.username', options.username)
debug('MqttClient :: options.keepalive', options.keepalive)
debug('MqttClient :: options.reconnectPeriod', options.reconnectPeriod)
debug('MqttClient :: options.rejectUnauthorized', options.rejectUnauthorized)
debug('MqttClient :: options.topicAliasMaximum', options.topicAliasMaximum)
this.options.clientId = (typeof options.clientId === 'string') ? options.clientId : defaultId()
debug('MqttClient :: clientId', this.options.clientId)
this.options.customHandleAcks = (options.protocolVersion === 5 && options.customHandleAcks) ? options.customHandleAcks : function () { arguments[3](0) }
this.streamBuilder = streamBuilder
this.messageIdProvider = (typeof this.options.messageIdProvider === 'undefined') ? new DefaultMessageIdProvider() : this.options.messageIdProvider
// Inflight message storages
this.outgoingStore = options.outgoingStore || new Store()
this.incomingStore = options.incomingStore || new Store()
// Should QoS zero messages be queued when the connection is broken?
this.queueQoSZero = options.queueQoSZero === undefined ? true : options.queueQoSZero
// map of subscribed topics to support reconnection
this._resubscribeTopics = {}
// map of a subscribe messageId and a topic
this.messageIdToTopic = {}
// Ping timer, setup in _setupPingTimer
this.pingTimer = null
// Is the client connected?
this.connected = false
// Are we disconnecting?
this.disconnecting = false
// Packet queue
this.queue = []
// connack timer
this.connackTimer = null
// Reconnect timer
this.reconnectTimer = null
// Is processing store?
this._storeProcessing = false
// Packet Ids are put into the store during store processing
this._packetIdsDuringStoreProcessing = {}
// Store processing queue
this._storeProcessingQueue = []
// Inflight callbacks
this.outgoing = {}
// True if connection is first time.
this._firstConnection = true
if (options.topicAliasMaximum > 0) {
if (options.topicAliasMaximum > 0xffff) {
debug('MqttClient :: options.topicAliasMaximum is out of range')
} else {
this.topicAliasRecv = new TopicAliasRecv(options.topicAliasMaximum)
}
}
// Send queued packets
this.on('connect', function () {
const queue = this.queue
function deliver () {
const entry = queue.shift()
debug('deliver :: entry %o', entry)
let packet = null
if (!entry) {
that._resubscribe()
return
}
packet = entry.packet
debug('deliver :: call _sendPacket for %o', packet)
let send = true
if (packet.messageId && packet.messageId !== 0) {
if (!that.messageIdProvider.register(packet.messageId)) {
send = false
}
}
if (send) {
that._sendPacket(
packet,
function (err) {
if (entry.cb) {
entry.cb(err)
}
deliver()
}
)
} else {
debug('messageId: %d has already used. The message is skipped and removed.', packet.messageId)
deliver()
}
}
debug('connect :: sending queued packets')
deliver()
})
this.on('close', function () {
debug('close :: connected set to `false`')
this.connected = false
debug('close :: clearing connackTimer')
clearTimeout(this.connackTimer)
debug('close :: clearing ping timer')
if (that.pingTimer !== null) {
that.pingTimer.clear()
that.pingTimer = null
}
if (this.topicAliasRecv) {
this.topicAliasRecv.clear()
}
debug('close :: calling _setupReconnect')
this._setupReconnect()
})
EventEmitter.call(this)
debug('MqttClient :: setting up stream')
this._setupStream()
}
inherits(MqttClient, EventEmitter)
/**
* setup the event handlers in the inner stream.
*
* @api private
*/
MqttClient.prototype._setupStream = function () {
const that = this
const writable = new Writable()
const parser = mqttPacket.parser(this.options)
let completeParse = null
const packets = []
debug('_setupStream :: calling method to clear reconnect')
this._clearReconnect()
debug('_setupStream :: using streamBuilder provided to client to create stream')
this.stream = this.streamBuilder(this)
parser.on('packet', function (packet) {
debug('parser :: on packet push to packets array.')
packets.push(packet)
})
function nextTickWork () {
if (packets.length) {
nextTick(work)
} else {
const done = completeParse
completeParse = null
done()
}
}
function work () {
debug('work :: getting next packet in queue')
const packet = packets.shift()
if (packet) {
debug('work :: packet pulled from queue')
that._handlePacket(packet, nextTickWork)
} else {
debug('work :: no packets in queue')
const done = completeParse
completeParse = null
debug('work :: done flag is %s', !!(done))
if (done) done()
}
}
writable._write = function (buf, enc, done) {
completeParse = done
debug('writable stream :: parsing buffer')
parser.parse(buf)
work()
}
function streamErrorHandler (error) {
debug('streamErrorHandler :: error', error.message)
if (socketErrors.includes(error.code)) {
// handle error
debug('streamErrorHandler :: emitting error')
that.emit('error', error)
} else {
nop(error)
}
}
debug('_setupStream :: pipe stream to writable stream')
this.stream.pipe(writable)
// Suppress connection errors
this.stream.on('error', streamErrorHandler)
// Echo stream close
this.stream.on('close', function () {
debug('(%s)stream :: on close', that.options.clientId)
flushVolatile(that.outgoing)
debug('stream: emit close to MqttClient')
that.emit('close')
})
// Send a connect packet
debug('_setupStream: sending packet `connect`')
const connectPacket = Object.create(this.options)
connectPacket.cmd = 'connect'
if (this.topicAliasRecv) {
if (!connectPacket.properties) {
connectPacket.properties = {}
}
if (this.topicAliasRecv) {
connectPacket.properties.topicAliasMaximum = this.topicAliasRecv.max
}
}
// avoid message queue
sendPacket(this, connectPacket)
// Echo connection errors
parser.on('error', this.emit.bind(this, 'error'))
// auth
if (this.options.properties) {
if (!this.options.properties.authenticationMethod && this.options.properties.authenticationData) {
that.end(() =>
this.emit('error', new Error('Packet has no Authentication Method')
))
return this
}
if (this.options.properties.authenticationMethod && this.options.authPacket && typeof this.options.authPacket === 'object') {
const authPacket = xtend({ cmd: 'auth', reasonCode: 0 }, this.options.authPacket)
sendPacket(this, authPacket)
}
}
// many drain listeners are needed for qos 1 callbacks if the connection is intermittent
this.stream.setMaxListeners(1000)
clearTimeout(this.connackTimer)
this.connackTimer = setTimeout(function () {
debug('!!connectTimeout hit!! Calling _cleanUp with force `true`')
that._cleanUp(true)
}, this.options.connectTimeout)
}
MqttClient.prototype._handlePacket = function (packet, done) {
const options = this.options
if (options.protocolVersion === 5 && options.properties && options.properties.maximumPacketSize && options.properties.maximumPacketSize < packet.length) {
this.emit('error', new Error('exceeding packets size ' + packet.cmd))
this.end({ reasonCode: 149, properties: { reasonString: 'Maximum packet size was exceeded' } })
return this
}
debug('_handlePacket :: emitting packetreceive')
this.emit('packetreceive', packet)
switch (packet.cmd) {
case 'publish':
this._handlePublish(packet, done)
break
case 'puback':
case 'pubrec':
case 'pubcomp':
case 'suback':
case 'unsuback':
this._handleAck(packet)
done()
break
case 'pubrel':
this._handlePubrel(packet, done)
break
case 'connack':
this._handleConnack(packet)
done()
break
case 'auth':
this._handleAuth(packet)
done()
break
case 'pingresp':
this._handlePingresp(packet)
done()
break
case 'disconnect':
this._handleDisconnect(packet)
done()
break
default:
// do nothing
// maybe we should do an error handling
// or just log it
break
}
}
MqttClient.prototype._checkDisconnecting = function (callback) {
if (this.disconnecting) {
if (callback && callback !== nop) {
callback(new Error('client disconnecting'))
} else {
this.emit('error', new Error('client disconnecting'))
}
}
return this.disconnecting
}
/**
* publish - publish <message> to <topic>
*
* @param {String} topic - topic to publish to
* @param {String, Buffer} message - message to publish
* @param {Object} [opts] - publish options, includes:
* {Number} qos - qos level to publish on
* {Boolean} retain - whether or not to retain the message
* {Boolean} dup - whether or not mark a message as duplicate
* {Function} cbStorePut - function(){} called when message is put into `outgoingStore`
* @param {Function} [callback] - function(err){}
* called when publish succeeds or fails
* @returns {MqttClient} this - for chaining
* @api public
*
* @example client.publish('topic', 'message');
* @example
* client.publish('topic', 'message', {qos: 1, retain: true, dup: true});
* @example client.publish('topic', 'message', console.log);
*/
MqttClient.prototype.publish = function (topic, message, opts, callback) {
debug('publish :: message `%s` to topic `%s`', message, topic)
const options = this.options
// .publish(topic, payload, cb);
if (typeof opts === 'function') {
callback = opts
opts = null
}
// default opts
const defaultOpts = { qos: 0, retain: false, dup: false }
opts = xtend(defaultOpts, opts)
if (this._checkDisconnecting(callback)) {
return this
}
const that = this
const publishProc = function () {
let messageId = 0
if (opts.qos === 1 || opts.qos === 2) {
messageId = that._nextId()
if (messageId === null) {
debug('No messageId left')
return false
}
}
const packet = {
cmd: 'publish',
topic: topic,
payload: message,
qos: opts.qos,
retain: opts.retain,
messageId: messageId,
dup: opts.dup
}
if (options.protocolVersion === 5) {
packet.properties = opts.properties
}
debug('publish :: qos', opts.qos)
switch (opts.qos) {
case 1:
case 2:
// Add to callbacks
that.outgoing[packet.messageId] = {
volatile: false,
cb: callback || nop
}
debug('MqttClient:publish: packet cmd: %s', packet.cmd)
that._sendPacket(packet, undefined, opts.cbStorePut)
break
default:
debug('MqttClient:publish: packet cmd: %s', packet.cmd)
that._sendPacket(packet, callback, opts.cbStorePut)
break
}
return true
}
if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !publishProc()) {
this._storeProcessingQueue.push(
{
invoke: publishProc,
cbStorePut: opts.cbStorePut,
callback: callback
}
)
}
return this
}
/**
* subscribe - subscribe to <topic>
*
* @param {String, Array, Object} topic - topic(s) to subscribe to, supports objects in the form {'topic': qos}
* @param {Object} [opts] - optional subscription options, includes:
* {Number} qos - subscribe qos level
* @param {Function} [callback] - function(err, granted){} where:
* {Error} err - subscription error (none at the moment!)
* {Array} granted - array of {topic: 't', qos: 0}
* @returns {MqttClient} this - for chaining
* @api public
* @example client.subscribe('topic');
* @example client.subscribe('topic', {qos: 1});
* @example client.subscribe({'topic': {qos: 0}, 'topic2': {qos: 1}}, console.log);
* @example client.subscribe('topic', console.log);
*/
MqttClient.prototype.subscribe = function () {
const that = this
const args = new Array(arguments.length)
for (let i = 0; i < arguments.length; i++) {
args[i] = arguments[i]
}
const subs = []
let obj = args.shift()
const resubscribe = obj.resubscribe
let callback = args.pop() || nop
let opts = args.pop()
const version = this.options.protocolVersion
delete obj.resubscribe
if (typeof obj === 'string') {
obj = [obj]
}
if (typeof callback !== 'function') {
opts = callback
callback = nop
}
const invalidTopic = validations.validateTopics(obj)
if (invalidTopic !== null) {
setImmediate(callback, new Error('Invalid topic ' + invalidTopic))
return this
}
if (this._checkDisconnecting(callback)) {
debug('subscribe: discconecting true')
return this
}
const defaultOpts = {
qos: 0
}
if (version === 5) {
defaultOpts.nl = false
defaultOpts.rap = false
defaultOpts.rh = 0
}
opts = xtend(defaultOpts, opts)
if (Array.isArray(obj)) {
obj.forEach(function (topic) {
debug('subscribe: array topic %s', topic)
if (!Object.prototype.hasOwnProperty.call(that._resubscribeTopics, topic) ||
that._resubscribeTopics[topic].qos < opts.qos ||
resubscribe) {
const currentOpts = {
topic: topic,
qos: opts.qos
}
if (version === 5) {
currentOpts.nl = opts.nl
currentOpts.rap = opts.rap
currentOpts.rh = opts.rh
currentOpts.properties = opts.properties
}
debug('subscribe: pushing topic `%s` and qos `%s` to subs list', currentOpts.topic, currentOpts.qos)
subs.push(currentOpts)
}
})
} else {
Object
.keys(obj)
.forEach(function (k) {
debug('subscribe: object topic %s', k)
if (!Object.prototype.hasOwnProperty.call(that._resubscribeTopics, k) ||
that._resubscribeTopics[k].qos < obj[k].qos ||
resubscribe) {
const currentOpts = {
topic: k,
qos: obj[k].qos
}
if (version === 5) {
currentOpts.nl = obj[k].nl
currentOpts.rap = obj[k].rap
currentOpts.rh = obj[k].rh
currentOpts.properties = opts.properties
}
debug('subscribe: pushing `%s` to subs list', currentOpts)
subs.push(currentOpts)
}
})
}
if (!subs.length) {
callback(null, [])
return this
}
const subscribeProc = function () {
const messageId = that._nextId()
if (messageId === null) {
debug('No messageId left')
return false
}
const packet = {
cmd: 'subscribe',
subscriptions: subs,
qos: 1,
retain: false,
dup: false,
messageId: messageId
}
if (opts.properties) {
packet.properties = opts.properties
}
// subscriptions to resubscribe to in case of disconnect
if (that.options.resubscribe) {
debug('subscribe :: resubscribe true')
const topics = []
subs.forEach(function (sub) {
if (that.options.reconnectPeriod > 0) {
const topic = { qos: sub.qos }
if (version === 5) {
topic.nl = sub.nl || false
topic.rap = sub.rap || false
topic.rh = sub.rh || 0
topic.properties = sub.properties
}
that._resubscribeTopics[sub.topic] = topic
topics.push(sub.topic)
}
})
that.messageIdToTopic[packet.messageId] = topics
}
that.outgoing[packet.messageId] = {
volatile: true,
cb: function (err, packet) {
if (!err) {
const granted = packet.granted
for (let i = 0; i < granted.length; i += 1) {
subs[i].qos = granted[i]
}
}
callback(err, subs)
}
}
debug('subscribe :: call _sendPacket')
that._sendPacket(packet)
return true
}
if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !subscribeProc()) {
this._storeProcessingQueue.push(
{
invoke: subscribeProc,
callback: callback
}
)
}
return this
}
/**
* unsubscribe - unsubscribe from topic(s)
*
* @param {String, Array} topic - topics to unsubscribe from
* @param {Object} [opts] - optional subscription options, includes:
* {Object} properties - properties of unsubscribe packet
* @param {Function} [callback] - callback fired on unsuback
* @returns {MqttClient} this - for chaining
* @api public
* @example client.unsubscribe('topic');
* @example client.unsubscribe('topic', console.log);
*/
MqttClient.prototype.unsubscribe = function () {
const that = this
const args = new Array(arguments.length)
for (let i = 0; i < arguments.length; i++) {
args[i] = arguments[i]
}
let topic = args.shift()
let callback = args.pop() || nop
let opts = args.pop()
if (typeof topic === 'string') {
topic = [topic]
}
if (typeof callback !== 'function') {
opts = callback
callback = nop
}
const invalidTopic = validations.validateTopics(topic)
if (invalidTopic !== null) {
setImmediate(callback, new Error('Invalid topic ' + invalidTopic))
return this
}
if (that._checkDisconnecting(callback)) {
return this
}
const unsubscribeProc = function () {
const messageId = that._nextId()
if (messageId === null) {
debug('No messageId left')
return false
}
const packet = {
cmd: 'unsubscribe',
qos: 1,
messageId: messageId
}
if (typeof topic === 'string') {
packet.unsubscriptions = [topic]
} else if (Array.isArray(topic)) {
packet.unsubscriptions = topic
}
if (that.options.resubscribe) {
packet.unsubscriptions.forEach(function (topic) {
delete that._resubscribeTopics[topic]
})
}
if (typeof opts === 'object' && opts.properties) {
packet.properties = opts.properties
}
that.outgoing[packet.messageId] = {
volatile: true,
cb: callback
}
debug('unsubscribe: call _sendPacket')
that._sendPacket(packet)
return true
}
if (this._storeProcessing || this._storeProcessingQueue.length > 0 || !unsubscribeProc()) {
this._storeProcessingQueue.push(
{
invoke: unsubscribeProc,
callback: callback
}
)
}
return this
}
/**
* end - close connection
*
* @returns {MqttClient} this - for chaining
* @param {Boolean} force - do not wait for all in-flight messages to be acked
* @param {Object} opts - added to the disconnect packet
* @param {Function} cb - called when the client has been closed
*
* @api public
*/
MqttClient.prototype.end = function (force, opts, cb) {
const that = this
debug('end :: (%s)', this.options.clientId)
if (force == null || typeof force !== 'boolean') {
cb = opts || nop
opts = force
force = false
if (typeof opts !== 'object') {
cb = opts
opts = null
if (typeof cb !== 'function') {
cb = nop
}
}
}
if (typeof opts !== 'object') {
cb = opts
opts = null
}
debug('end :: cb? %s', !!cb)
cb = cb || nop
function closeStores () {
debug('end :: closeStores: closing incoming and outgoing stores')
that.disconnected = true
that.incomingStore.close(function (e1) {
that.outgoingStore.close(function (e2) {
debug('end :: closeStores: emitting end')
that.emit('end')
if (cb) {
const err = e1 || e2
debug('end :: closeStores: invoking callback with args')
cb(err)
}
})
})
if (that._deferredReconnect) {
that._deferredReconnect()
}
}
function finish () {
// defer closesStores of an I/O cycle,
// just to make sure things are
// ok for websockets
debug('end :: (%s) :: finish :: calling _cleanUp with force %s', that.options.clientId, force)
that._cleanUp(force, () => {
debug('end :: finish :: calling process.nextTick on closeStores')
// const boundProcess = nextTick.bind(null, closeStores)
nextTick(closeStores.bind(that))
}, opts)
}
if (this.disconnecting) {
cb()
return this
}
this._clearReconnect()
this.disconnecting = true
if (!force && Object.keys(this.outgoing).length > 0) {
// wait 10ms, just to be sure we received all of it
debug('end :: (%s) :: calling finish in 10ms once outgoing is empty', that.options.clientId)
this.once('outgoingEmpty', setTimeout.bind(null, finish, 10))
} else {
debug('end :: (%s) :: immediately calling finish', that.options.clientId)
finish()
}
return this
}
/**
* removeOutgoingMessage - remove a message in outgoing store
* the outgoing callback will be called withe Error('Message removed') if the message is removed
*
* @param {Number} messageId - messageId to remove message
* @returns {MqttClient} this - for chaining
* @api public
*
* @example client.removeOutgoingMessage(client.getLastAllocated());
*/
MqttClient.prototype.removeOutgoingMessage = function (messageId) {
const cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null
delete this.outgoing[messageId]
this.outgoingStore.del({ messageId: messageId }, function () {
cb(new Error('Message removed'))
})
return this
}
/**
* reconnect - connect again using the same options as connect()
*
* @param {Object} [opts] - optional reconnect options, includes:
* {Store} incomingStore - a store for the incoming packets
* {Store} outgoingStore - a store for the outgoing packets
* if opts is not given, current stores are used
* @returns {MqttClient} this - for chaining
*
* @api public
*/
MqttClient.prototype.reconnect = function (opts) {
debug('client reconnect')
const that = this
const f = function () {
if (opts) {
that.options.incomingStore = opts.incomingStore
that.options.outgoingStore = opts.outgoingStore
} else {
that.options.incomingStore = null
that.options.outgoingStore = null
}
that.incomingStore = that.options.incomingStore || new Store()
that.outgoingStore = that.options.outgoingStore || new Store()
that.disconnecting = false
that.disconnected = false
that._deferredReconnect = null
that._reconnect()
}
if (this.disconnecting && !this.disconnected) {
this._deferredReconnect = f
} else {
f()
}
return this
}
/**
* _reconnect - implement reconnection
* @api privateish
*/
MqttClient.prototype._reconnect = function () {
debug('_reconnect: emitting reconnect to client')
this.emit('reconnect')
if (this.connected) {
this.end(() => { this._setupStream() })
debug('client already connected. disconnecting first.')
} else {
debug('_reconnect: calling _setupStream')
this._setupStream()
}
}
/**
* _setupReconnect - setup reconnect timer
*/
MqttClient.prototype._setupReconnect = function () {
const that = this
if (!that.disconnecting && !that.reconnectTimer && (that.options.reconnectPeriod > 0)) {
if (!this.reconnecting) {
debug('_setupReconnect :: emit `offline` state')
this.emit('offline')
debug('_setupReconnect :: set `reconnecting` to `true`')
this.reconnecting = true
}
debug('_setupReconnect :: setting reconnectTimer for %d ms', that.options.reconnectPeriod)
that.reconnectTimer = setInterval(function () {
debug('reconnectTimer :: reconnect triggered!')
that._reconnect()
}, that.options.reconnectPeriod)
} else {
debug('_setupReconnect :: doing nothing...')
}
}
/**
* _clearReconnect - clear the reconnect timer
*/
MqttClient.prototype._clearReconnect = function () {
debug('_clearReconnect : clearing reconnect timer')
if (this.reconnectTimer) {
clearInterval(this.reconnectTimer)
this.reconnectTimer = null
}
}
/**
* _cleanUp - clean up on connection end
* @api private
*/
MqttClient.prototype._cleanUp = function (forced, done) {
const opts = arguments[2]
if (done) {
debug('_cleanUp :: done callback provided for on stream close')
this.stream.on('close', done)
}
debug('_cleanUp :: forced? %s', forced)
if (forced) {
if ((this.options.reconnectPeriod === 0) && this.options.clean) {
flush(this.outgoing)
}
debug('_cleanUp :: (%s) :: destroying stream', this.options.clientId)
this.stream.destroy()
} else {
const packet = xtend({ cmd: 'disconnect' }, opts)
debug('_cleanUp :: (%s) :: call _sendPacket with disconnect packet', this.options.clientId)
this._sendPacket(
packet,
setImmediate.bind(
null,
this.stream.end.bind(this.stream)
)
)
}
if (!this.disconnecting) {
debug('_cleanUp :: client not disconnecting. Clearing and resetting reconnect.')
this._clearReconnect()
this._setupReconnect()
}
if (this.pingTimer !== null) {
debug('_cleanUp :: clearing pingTimer')
this.pingTimer.clear()
this.pingTimer = null
}
if (done && !this.connected) {
debug('_cleanUp :: (%s) :: removing stream `done` callback `close` listener', this.options.clientId)
this.stream.removeListener('close', done)
done()
}
}
/**
* _sendPacket - send or queue a packet
* @param {Object} packet - packet options
* @param {Function} cb - callback when the packet is sent
* @param {Function} cbStorePut - called when message is put into outgoingStore
* @api private
*/
MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut) {
debug('_sendPacket :: (%s) :: start', this.options.clientId)
cbStorePut = cbStorePut || nop
cb = cb || nop
const err = applyTopicAlias(this, packet)
if (err) {
cb(err)
return
}
if (!this.connected) {
// allow auth packets to be sent while authenticating with the broker (mqtt5 enhanced auth)
if (packet.cmd === 'auth') {
this._shiftPingInterval()
sendPacket(this, packet, cb)
return
}
debug('_sendPacket :: client not connected. Storing packet offline.')
this._storePacket(packet, cb, cbStorePut)
return
}
// When sending a packet, reschedule the ping timer
this._shiftPingInterval()
switch (packet.cmd) {
case 'publish':
break
case 'pubrel':
storeAndSend(this, packet, cb, cbStorePut)
return
default:
sendPacket(this, packet, cb)
return
}
switch (packet.qos) {
case 2:
case 1:
storeAndSend(this, packet, cb, cbStorePut)
break
/**
* no need of case here since it will be caught by default
* and jshint comply that before default it must be a break
* anyway it will result in -1 evaluation
*/
case 0:
/* falls through */
default:
sendPacket(this, packet, cb)
break
}
debug('_sendPacket :: (%s) :: end', this.options.clientId)
}
/**
* _storePacket - queue a packet
* @param {Object} packet - packet options
* @param {Function} cb - callback when the packet is sent
* @param {Function} cbStorePut - called when message is put into outgoingStore
* @api private
*/
MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) {
debug('_storePacket :: packet: %o', packet)
debug('_storePacket :: cb? %s', !!cb)
cbStorePut = cbStorePut || nop
let storePacket = packet
if (storePacket.cmd === 'publish') {
// The original packet is for sending.
// The cloned storePacket is for storing to resend on reconnect.
// Topic Alias must not be used after disconnected.
storePacket = clone(packet)
const err = removeTopicAliasAndRecoverTopicName(this, storePacket)
if (err) {
return cb && cb(err)
}
}
// check that the packet is not a qos of 0, or that the command is not a publish
if (((storePacket.qos || 0) === 0 && this.queueQoSZero) || storePacket.cmd !== 'publish') {
this.queue.push({ packet: storePacket, cb: cb })
} else if (storePacket.qos > 0) {
cb = this.outgoing[storePacket.messageId] ? this.outgoing[storePacket.messageId].cb : null
this.outgoingStore.put(storePacket, function (err) {
if (err) {
return cb && cb(err)
}
cbStorePut()
})
} else if (cb) {
cb(new Error('No connection to broker'))
}
}
/**
* _setupPingTimer - setup the ping timer
*
* @api private
*/
MqttClient.prototype._setupPingTimer = function () {
debug('_setupPingTimer :: keepalive %d (seconds)', this.options.keepalive)
const that = this
if (!this.pingTimer && this.options.keepalive) {
this.pingResp = true
this.pingTimer = reInterval(function () {
that._checkPing()
}, this.options.keepalive * 1000)
}
}
/**
* _shiftPingInterval - reschedule the ping interval
*
* @api private
*/
MqttClient.prototype._shiftPingInterval = function () {
if (this.pingTimer && this.options.keepalive && this.options.reschedulePings) {
this.pingTimer.reschedule(this.options.keepalive * 1000)
}
}
/**
* _checkPing - check if a pingresp has come back, and ping the server again
*
* @api private
*/
MqttClient.prototype._checkPing = function () {
debug('_checkPing :: checking ping...')
if (this.pingResp) {
debug('_checkPing :: ping response received. Clearing flag and sending `pingreq`')
this.pingResp = false
this._sendPacket({ cmd: 'pingreq' })
} else {
// do a forced cleanup since socket will be in bad shape
debug('_checkPing :: calling _cleanUp with force true')
this._cleanUp(true)
}
}
/**
* _handlePingresp - handle a pingresp
*
* @api private
*/
MqttClient.prototype._handlePingresp = function () {
this.pingResp = true
}
/**
* _handleConnack
*
* @param {Object} packet
* @api private
*/
MqttClient.prototype._handleConnack = function (packet) {
debug('_handleConnack')
const options = this.options
const version = options.protocolVersion
const rc = version === 5 ? packet.reasonCode : packet.returnCode
clearTimeout(this.connackTimer)
delete this.topicAliasSend
if (packet.properties) {
if (packet.properties.topicAliasMaximum) {
if (packet.properties.topicAliasMaximum > 0xffff) {
this.emit('error', new Error('topicAliasMaximum from broker is out of range'))
return
}
if (packet.properties.topicAliasMaximum > 0) {
this.topicAliasSend = new TopicAliasSend(packet.properties.topicAliasMaximum)
}
}
if (packet.properties.serverKeepAlive && options.keepalive) {
options.keepalive = packet.properties.serverKeepAlive
this._shiftPingInterval()
}
if (packet.properties.maximumPacketSize) {
if (!options.properties) { options.properties = {} }
options.properties.maximumPacketSize = packet.properties.maximumPacketSize
}
}
if (rc === 0) {
this.reconnecting = false
this._onConnect(packet)
} else if (rc > 0) {
const err = new Error('Connection refused: ' + errors[rc])
err.code = rc
this.emit('error', err)
}
}
MqttClient.prototype._handleAuth = function (packet) {
const options = this.options
const version = options.protocolVersion
const rc = version === 5 ? packet.reasonCode : packet.returnCode
if (version !== 5) {
const err = new Error('Protocol error: Auth packets are only supported in MQTT 5. Your version:' + version)
err.code = rc
this.emit('error', err)
return
}
const that = this
this.handleAuth(packet, function (err, packet) {
if (err) {
that.emit('error', err)
return
}
if (rc === 24) {
that.reconnecting = false
that._sendPacket(packet)
} else {
const error = new Error('Connection refused: ' + errors[rc])
err.code = rc
that.emit('error', error)
}
})
}
/**
* @param packet the packet received by the broker
* @return the auth packet to be returned to the broker
* @api public
*/
MqttClient.prototype.handleAuth = function (packet, callback) {
callback()
}
/**
* _handlePublish
*
* @param {Object} packet
* @api private
*/
/*
those late 2 case should be rewrite to comply with coding style:
case 1:
case 0:
// do not wait sending a puback
// no callback passed
if (1 === qos) {
this._sendPacket({
cmd: 'puback',
messageId: messageId
});
}
// emit the message event for both qos 1 and 0
this.emit('message', topic, message, packet);
this.handleMessage(packet, done);
break;
default:
// do nothing but every switch mus have a default
// log or throw an error about unknown qos
break;
for now i just suppressed the warnings
*/
MqttClient.prototype._handlePublish = function (packet, done) {
debug('_handlePublish: packet %o', packet)
done = typeof done !== 'undefined' ? done : nop
let topic = packet.topic.toString()
const message = packet.payload
const qos = packet.qos
const messageId = packet.messageId
const that = this
const options = this.options
const validReasonCodes = [0, 16, 128, 131, 135, 144, 145, 151, 153]
if (this.options.protocolVersion === 5) {
let alias
if (packet.properties) {
alias = packet.properties.topicAlias
}
if (typeof alias !== 'undefined') {
if (topic.length === 0) {
if (alias > 0 && alias <= 0xffff) {
const gotTopic = this.topicAliasRecv.getTopicByAlias(alias)
if (gotTopic) {
topic = gotTopic
debug('_handlePublish :: topic complemented by alias. topic: %s - alias: %d', topic, alias)
} else {
debug('_handlePublish :: unregistered topic alias. alias: %d', alias)
this.emit('error', new Error('Received unregistered Topic Alias'))
return
}
} else {
debug('_handlePublish :: topic alias out of range. alias: %d', alias)
this.emit('error', new Error('Received Topic Alias is out of range'))
return
}
} else {
if (this.topicAliasRecv.put(topic, alias)) {
debug('_handlePublish :: registered topic: %s - alias: %d', topic, alias)
} else {
debug('_handlePublish :: topic alias out of range. alias: %d', alias)
this.emit('error', new Error('Received Topic Alias is out of range'))
return
}
}
}
}
debug('_handlePublish: qos %d', qos)
switch (qos) {
case 2: {
options.customHandleAcks(topic, message, packet, function (error, code) {
if (!(error instanceof Error)) {
code = error
error = null
}
if (error) { return that.emit('error', error) }
if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for pubrec')) }
if (code) {
that._sendPacket({ cmd: 'pubrec', messageId: messageId, reasonCode: code }, done)
} else {
that.incomingStore.put(packet, function () {
that._sendPacket({ cmd: 'pubrec', messageId: messageId }, done)
})
}
})
break
}
case 1: {
// emit the message event
options.customHandleAcks(topic, message, packet, function (error, code) {
if (!(error instanceof Error)) {
code = error
error = null
}
if (error) { return that.emit('error', error) }
if (validReasonCodes.indexOf(code) === -1) { return that.emit('error', new Error('Wrong reason code for puback')) }
if (!code) { that.emit('message', topic, message, packet) }
that.handleMessage(packet, function (err) {
if (err) {
return done && done(err)
}
that._sendPacket({ cmd: 'puback', messageId: messageId, reasonCode: code }, done)
})
})
break
}
case 0:
// emit the message event
this.emit('message', topic, message, packet)
this.handleMessage(packet, done)
break
default:
// do nothing
debug('_handlePublish: unknown QoS. Doing nothing.')
// log or throw an error about unknown qos
break
}
}
/**
* Handle messages with backpressure support, one at a time.
* Override at will.
*
* @param Packet packet the packet
* @param Function callback call when finished
* @api public
*/
MqttClient.prototype.handleMessage = function (packet, callback) {
callback()
}
/**
* _handleAck
*
* @param {Object} packet
* @api private
*/
MqttClient.prototype._handleAck = function (packet) {
/* eslint no-fallthrough: "off" */
const messageId = packet.messageId
const type = packet.cmd
let response = null
const cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null
const that = this
let err
// Checking `!cb` happens to work, but it's not technically "correct".
//
// Why? This code assumes that "no callback" is the same as that "we're not
// waiting for responses" (puback, pubrec, pubcomp, suback, or unsuback).
//
// It would be better to check `if (!this.outgoing[messageId])` here, but
// there's no reason to change it and risk (another) regression.
//
// The only reason this code works is becaues code in MqttClient.publish,
// MqttClinet.subscribe, and MqttClient.unsubscribe ensures that we will
// have a callback even if the user doesn't pass one in.)
if (!cb) {
debug('_handleAck :: Server sent an ack in error. Ignoring.')
// Server sent an ack in error, ignore it.
return
}
// Process
debug('_handleAck :: packet type', type)
switch (type) {
case 'pubcomp':
// same thing as puback for QoS 2
case 'puback': {
const pubackRC = packet.reasonCode
// Callback - we're done
if (pubackRC && pubackRC > 0 && pubackRC !== 16) {
err = new Error('Publish error: ' + errors[pubackRC])
err.code = pubackRC
cb(err, packet)
}
delete this.outgoing[messageId]
this.outgoingStore.del(packet, cb)
this.messageIdProvider.deallocate(messageId)
this._invokeStoreProcessingQueue()
break
}
case 'pubrec': {
response = {
cmd: 'pubrel',
qos: 2,
messageId: messageId
}
const pubrecRC = packet.reasonCode
if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) {
err = new Error('Publish error: ' + errors[pubrecRC])
err.code = pubrecRC
cb(err, packet)
} else {
this._sendPacket(response)
}
break
}
case 'suback': {
delete this.outgoing[messageId]
this.messageIdProvider.deallocate(messageId)
for (let grantedI = 0; grantedI < packet.granted.length; grantedI++) {
if ((packet.granted[grantedI] & 0x80) !== 0) {
// suback with Failure status
const topics = this.messageIdToTopic[messageId]
if (topics) {
topics.forEach(function (topic) {
delete that._resubscribeTopics[topic]
})
}
}
}
this._invokeStoreProcessingQueue()
cb(null, packet)
break
}
case 'unsuback': {
delete this.outgoing[messageId]
this.messageIdProvider.deallocate(messageId)
this._invokeStoreProcessingQueue()
cb(null)
break
}
default:
that.emit('error', new Error('unrecognized packet type'))
}
if (this.disconnecting &&
Object.keys(this.outgoing).length === 0) {
this.emit('outgoingEmpty')
}
}
/**
* _handlePubrel
*
* @param {Object} packet
* @api private
*/
MqttClient.prototype._handlePubrel = function (packet, callback) {
debug('handling pubrel packet')
callback = typeof callback !== 'undefined' ? callback : nop
const messageId = packet.messageId
const that = this
const comp = { cmd: 'pubcomp', messageId: messageId }
that.incomingStore.get(packet, function (err, pub) {
if (!err) {
that.emit('message', pub.topic, pub.payload, pub)
that.handleMessage(pub, function (err) {
if (err) {
return callback(err)
}
that.incomingStore.del(pub, nop)
that._sendPacket(comp, callback)
})
} else {
that._sendPacket(comp, callback)
}
})
}
/**
* _handleDisconnect
*
* @param {Object} packet
* @api private
*/
MqttClient.prototype._handleDisconnect = function (packet) {
this.emit('disconnect', packet)
}
/**
* _nextId
* @return unsigned int
*/
MqttClient.prototype._nextId = function () {
return this.messageIdProvider.allocate()
}
/**
* getLastMessageId
* @return unsigned int
*/
MqttClient.prototype.getLastMessageId = function () {
return this.messageIdProvider.getLastAllocated()
}
/**
* _resubscribe
* @api private
*/
MqttClient.prototype._resubscribe = function () {
debug('_resubscribe')
const _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics)
if (!this._firstConnection &&
(this.options.clean || (this.options.protocolVersion === 5 && !this.connackPacket.sessionPresent)) &&
_resubscribeTopicsKeys.length > 0) {
if (this.options.resubscribe) {
if (this.options.protocolVersion === 5) {
debug('_resubscribe: protocolVersion 5')
for (let topicI = 0; topicI < _resubscribeTopicsKeys.length; topicI++) {
const resubscribeTopic = {}
resubscribeTopic[_resubscribeTopicsKeys[topicI]] = this._resubscribeTopics[_resubscribeTopicsKeys[topicI]]
resubscribeTopic.resubscribe = true
this.subscribe(resubscribeTopic, { properties: resubscribeTopic[_resubscribeTopicsKeys[topicI]].properties })
}
} else {
this._resubscribeTopics.resubscribe = true
this.subscribe(this._resubscribeTopics)
}
} else {
this._resubscribeTopics = {}
}
}
this._firstConnection = false
}
/**
* _onConnect
*
* @api private
*/
MqttClient.prototype._onConnect = function (packet) {
if (this.disconnected) {
this.emit('connect', packet)
return
}
const that = this
this.connackPacket = packet
this.messageIdProvider.clear()
this._setupPingTimer()
this.connected = true
function startStreamProcess () {
let outStore = that.outgoingStore.createStream()
function clearStoreProcessing () {
that._storeProcessing = false
that._packetIdsDuringStoreProcessing = {}
}
that.once('close', remove)
outStore.on('error', function (err) {
clearStoreProcessing()
that._flushStoreProcessingQueue()
that.removeListener('close', remove)
that.emit('error', err)
})
function remove () {
outStore.destroy()
outStore = null
that._flushStoreProcessingQueue()
clearStoreProcessing()
}
function storeDeliver () {
// edge case, we wrapped this twice
if (!outStore) {
return
}
that._storeProcessing = true
const packet = outStore.read(1)
let cb
if (!packet) {
// read when data is available in the future
outStore.once('readable', storeDeliver)
return
}
// Skip already processed store packets
if (that._packetIdsDuringStoreProcessing[packet.messageId]) {
storeDeliver()
return
}
// Avoid unnecessary stream read operations when disconnected
if (!that.disconnecting && !that.reconnectTimer) {
cb = that.outgoing[packet.messageId] ? that.outgoing[packet.messageId].cb : null
that.outgoing[packet.messageId] = {
volatile: false,
cb: function (err, status) {
// Ensure that the original callback passed in to publish gets invoked
if (cb) {
cb(err, status)
}
storeDeliver()
}
}
that._packetIdsDuringStoreProcessing[packet.messageId] = true
if (that.messageIdProvider.register(packet.messageId)) {
that._sendPacket(packet)
} else {
debug('messageId: %d has already used.', packet.messageId)
}
} else if (outStore.destroy) {
outStore.destroy()
}
}
outStore.on('end', function () {
let allProcessed = true
for (const id in that._packetIdsDuringStoreProcessing) {
if (!that._packetIdsDuringStoreProcessing[id]) {
allProcessed = false
break
}
}
if (allProcessed) {
clearStoreProcessing()
that.removeListener('close', remove)
that._invokeAllStoreProcessingQueue()
that.emit('connect', packet)
} else {
startStreamProcess()
}
})
storeDeliver()
}
// start flowing
startStreamProcess()
}
MqttClient.prototype._invokeStoreProcessingQueue = function () {
if (this._storeProcessingQueue.length > 0) {
const f = this._storeProcessingQueue[0]
if (f && f.invoke()) {
this._storeProcessingQueue.shift()
return true
}
}
return false
}
MqttClient.prototype._invokeAllStoreProcessingQueue = function () {
while (this._invokeStoreProcessingQueue()) { /* empty */ }
}
MqttClient.prototype._flushStoreProcessingQueue = function () {
for (const f of this._storeProcessingQueue) {
if (f.cbStorePut) f.cbStorePut(new Error('Connection closed'))
if (f.callback) f.callback(new Error('Connection closed'))
}
this._storeProcessingQueue.splice(0)
}
module.exports = MqttClient