TransFlow/node_modules/mqtt/lib/store.js

129 lines
2.1 KiB
JavaScript

'use strict'
/**
* Module dependencies
*/
const xtend = require('xtend')
const Readable = require('readable-stream').Readable
const streamsOpts = { objectMode: true }
const defaultStoreOptions = {
clean: true
}
/**
* In-memory implementation of the message store
* This can actually be saved into files.
*
* @param {Object} [options] - store options
*/
function Store (options) {
if (!(this instanceof Store)) {
return new Store(options)
}
this.options = options || {}
// Defaults
this.options = xtend(defaultStoreOptions, options)
this._inflights = new Map()
}
/**
* Adds a packet to the store, a packet is
* anything that has a messageId property.
*
*/
Store.prototype.put = function (packet, cb) {
this._inflights.set(packet.messageId, packet)
if (cb) {
cb()
}
return this
}
/**
* Creates a stream with all the packets in the store
*
*/
Store.prototype.createStream = function () {
const stream = new Readable(streamsOpts)
const values = []
let destroyed = false
let i = 0
this._inflights.forEach(function (value, key) {
values.push(value)
})
stream._read = function () {
if (!destroyed && i < values.length) {
this.push(values[i++])
} else {
this.push(null)
}
}
stream.destroy = function () {
if (destroyed) {
return
}
const self = this
destroyed = true
setTimeout(function () {
self.emit('close')
}, 0)
}
return stream
}
/**
* deletes a packet from the store.
*/
Store.prototype.del = function (packet, cb) {
packet = this._inflights.get(packet.messageId)
if (packet) {
this._inflights.delete(packet.messageId)
cb(null, packet)
} else if (cb) {
cb(new Error('missing packet'))
}
return this
}
/**
* get a packet from the store.
*/
Store.prototype.get = function (packet, cb) {
packet = this._inflights.get(packet.messageId)
if (packet) {
cb(null, packet)
} else if (cb) {
cb(new Error('missing packet'))
}
return this
}
/**
* Close the store
*/
Store.prototype.close = function (cb) {
if (this.options.clean) {
this._inflights = null
}
if (cb) {
cb()
}
}
module.exports = Store