summaryrefslogtreecommitdiff
path: root/www/wiki/extensions/UploadWizard/resources/uw.ConcurrentQueue.js
diff options
context:
space:
mode:
Diffstat (limited to 'www/wiki/extensions/UploadWizard/resources/uw.ConcurrentQueue.js')
-rw-r--r--www/wiki/extensions/UploadWizard/resources/uw.ConcurrentQueue.js195
1 files changed, 195 insertions, 0 deletions
diff --git a/www/wiki/extensions/UploadWizard/resources/uw.ConcurrentQueue.js b/www/wiki/extensions/UploadWizard/resources/uw.ConcurrentQueue.js
new file mode 100644
index 00000000..b3dc2610
--- /dev/null
+++ b/www/wiki/extensions/UploadWizard/resources/uw.ConcurrentQueue.js
@@ -0,0 +1,195 @@
+( function ( mw, uw, $, OO ) {
+
+ /**
+ * A queue that will execute the asynchronous function `action` for each item in the queue in
+ * order, taking care not to allow more than `count` instances to be executing at the same time.
+ *
+ * Items can be added or removed (#addItem, #removeItem) while the queue is already being
+ * executed.
+ *
+ * @mixins OO.EventEmitter
+ * @param {Object} options
+ * @param {Function} options.action Action to execute for each item, must return a Promise
+ * @param {number} options.count Number of functions to execute concurrently
+ */
+ uw.ConcurrentQueue = function UWConcurrentQueue( options ) {
+ OO.EventEmitter.call( this );
+
+ this.count = options.count;
+ this.action = options.action;
+
+ this.queued = [];
+ this.running = [];
+ this.done = [];
+ this.runningPromises = [];
+
+ this.completed = false;
+ this.executing = false;
+ };
+ OO.initClass( uw.ConcurrentQueue );
+ OO.mixinClass( uw.ConcurrentQueue, OO.EventEmitter );
+
+ /**
+ * A 'progress' event is emitted when one of the functions' promises is resolved or rejected.
+ *
+ * @event progress
+ */
+
+ /**
+ * A 'complete' event is emitted when all of the functions' promises have been resolved or rejected.
+ *
+ * @event complete
+ */
+
+ /**
+ * A 'change' event is emitted when an item is added to or removed from the queue.
+ *
+ * @event change
+ */
+
+ /**
+ * Add an item to the queue.
+ *
+ * @param {Object} item
+ * @return {boolean} true
+ */
+ uw.ConcurrentQueue.prototype.addItem = function ( item ) {
+ this.queued.push( item );
+ this.emit( 'change' );
+ if ( this.executing ) {
+ this.executeNext();
+ }
+ return true;
+ };
+
+ /**
+ * Remove an item from the queue.
+ *
+ * While it's possible to remove an item that is being executed, it doesn't stop the execution.
+ *
+ * @param {Object} item
+ * @return {boolean} Whether the item was removed
+ */
+ uw.ConcurrentQueue.prototype.removeItem = function ( item ) {
+ var index, found;
+
+ found = false;
+
+ index = this.queued.indexOf( item );
+ if ( index !== -1 ) {
+ this.queued.splice( index, 1 );
+ found = true;
+ }
+
+ index = this.done.indexOf( item );
+ if ( index !== -1 ) {
+ this.done.splice( index, 1 );
+ found = true;
+ }
+
+ index = this.running.indexOf( item );
+ if ( index !== -1 ) {
+ // Try aborting the promise if possible
+ if ( this.runningPromises[ index ].abort ) {
+ this.runningPromises[ index ].abort();
+ }
+ this.running.splice( index, 1 );
+ this.runningPromises.splice( index, 1 );
+ found = true;
+ }
+
+ if ( found ) {
+ this.emit( 'change' );
+ this.checkIfComplete();
+ }
+
+ // Ensure we're still using as many threads as requested
+ this.executeNext();
+
+ return found;
+ };
+
+ /**
+ * @private
+ * @param {Object} item
+ */
+ uw.ConcurrentQueue.prototype.promiseComplete = function ( item ) {
+ var index;
+ index = this.running.indexOf( item );
+ // Check that this item wasn't removed while it was being executed
+ if ( index !== -1 ) {
+ this.running.splice( index, 1 );
+ this.runningPromises.splice( index, 1 );
+ this.done.push( item );
+ this.emit( 'progress' );
+ }
+
+ this.checkIfComplete();
+
+ this.executeNext();
+ };
+
+ /**
+ * @private
+ */
+ uw.ConcurrentQueue.prototype.executeNext = function () {
+ var item, promise;
+ if ( this.running.length === this.count || !this.executing ) {
+ return;
+ }
+ item = this.queued.shift();
+ if ( !item ) {
+ return;
+ }
+
+ this.running.push( item );
+ promise = this.action.call( null, item );
+ this.runningPromises.push( promise );
+ promise.always( this.promiseComplete.bind( this, item ) );
+ };
+
+ /**
+ * Start executing the queue. If the queue is already executing, do nothing.
+ *
+ * When the queue finishes executing, a 'complete' event will be emitted.
+ */
+ uw.ConcurrentQueue.prototype.startExecuting = function () {
+ var i;
+ if ( this.executing ) {
+ return;
+ }
+ this.completed = false;
+ this.executing = true;
+ for ( i = 0; i < this.count; i++ ) {
+ this.executeNext();
+ }
+ // In case the queue was empty
+ this.checkIfComplete();
+ };
+
+ /**
+ * Abort executing the queue. Remove all queued items and abort running ones.
+ */
+ uw.ConcurrentQueue.prototype.abortExecuting = function () {
+ while ( this.queued.length > 0 ) {
+ this.removeItem( this.queued[ 0 ] );
+ }
+ while ( this.running.length > 0 ) {
+ this.removeItem( this.running[ 0 ] );
+ }
+ };
+
+ /**
+ * @private
+ */
+ uw.ConcurrentQueue.prototype.checkIfComplete = function () {
+ if ( this.running.length === 0 && this.queued.length === 0 ) {
+ if ( !this.completed ) {
+ this.completed = true;
+ this.executing = false;
+ this.emit( 'complete' );
+ }
+ }
+ };
+
+}( mediaWiki, mediaWiki.uploadWizard, jQuery, OO ) );