137 lines
2.7 KiB
JavaScript
137 lines
2.7 KiB
JavaScript
var inherits = require('inherits');
|
|
var EventEmitter = require('events').EventEmitter;
|
|
|
|
// Public interface of this module: the Queue constructor defined below.
module.exports = Queue;
|
|
|
|
/**
 * Queue constructor. May be called with or without `new`.
 *
 * @param {Object} [options]
 * @param {number} [options.concurrency] Maximum number of jobs processed at
 *   once; any falsy value (including 0) falls back to Infinity.
 * @param {number} [options.timeout] Per-job timeout in milliseconds; any
 *   falsy value falls back to 0, which disables the timeout in `start`.
 */
function Queue(options) {
  var calledWithNew = this instanceof Queue;
  if (!calledWithNew) {
    return new Queue(options);
  }

  // Initialise the EventEmitter state on this instance.
  EventEmitter.call(this);

  var settings = options || {};
  var maxConcurrent = settings.concurrency || Infinity;
  var jobTimeout = settings.timeout || 0;

  this.concurrency = maxConcurrent;
  this.timeout = jobTimeout;
  this.pending = 0;      // jobs started but not yet finished
  this.session = 0;      // incremented by `done` every time the queue ends
  this.running = false;
  this.jobs = [];        // jobs waiting to be started
}
|
|
// Make Queue inherit from EventEmitter via the `inherits` helper; the rest of
// this file relies on instances having on/emit/removeListener/listeners.
inherits(Queue, EventEmitter);
|
|
|
|
// Array.prototype methods that Queue exposes directly; each one operates on
// the underlying `jobs` array.
var arrayMethods = [
  'push',
  'unshift',
  'splice',
  'pop',
  'shift',
  'slice',
  'reverse',
  'indexOf',
  'lastIndexOf'
];

// Install one delegating method per name. forEach is used instead of
// `for...in` so that only the listed names are visited (for...in would also
// walk any enumerable property added to Array.prototype) and so that no
// loop variable leaks into module scope.
arrayMethods.forEach(function(method) {
  Queue.prototype[method] = function() {
    // Forward every argument untouched and return whatever Array returns.
    return Array.prototype[method].apply(this.jobs, arguments);
  };
});
|
|
|
|
// Read-only `length`: jobs currently in flight plus jobs still waiting.
Object.defineProperties(Queue.prototype, {
  length: {
    get: function() {
      var inFlight = this.pending;
      var waiting = this.jobs.length;
      return inFlight + waiting;
    }
  }
});
|
|
|
|
/**
 * Start (or resume) processing queued jobs.
 *
 * Takes jobs off the front of `this.jobs` and calls each one with a
 * `next(err, result)` callback, until `this.concurrency` jobs are pending or
 * no jobs are left. As jobs report back it emits:
 *   - 'success' (result, job) when `next` is called without an error,
 *   - 'error'   (err, job)    when `next` is called with an error,
 *   - 'timeout' (next, job)   when a job runs longer than `this.timeout` ms
 *                             and at least one 'timeout' listener exists
 *                             (without listeners the job is simply finished),
 * and it calls `done` (which emits 'end') once nothing is pending or queued.
 *
 * @param {Function} [cb] Optional callback handed to `callOnErrorOrEnd`.
 */
Queue.prototype.start = function(cb) {
  if (cb) {
    callOnErrorOrEnd.call(this, cb);
  }

  // `>=` rather than `===`: if `concurrency` is lowered while jobs are in
  // flight, `pending` can already exceed it and must still block new jobs.
  if (this.pending >= this.concurrency) {
    return;
  }

  if (this.jobs.length === 0) {
    if (this.pending === 0) {
      done.call(this);
    }
    return;
  }

  var self = this;
  var job = this.jobs.shift();
  var once = true;              // guards against `next` being honoured twice
  var session = this.session;   // session this job belongs to
  var timeoutId = null;
  var didTimeout = false;

  // Cancel this job's timeout timer, if one is armed.
  function clearJobTimer() {
    if (timeoutId !== null) {
      clearTimeout(timeoutId);
      timeoutId = null;
    }
  }

  function next(err, result) {
    if (!once) {
      return;
    }

    if (self.session !== session) {
      // The queue ended while this job was running: its result is ignored,
      // but its timer must not linger and fire against a later session.
      clearJobTimer();
      return;
    }

    once = false;
    self.pending--;
    clearJobTimer();

    if (err) {
      self.emit('error', err, job);
    } else if (didTimeout === false) {
      self.emit('success', result, job);
    }

    // Listeners above may have ended the queue, so re-check the session.
    if (self.session === session) {
      if (self.pending === 0 && self.jobs.length === 0) {
        done.call(self);
      } else if (self.running) {
        self.start();
      }
    }
  }

  if (this.timeout) {
    timeoutId = setTimeout(function() {
      // Ignore timers left over from a session that has already ended;
      // otherwise a 'timeout' event would be emitted for an abandoned job.
      if (self.session !== session) {
        return;
      }
      didTimeout = true;
      if (self.listeners('timeout').length > 0) {
        self.emit('timeout', next, job);
      } else {
        next();
      }
    }, this.timeout);
  }

  this.pending++;
  this.running = true;
  job(next);

  // Keep filling free concurrency slots.
  if (this.jobs.length > 0) {
    this.start();
  }
};
|
|
|
|
/**
 * Pause the queue: clears the `running` flag so that a finishing job no
 * longer triggers `start()` for the next one. Jobs already in flight are
 * not interrupted.
 */
Queue.prototype.stop = function() {
  var queue = this;
  queue.running = false;
};
|
|
|
|
/**
 * Abort the queue immediately: drop every waiting job, reset the pending
 * counter and finish through `done`, which emits 'end'.
 *
 * @param {*} [err] Optional error passed on to `done`.
 */
Queue.prototype.end = function(err) {
  // Empty the existing array in place so outside references see the change.
  this.jobs.splice(0, this.jobs.length);
  this.pending = 0;
  done.call(this, err);
};
|
|
|
|
/**
 * Arrange for `cb` to be called when the queue (`this`) next emits 'end'.
 * An 'error' event ends the queue with that error, so `cb` then receives it.
 * Both listeners are removed again when 'end' fires.
 *
 * Must be invoked with the queue as `this`.
 *
 * @param {Function} cb Called with the value emitted alongside 'end'.
 */
function callOnErrorOrEnd(cb) {
  var queue = this;

  var handleError = function(err) {
    // Turn the first error into an immediate end of the queue.
    queue.end(err);
  };

  var handleEnd = function(err) {
    queue.removeListener('error', handleError);
    queue.removeListener('end', handleEnd);
    cb(err);
  };

  queue.on('error', handleError);
  queue.on('end', handleEnd);
}
|
|
|
|
/**
 * Finish the current run of the queue (`this`): mark it as no longer
 * running, move on to a new session number so callbacks from jobs of the
 * finished session are ignored by `start`, and emit 'end'.
 *
 * Must be invoked with the queue as `this`.
 *
 * @param {*} [err] Passed through unchanged as the 'end' event argument.
 */
function done(err) {
  var queue = this;
  queue.session += 1;
  queue.running = false;
  queue.emit('end', err);
}
|