path: root/www/wiki/includes/jobqueue
author    Yaco <franco@reevo.org>  2020-06-04 11:01:00 -0300
committer Yaco <franco@reevo.org>  2020-06-04 11:01:00 -0300
commit    fc7369835258467bf97eb64f184b93691f9a9fd5 (patch)
tree      daabd60089d2dd76d9f5fb416b005fbe159c799d /www/wiki/includes/jobqueue
first commit
Diffstat (limited to 'www/wiki/includes/jobqueue')
-rw-r--r--  www/wiki/includes/jobqueue/Job.php                                  | 426
-rw-r--r--  www/wiki/includes/jobqueue/JobQueue.php                             | 731
-rw-r--r--  www/wiki/includes/jobqueue/JobQueueDB.php                           | 851
-rw-r--r--  www/wiki/includes/jobqueue/JobQueueFederated.php                    | 496
-rw-r--r--  www/wiki/includes/jobqueue/JobQueueGroup.php                        | 480
-rw-r--r--  www/wiki/includes/jobqueue/JobQueueMemory.php                       | 230
-rw-r--r--  www/wiki/includes/jobqueue/JobQueueRedis.php                        | 820
-rw-r--r--  www/wiki/includes/jobqueue/JobQueueSecondTestQueue.php              | 290
-rw-r--r--  www/wiki/includes/jobqueue/JobRunner.php                            | 607
-rw-r--r--  www/wiki/includes/jobqueue/JobSpecification.php                     | 233
-rw-r--r--  www/wiki/includes/jobqueue/README                                   |  80
-rw-r--r--  www/wiki/includes/jobqueue/aggregator/JobQueueAggregator.php        | 180
-rw-r--r--  www/wiki/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php   | 135
-rw-r--r--  www/wiki/includes/jobqueue/jobs/ActivityUpdateJob.php               |  82
-rw-r--r--  www/wiki/includes/jobqueue/jobs/AssembleUploadChunksJob.php         | 138
-rw-r--r--  www/wiki/includes/jobqueue/jobs/CategoryMembershipChangeJob.php     | 253
-rw-r--r--  www/wiki/includes/jobqueue/jobs/CdnPurgeJob.php                     |  46
-rw-r--r--  www/wiki/includes/jobqueue/jobs/ClearUserWatchlistJob.php           | 118
-rw-r--r--  www/wiki/includes/jobqueue/jobs/ClearWatchlistNotificationsJob.php  |  79
-rw-r--r--  www/wiki/includes/jobqueue/jobs/DeleteLinksJob.php                  |  67
-rw-r--r--  www/wiki/includes/jobqueue/jobs/DoubleRedirectJob.php               | 252
-rw-r--r--  www/wiki/includes/jobqueue/jobs/DuplicateJob.php                    |  59
-rw-r--r--  www/wiki/includes/jobqueue/jobs/EmaillingJob.php                    |  46
-rw-r--r--  www/wiki/includes/jobqueue/jobs/EnotifNotifyJob.php                 |  57
-rw-r--r--  www/wiki/includes/jobqueue/jobs/EnqueueJob.php                      |  98
-rw-r--r--  www/wiki/includes/jobqueue/jobs/HTMLCacheUpdateJob.php              | 202
-rw-r--r--  www/wiki/includes/jobqueue/jobs/NullJob.php                         |  76
-rw-r--r--  www/wiki/includes/jobqueue/jobs/PublishStashedFileJob.php           | 152
-rw-r--r--  www/wiki/includes/jobqueue/jobs/RecentChangesUpdateJob.php          | 246
-rw-r--r--  www/wiki/includes/jobqueue/jobs/RefreshLinksJob.php                 | 320
-rw-r--r--  www/wiki/includes/jobqueue/jobs/ThumbnailRenderJob.php              | 111
-rw-r--r--  www/wiki/includes/jobqueue/jobs/UserGroupExpiryJob.php              |  39
-rw-r--r--  www/wiki/includes/jobqueue/utils/BacklinkJobUtils.php               | 149
-rw-r--r--  www/wiki/includes/jobqueue/utils/PurgeJobUtils.php                  |  81
34 files changed, 8230 insertions, 0 deletions
diff --git a/www/wiki/includes/jobqueue/Job.php b/www/wiki/includes/jobqueue/Job.php
new file mode 100644
index 00000000..f9c416f3
--- /dev/null
+++ b/www/wiki/includes/jobqueue/Job.php
@@ -0,0 +1,426 @@
+<?php
+/**
+ * Job queue task base code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @defgroup JobQueue JobQueue
+ */
+
+/**
+ * Class to both describe a background job and handle jobs.
+ * The queue aspects of this class are now deprecated: use JobSpecification
+ * instead when pushing jobs onto queues.
+ *
+ * @ingroup JobQueue
+ */
+abstract class Job implements IJobSpecification {
+ /** @var string */
+ public $command;
+
+ /** @var array Array of job parameters */
+ public $params;
+
+ /** @var array Additional queue metadata */
+ public $metadata = [];
+
+ /** @var Title */
+ protected $title;
+
+ /** @var bool Expensive jobs may set this to true */
+ protected $removeDuplicates;
+
+ /** @var string Text of the last error that occurred */
+ protected $error;
+
+ /** @var callable[] */
+ protected $teardownCallbacks = [];
+
+ /** @var int Bitfield of JOB_* class constants */
+ protected $executionFlags = 0;
+
+ /** @var int Job must not be wrapped in the usual explicit LBFactory transaction round */
+ const JOB_NO_EXPLICIT_TRX_ROUND = 1;
+
+ /**
+ * Run the job
+ * @return bool Success
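+ *
+ * @par Example (a sketch of a minimal subclass; the class name is hypothetical,
+ * see NullJob for a real no-op implementation):
+ * @code
+ * class ExampleNoOpJob extends Job {
+ *     public function run() {
+ *         return true; // report success
+ *     }
+ * }
+ * @endcode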
+ */
+ abstract public function run();
+
+ /**
+ * Create the appropriate object to handle a specific job
+ *
+ * @param string $command Job command
+ * @param Title $title Associated title
+ * @param array $params Job parameters
+ * @throws InvalidArgumentException
+ * @return Job
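+ *
+ * @par Example (a sketch; the command name is hypothetical and must be
+ * registered in $wgJobClasses):
+ * @code
+ * // $wgJobClasses['exampleNoOp'] = ExampleNoOpJob::class; (in LocalSettings.php)
+ * $job = Job::factory( 'exampleNoOp', Title::newMainPage(), [ 'userId' => 1 ] );
+ * $job->run();
+ * @endcode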
+ */
+ public static function factory( $command, Title $title, $params = [] ) {
+ global $wgJobClasses;
+
+ if ( isset( $wgJobClasses[$command] ) ) {
+ $handler = $wgJobClasses[$command];
+
+ if ( is_callable( $handler ) ) {
+ $job = call_user_func( $handler, $title, $params );
+ } elseif ( class_exists( $handler ) ) {
+ $job = new $handler( $title, $params );
+ } else {
+ $job = null;
+ }
+
+ if ( $job instanceof Job ) {
+ $job->command = $command;
+ return $job;
+ } else {
+ throw new InvalidArgumentException( "Cannot instantiate job '$command': bad spec!" );
+ }
+ }
+
+ throw new InvalidArgumentException( "Invalid job command '{$command}'" );
+ }
+
+ /**
+ * @param string $command
+ * @param Title $title
+ * @param array|bool $params Can not be === true
+ */
+ public function __construct( $command, $title, $params = false ) {
+ $this->command = $command;
+ $this->title = $title;
+ $this->params = is_array( $params ) ? $params : []; // sanity
+
+ // expensive jobs may set this to true
+ $this->removeDuplicates = false;
+
+ if ( !isset( $this->params['requestId'] ) ) {
+ $this->params['requestId'] = WebRequest::getRequestId();
+ }
+ }
+
+ /**
+ * @param int $flag JOB_* class constant
+ * @return bool
+ * @since 1.31
+ */
+ public function hasExecutionFlag( $flag ) {
+ return ( $this->executionFlags & $flag ) === $flag; // bitwise AND of the flag bits
+ }
+
+ /**
+ * Batch-insert a group of jobs into the queue.
+ * This will be wrapped in a transaction with a forced commit.
+ *
+ * This may add duplicates at insert time, but they will be
+ * removed later on, when the first one is popped.
+ *
+ * @param Job[] $jobs Array of Job objects
+ * @return bool
+ * @deprecated since 1.21
+ */
+ public static function batchInsert( $jobs ) {
+ wfDeprecated( __METHOD__, '1.21' );
+ JobQueueGroup::singleton()->push( $jobs );
+ return true;
+ }
+
+ /**
+ * @return string
+ */
+ public function getType() {
+ return $this->command;
+ }
+
+ /**
+ * @return Title
+ */
+ public function getTitle() {
+ return $this->title;
+ }
+
+ /**
+ * @return array
+ */
+ public function getParams() {
+ return $this->params;
+ }
+
+ /**
+ * @return int|null UNIX timestamp to delay running this job until, otherwise null
+ * @since 1.22
+ */
+ public function getReleaseTimestamp() {
+ return isset( $this->params['jobReleaseTimestamp'] )
+ ? wfTimestampOrNull( TS_UNIX, $this->params['jobReleaseTimestamp'] )
+ : null;
+ }
+
+ /**
+ * @return int|null UNIX timestamp of when the job was queued, or null
+ * @since 1.26
+ */
+ public function getQueuedTimestamp() {
+ return isset( $this->metadata['timestamp'] )
+ ? wfTimestampOrNull( TS_UNIX, $this->metadata['timestamp'] )
+ : null;
+ }
+
+ /**
+ * @return string|null ID of the request that created this job. Follows
+ *  jobs recursively, making it possible to track the ID of the request that
+ *  started a job even when jobs insert jobs which insert other jobs.
+ * @since 1.27
+ */
+ public function getRequestId() {
+ return isset( $this->params['requestId'] )
+ ? $this->params['requestId']
+ : null;
+ }
+
+ /**
+ * @return int|null UNIX timestamp of when the job was runnable, or null
+ * @since 1.26
+ */
+ public function getReadyTimestamp() {
+ return $this->getReleaseTimestamp() ?: $this->getQueuedTimestamp();
+ }
+
+ /**
+ * Whether the queue should reject insertion of this job if a duplicate exists
+ *
+ * This can be used to avoid duplicated effort or combined with delayed jobs to
+ * coalesce updates into larger batches. Claimed jobs are never treated as
+ * duplicates of new jobs, and some queues may allow a few duplicates due to
+ * network partitions and fail-over. Thus, additional locking is needed to
+ * enforce mutual exclusion if this is really needed.
+ *
+ * @return bool
+ */
+ public function ignoreDuplicates() {
+ return $this->removeDuplicates;
+ }
+
+ /**
+ * @return bool Whether this job can be retried on failure by job runners
+ * @since 1.21
+ */
+ public function allowRetries() {
+ return true;
+ }
+
+ /**
+ * @return int Number of actual "work items" handled in this job
+ * @see $wgJobBackoffThrottling
+ * @since 1.23
+ */
+ public function workItemCount() {
+ return 1;
+ }
+
+ /**
+ * Subclasses may need to override this to make duplication detection work.
+ * The resulting map conveys everything that makes the job unique. This is
+ * only checked if ignoreDuplicates() returns true, meaning that duplicate
+ * jobs are supposed to be ignored.
+ *
+ * @return array Map of key/values
+ * @since 1.21
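+ *
+ * @par Example (a sketch of a subclass override; the ignored parameter is hypothetical):
+ * @code
+ * public function getDeduplicationInfo() {
+ *     $info = parent::getDeduplicationInfo();
+ *     // Two jobs differing only in this parameter should count as duplicates
+ *     unset( $info['params']['triggeredRecursive'] );
+ *     return $info;
+ * }
+ * @endcode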
+ */
+ public function getDeduplicationInfo() {
+ $info = [
+ 'type' => $this->getType(),
+ 'namespace' => $this->getTitle()->getNamespace(),
+ 'title' => $this->getTitle()->getDBkey(),
+ 'params' => $this->getParams()
+ ];
+ if ( is_array( $info['params'] ) ) {
+ // Identical jobs with different "root" jobs should count as duplicates
+ unset( $info['params']['rootJobSignature'] );
+ unset( $info['params']['rootJobTimestamp'] );
+ // Likewise for jobs with different delay times
+ unset( $info['params']['jobReleaseTimestamp'] );
+ // Identical jobs from different requests should count as duplicates
+ unset( $info['params']['requestId'] );
+ // Queues pack and hash this array, so normalize the order
+ ksort( $info['params'] );
+ }
+
+ return $info;
+ }
+
+ /**
+ * Get "root job" parameters for a task
+ *
+ * This is used to no-op redundant jobs, including child jobs of jobs,
+ * as long as the children inherit the root job parameters. When a job
+ * with root job parameters and "rootJobIsSelf" set is pushed, the
+ * deduplicateRootJob() method is automatically called on it. If the
+ * root job is only virtual and not actually pushed (e.g. the sub-jobs
+ * are inserted directly), then call deduplicateRootJob() directly.
+ *
+ * @see JobQueue::deduplicateRootJob()
+ *
+ * @param string $key A key that identifies the task
+ * @return array Map of:
+ * - rootJobIsSelf : true
+ * - rootJobSignature : hash (e.g. SHA1) that identifies the task
+ * - rootJobTimestamp : TS_MW timestamp of this instance of the task
+ * @since 1.21
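+ *
+ * @par Example (a sketch; the signature key format is illustrative):
+ * @code
+ * $rootParams = Job::newRootJobParams( "refreshlinks:{$title->getPrefixedDBkey()}" );
+ * JobQueueGroup::singleton()->push( new RefreshLinksJob( $title, $rootParams ) );
+ * @endcode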
+ */
+ public static function newRootJobParams( $key ) {
+ return [
+ 'rootJobIsSelf' => true,
+ 'rootJobSignature' => sha1( $key ),
+ 'rootJobTimestamp' => wfTimestampNow()
+ ];
+ }
+
+ /**
+ * @see JobQueue::deduplicateRootJob()
+ * @return array
+ * @since 1.21
+ */
+ public function getRootJobParams() {
+ return [
+ 'rootJobSignature' => isset( $this->params['rootJobSignature'] )
+ ? $this->params['rootJobSignature']
+ : null,
+ 'rootJobTimestamp' => isset( $this->params['rootJobTimestamp'] )
+ ? $this->params['rootJobTimestamp']
+ : null
+ ];
+ }
+
+ /**
+ * @see JobQueue::deduplicateRootJob()
+ * @return bool
+ * @since 1.22
+ */
+ public function hasRootJobParams() {
+ return isset( $this->params['rootJobSignature'] )
+ && isset( $this->params['rootJobTimestamp'] );
+ }
+
+ /**
+ * @see JobQueue::deduplicateRootJob()
+ * @return bool Whether this job is a root job
+ */
+ public function isRootJob() {
+ return $this->hasRootJobParams() && !empty( $this->params['rootJobIsSelf'] );
+ }
+
+ /**
+ * @param callable $callback A function with one parameter, the success status, which will be
+ * false if the job failed or it succeeded but the DB changes could not be committed or
+ * any deferred updates threw an exception. (This parameter was added in 1.28.)
+ * @since 1.27
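+ *
+ * @par Example (a sketch; the log group name is hypothetical):
+ * @code
+ * $this->addTeardownCallback( function ( $status ) {
+ *     wfDebugLog( 'examplejob', $status ? 'job committed' : 'job failed' );
+ * } );
+ * @endcode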
+ */
+ protected function addTeardownCallback( $callback ) {
+ $this->teardownCallbacks[] = $callback;
+ }
+
+ /**
+ * Do any final cleanup after run(), deferred updates, and all DB commits happen
+ * @param bool $status Whether the job, its deferred updates, and DB commit all succeeded
+ * @since 1.27
+ */
+ public function teardown( $status ) {
+ foreach ( $this->teardownCallbacks as $callback ) {
+ call_user_func( $callback, $status );
+ }
+ }
+
+ /**
+ * Insert a single job into the queue.
+ * @return bool True on success
+ * @deprecated since 1.21
+ */
+ public function insert() {
+ wfDeprecated( __METHOD__, '1.21' );
+ JobQueueGroup::singleton()->push( $this );
+ return true;
+ }
+
+ /**
+ * @return string
+ */
+ public function toString() {
+ $paramString = '';
+ if ( $this->params ) {
+ foreach ( $this->params as $key => $value ) {
+ if ( $paramString != '' ) {
+ $paramString .= ' ';
+ }
+ if ( is_array( $value ) ) {
+ $filteredValue = [];
+ foreach ( $value as $k => $v ) {
+ $json = FormatJson::encode( $v );
+ if ( $json === false || mb_strlen( $json ) > 512 ) {
+ $filteredValue[$k] = gettype( $v ) . '(...)';
+ } else {
+ $filteredValue[$k] = $v;
+ }
+ }
+ if ( count( $filteredValue ) <= 10 ) {
+ $value = FormatJson::encode( $filteredValue );
+ } else {
+ $value = "array(" . count( $value ) . ")";
+ }
+ } elseif ( is_object( $value ) && !method_exists( $value, '__toString' ) ) {
+ $value = "object(" . get_class( $value ) . ")";
+ }
+
+ $flatValue = (string)$value;
+ if ( mb_strlen( $flatValue ) > 1024 ) {
+ $flatValue = "string(" . mb_strlen( $flatValue ) . ")";
+ }
+
+ $paramString .= "$key={$flatValue}";
+ }
+ }
+
+ $metaString = '';
+ foreach ( $this->metadata as $key => $value ) {
+ if ( is_scalar( $value ) && mb_strlen( $value ) < 1024 ) {
+ $metaString .= ( $metaString ? ",$key=$value" : "$key=$value" );
+ }
+ }
+
+ $s = $this->command;
+ if ( is_object( $this->title ) ) {
+ $s .= " {$this->title->getPrefixedDBkey()}";
+ }
+ if ( $paramString != '' ) {
+ $s .= " $paramString";
+ }
+ if ( $metaString != '' ) {
+ $s .= " ($metaString)";
+ }
+
+ return $s;
+ }
+
+ protected function setLastError( $error ) {
+ $this->error = $error;
+ }
+
+ public function getLastError() {
+ return $this->error;
+ }
+}
diff --git a/www/wiki/includes/jobqueue/JobQueue.php b/www/wiki/includes/jobqueue/JobQueue.php
new file mode 100644
index 00000000..3a8bf1ab
--- /dev/null
+++ b/www/wiki/includes/jobqueue/JobQueue.php
@@ -0,0 +1,731 @@
+<?php
+/**
+ * Job queue base code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @defgroup JobQueue JobQueue
+ */
+use MediaWiki\MediaWikiServices;
+
+/**
+ * Class to handle enqueueing and running of background jobs
+ *
+ * @ingroup JobQueue
+ * @since 1.21
+ */
+abstract class JobQueue {
+ /** @var string Wiki ID */
+ protected $wiki;
+ /** @var string Job type */
+ protected $type;
+ /** @var string Job priority for pop() */
+ protected $order;
+ /** @var int Time to live in seconds */
+ protected $claimTTL;
+ /** @var int Maximum number of times to try a job */
+ protected $maxTries;
+ /** @var string|bool Read only rationale (or false if r/w) */
+ protected $readOnlyReason;
+
+ /** @var BagOStuff */
+ protected $dupCache;
+ /** @var JobQueueAggregator */
+ protected $aggr;
+
+ const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions
+
+ const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days)
+
+ /**
+ * @param array $params
+ * @throws MWException
+ */
+ protected function __construct( array $params ) {
+ $this->wiki = $params['wiki'];
+ $this->type = $params['type'];
+ $this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0;
+ $this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3;
+ if ( isset( $params['order'] ) && $params['order'] !== 'any' ) {
+ $this->order = $params['order'];
+ } else {
+ $this->order = $this->optimalOrder();
+ }
+ if ( !in_array( $this->order, $this->supportedOrders() ) ) {
+ throw new MWException( __CLASS__ . " does not support '{$this->order}' order." );
+ }
+ $this->dupCache = wfGetCache( CACHE_ANYTHING );
+ $this->aggr = isset( $params['aggregator'] )
+ ? $params['aggregator']
+ : new JobQueueAggregatorNull( [] );
+ $this->readOnlyReason = isset( $params['readOnlyReason'] )
+ ? $params['readOnlyReason']
+ : false;
+ }
+
+ /**
+ * Get a job queue object of the specified type.
+ * $params includes:
+ * - class : What job class to use (determines job type)
+ * - wiki : wiki ID of the wiki the jobs are for (defaults to current wiki)
+ * - type : The name of the job types this queue handles
+ * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random".
+ * If "fifo" is used, the queue will effectively be FIFO. Note that job
+ * completion will not appear to be exactly FIFO if there are multiple
+ * job runners since jobs can take different times to finish once popped.
+ * If "timestamp" is used, the queue will at least be loosely ordered
+ * by timestamp, allowing for some jobs to be popped off out of order.
+ * If "random" is used, pop() will pick jobs in random order.
+ * Note that it may only be weakly random (e.g. a lottery of the oldest X).
+ * If "any" is choosen, the queue will use whatever order is the fastest.
+ * This might be useful for improving concurrency for job acquisition.
+ * - claimTTL : If supported, the queue will recycle jobs that have been popped
+ * but not acknowledged as completed after this many seconds. Recycling
+ * of jobs simply means re-inserting them into the queue. Jobs can be
+ * attempted up to "maxTries" times (3 by default) before being discarded.
+ * - readOnlyReason : Set this to a string to make the queue read-only.
+ *
+ * Queue classes should throw an exception if they do not support the options given.
+ *
+ * @param array $params
+ * @return JobQueue
+ * @throws MWException
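+ *
+ * @par Example (a sketch mirroring a typical $wgJobTypeConf-style configuration):
+ * @code
+ * $queue = JobQueue::factory( [
+ *     'class' => 'JobQueueDB',
+ *     'wiki' => wfWikiID(),
+ *     'type' => 'refreshLinks',
+ *     'order' => 'random',
+ *     'claimTTL' => 3600,
+ * ] );
+ * @endcode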
+ */
+ final public static function factory( array $params ) {
+ $class = $params['class'];
+ if ( !class_exists( $class ) ) {
+ throw new MWException( "Invalid job queue class '$class'." );
+ }
+ $obj = new $class( $params );
+ if ( !( $obj instanceof self ) ) {
+ throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." );
+ }
+
+ return $obj;
+ }
+
+ /**
+ * @return string Wiki ID
+ */
+ final public function getWiki() {
+ return $this->wiki;
+ }
+
+ /**
+ * @return string Job type that this queue handles
+ */
+ final public function getType() {
+ return $this->type;
+ }
+
+ /**
+ * @return string One of (random, timestamp, fifo, undefined)
+ */
+ final public function getOrder() {
+ return $this->order;
+ }
+
+ /**
+ * Get the allowed queue orders for configuration validation
+ *
+ * @return array Subset of (random, timestamp, fifo, undefined)
+ */
+ abstract protected function supportedOrders();
+
+ /**
+ * Get the default queue order to use if configuration does not specify one
+ *
+ * @return string One of (random, timestamp, fifo, undefined)
+ */
+ abstract protected function optimalOrder();
+
+ /**
+ * Find out if delayed jobs are supported for configuration validation
+ *
+ * @return bool Whether delayed jobs are supported
+ */
+ protected function supportsDelayedJobs() {
+ return false; // not implemented
+ }
+
+ /**
+ * @return bool Whether delayed jobs are enabled
+ * @since 1.22
+ */
+ final public function delayedJobsEnabled() {
+ return $this->supportsDelayedJobs();
+ }
+
+ /**
+ * @return string|bool Read-only rationale or false if r/w
+ * @since 1.27
+ */
+ public function getReadOnlyReason() {
+ return $this->readOnlyReason;
+ }
+
+ /**
+ * Quickly check if the queue has no available (unacquired, non-delayed) jobs.
+ * Queue classes should use caching if they are any slower without memcached.
+ *
+ * If caching is used, this might return false when there are actually no jobs.
+ * If pop() is called and returns false then it should correct the cache. Also,
+ * calling flushCaches() first prevents this. However, this effect is typically
+ * not distinguishable from the race condition between isEmpty() and pop().
+ *
+ * @return bool
+ * @throws JobQueueError
+ */
+ final public function isEmpty() {
+ $res = $this->doIsEmpty();
+
+ return $res;
+ }
+
+ /**
+ * @see JobQueue::isEmpty()
+ * @return bool
+ */
+ abstract protected function doIsEmpty();
+
+ /**
+ * Get the number of available (unacquired, non-delayed) jobs in the queue.
+ * Queue classes should use caching if they are any slower without memcached.
+ *
+ * If caching is used, this number might be out of date for a minute.
+ *
+ * @return int
+ * @throws JobQueueError
+ */
+ final public function getSize() {
+ $res = $this->doGetSize();
+
+ return $res;
+ }
+
+ /**
+ * @see JobQueue::getSize()
+ * @return int
+ */
+ abstract protected function doGetSize();
+
+ /**
+ * Get the number of acquired jobs (these are temporarily out of the queue).
+ * Queue classes should use caching if they are any slower without memcached.
+ *
+ * If caching is used, this number might be out of date for a minute.
+ *
+ * @return int
+ * @throws JobQueueError
+ */
+ final public function getAcquiredCount() {
+ $res = $this->doGetAcquiredCount();
+
+ return $res;
+ }
+
+ /**
+ * @see JobQueue::getAcquiredCount()
+ * @return int
+ */
+ abstract protected function doGetAcquiredCount();
+
+ /**
+ * Get the number of delayed jobs (these are temporarily out of the queue).
+ * Queue classes should use caching if they are any slower without memcached.
+ *
+ * If caching is used, this number might be out of date for a minute.
+ *
+ * @return int
+ * @throws JobQueueError
+ * @since 1.22
+ */
+ final public function getDelayedCount() {
+ $res = $this->doGetDelayedCount();
+
+ return $res;
+ }
+
+ /**
+ * @see JobQueue::getDelayedCount()
+ * @return int
+ */
+ protected function doGetDelayedCount() {
+ return 0; // not implemented
+ }
+
+ /**
+ * Get the number of acquired jobs that can no longer be attempted.
+ * Queue classes should use caching if they are any slower without memcached.
+ *
+ * If caching is used, this number might be out of date for a minute.
+ *
+ * @return int
+ * @throws JobQueueError
+ */
+ final public function getAbandonedCount() {
+ $res = $this->doGetAbandonedCount();
+
+ return $res;
+ }
+
+ /**
+ * @see JobQueue::getAbandonedCount()
+ * @return int
+ */
+ protected function doGetAbandonedCount() {
+ return 0; // not implemented
+ }
+
+ /**
+ * Push one or more jobs into the queue.
+ * This does not require $wgJobClasses to be set for the given job type.
+ * Outside callers should use JobQueueGroup::push() instead of this function.
+ *
+ * @param IJobSpecification|IJobSpecification[] $jobs
+ * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
+ * @return void
+ * @throws JobQueueError
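+ *
+ * @par Example (a sketch; the parameters are illustrative):
+ * @code
+ * $queue->push( new JobSpecification( 'refreshLinks', [ 'prioritize' => true ], [], $title ) );
+ * @endcode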
+ */
+ final public function push( $jobs, $flags = 0 ) {
+ $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
+ $this->batchPush( $jobs, $flags );
+ }
+
+ /**
+ * Push a batch of jobs into the queue.
+ * This does not require $wgJobClasses to be set for the given job type.
+ * Outside callers should use JobQueueGroup::push() instead of this function.
+ *
+ * @param IJobSpecification[] $jobs
+ * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC)
+ * @return void
+ * @throws MWException
+ */
+ final public function batchPush( array $jobs, $flags = 0 ) {
+ $this->assertNotReadOnly();
+
+ if ( !count( $jobs ) ) {
+ return; // nothing to do
+ }
+
+ foreach ( $jobs as $job ) {
+ if ( $job->getType() !== $this->type ) {
+ throw new MWException(
+ "Got '{$job->getType()}' job; expected a '{$this->type}' job." );
+ } elseif ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) {
+ throw new MWException(
+ "Got delayed '{$job->getType()}' job; delays are not supported." );
+ }
+ }
+
+ $this->doBatchPush( $jobs, $flags );
+ $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
+
+ foreach ( $jobs as $job ) {
+ if ( $job->isRootJob() ) {
+ $this->deduplicateRootJob( $job );
+ }
+ }
+ }
+
+ /**
+ * @see JobQueue::batchPush()
+ * @param IJobSpecification[] $jobs
+ * @param int $flags
+ */
+ abstract protected function doBatchPush( array $jobs, $flags );
+
+ /**
+ * Pop a job off of the queue.
+ * This requires $wgJobClasses to be set for the given job type.
+ * Outside callers should use JobQueueGroup::pop() instead of this function.
+ *
+ * @throws MWException
+ * @return Job|bool Returns false if there are no jobs
+ */
+ final public function pop() {
+ global $wgJobClasses;
+
+ $this->assertNotReadOnly();
+ if ( !WikiMap::isCurrentWikiDbDomain( $this->wiki ) ) {
+ throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." );
+ } elseif ( !isset( $wgJobClasses[$this->type] ) ) {
+ // Do not pop jobs if there is no class for the queue type
+ throw new MWException( "Unrecognized job type '{$this->type}'." );
+ }
+
+ $job = $this->doPop();
+
+ if ( !$job ) {
+ $this->aggr->notifyQueueEmpty( $this->wiki, $this->type );
+ }
+
+ // Flag this job as an old duplicate based on its "root" job...
+ try {
+ if ( $job && $this->isRootJobOldDuplicate( $job ) ) {
+ self::incrStats( 'dupe_pops', $this->type );
+ $job = DuplicateJob::newFromJob( $job ); // convert to a no-op
+ }
+ } catch ( Exception $e ) {
+ // don't lose jobs over this
+ }
+
+ return $job;
+ }
+
+ /**
+ * @see JobQueue::pop()
+ * @return Job|bool
+ */
+ abstract protected function doPop();
+
+ /**
+ * Acknowledge that a job was completed.
+ *
+ * This does nothing for certain queue classes or if "claimTTL" is not set.
+ * Outside callers should use JobQueueGroup::ack() instead of this function.
+ *
+ * @param Job $job
+ * @return void
+ * @throws MWException
+ */
+ final public function ack( Job $job ) {
+ $this->assertNotReadOnly();
+ if ( $job->getType() !== $this->type ) {
+ throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
+ }
+
+ $this->doAck( $job );
+ }
+
+ /**
+ * @see JobQueue::ack()
+ * @param Job $job
+ */
+ abstract protected function doAck( Job $job );
+
+ /**
+ * Register the "root job" of a given job into the queue for de-duplication.
+ * This should only be called right *after* all the new jobs have been inserted.
+ * This is used to turn older, duplicate, job entries into no-ops. The root job
+ * information will remain in the registry until it simply falls out of cache.
+ *
+ * This requires that $job has two special fields in the "params" array:
+ * - rootJobSignature : hash (e.g. SHA1) that identifies the task
+ * - rootJobTimestamp : TS_MW timestamp of this instance of the task
+ *
+ * A "root job" is a conceptual job that consist of potentially many smaller jobs
+ * that are actually inserted into the queue. For example, "refreshLinks" jobs are
+ * spawned when a template is edited. One can think of the task as "update links
+ * of pages that use template X" and an instance of that task as a "root job".
+ * However, what actually goes into the queue are range and leaf job subtypes.
+ * Since these jobs include things like page ID ranges and DB master positions,
+ * and can morph into smaller jobs recursively, simple duplicate detection
+ * for individual jobs being identical (like that of job_sha1) is not useful.
+ *
+ * In the case of "refreshLinks", if these jobs are still in the queue when the template
+ * is edited again, we want all of these old refreshLinks jobs for that template to become
+ * no-ops. This can greatly reduce server load, since refreshLinks jobs involve parsing.
+ * Essentially, the new batch of jobs belong to a new "root job" and the older ones to a
+ * previous "root job" for the same task of "update links of pages that use template X".
+ *
+ * This does nothing for certain queue classes.
+ *
+ * @param IJobSpecification $job
+ * @throws MWException
+ * @return bool
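+ *
+ * @par Example (a sketch for a "virtual" root job whose sub-jobs are inserted directly):
+ * @code
+ * // $subJobs all carry the same root job parameters from Job::newRootJobParams()
+ * $queue->push( $subJobs );
+ * $queue->deduplicateRootJob( $subJobs[0] );
+ * @endcode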
+ */
+ final public function deduplicateRootJob( IJobSpecification $job ) {
+ $this->assertNotReadOnly();
+ if ( $job->getType() !== $this->type ) {
+ throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
+ }
+
+ return $this->doDeduplicateRootJob( $job );
+ }
+
+ /**
+ * @see JobQueue::deduplicateRootJob()
+ * @param IJobSpecification $job
+ * @throws MWException
+ * @return bool
+ */
+ protected function doDeduplicateRootJob( IJobSpecification $job ) {
+ if ( !$job->hasRootJobParams() ) {
+ throw new MWException( "Cannot register root job; missing parameters." );
+ }
+ $params = $job->getRootJobParams();
+
+ $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
+ // Callers should call batchInsert() and then this function so that if the insert
+ // fails, the de-duplication registration will be aborted. Since the insert is
+ // deferred till "transaction idle", do the same here, so that the ordering is
+ // maintained. Having only the de-duplication registration succeed would cause
+ // jobs to become no-ops without any actual jobs that made them redundant.
+ $timestamp = $this->dupCache->get( $key ); // current last timestamp of this job
+ if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
+ return true; // a newer version of this root job was enqueued
+ }
+
+ // Update the timestamp of the last root job started at the location...
+ return $this->dupCache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
+ }
+
+ /**
+ * Check if the "root" job of a given job has been superseded by a newer one
+ *
+ * @param Job $job
+ * @throws MWException
+ * @return bool
+ */
+ final protected function isRootJobOldDuplicate( Job $job ) {
+ if ( $job->getType() !== $this->type ) {
+ throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." );
+ }
+ $isDuplicate = $this->doIsRootJobOldDuplicate( $job );
+
+ return $isDuplicate;
+ }
+
+ /**
+ * @see JobQueue::isRootJobOldDuplicate()
+ * @param Job $job
+ * @return bool
+ */
+ protected function doIsRootJobOldDuplicate( Job $job ) {
+ if ( !$job->hasRootJobParams() ) {
+ return false; // job has no de-duplication info
+ }
+ $params = $job->getRootJobParams();
+
+ $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
+ // Get the last time this root job was enqueued
+ $timestamp = $this->dupCache->get( $key );
+
+ // Check if a new root job was started at the location after this one's...
+ return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
+ }
+
+ /**
+ * @param string $signature Hash identifier of the root job
+ * @return string
+ */
+ protected function getRootJobCacheKey( $signature ) {
+ list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+
+ return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature );
+ }
+
+ /**
+ * Delete all unclaimed and delayed jobs from the queue
+ *
+ * @throws JobQueueError
+ * @since 1.22
+ * @return void
+ */
+ final public function delete() {
+ $this->assertNotReadOnly();
+
+ $this->doDelete();
+ }
+
+ /**
+ * @see JobQueue::delete()
+ * @throws MWException
+ */
+ protected function doDelete() {
+ throw new MWException( "This method is not implemented." );
+ }
+
+ /**
+ * Wait for any replica DBs or backup servers to catch up.
+ *
+ * This does nothing for certain queue classes.
+ *
+ * @return void
+ * @throws JobQueueError
+ */
+ final public function waitForBackups() {
+ $this->doWaitForBackups();
+ }
+
+ /**
+ * @see JobQueue::waitForBackups()
+ * @return void
+ */
+ protected function doWaitForBackups() {
+ }
+
+ /**
+ * Clear any process and persistent caches
+ *
+ * @return void
+ */
+ final public function flushCaches() {
+ $this->doFlushCaches();
+ }
+
+ /**
+ * @see JobQueue::flushCaches()
+ * @return void
+ */
+ protected function doFlushCaches() {
+ }
+
+ /**
+ * Get an iterator to traverse over all available jobs in this queue.
+ * This does not include jobs that are currently acquired or delayed.
+ * Note: results may be stale if the queue is concurrently modified.
+ *
+ * @return Iterator
+ * @throws JobQueueError
+ */
+ abstract public function getAllQueuedJobs();
+
+ /**
+ * Get an iterator to traverse over all delayed jobs in this queue.
+ * Note: results may be stale if the queue is concurrently modified.
+ *
+ * @return Iterator
+ * @throws JobQueueError
+ * @since 1.22
+ */
+ public function getAllDelayedJobs() {
+ return new ArrayIterator( [] ); // not implemented
+ }
+
+ /**
+ * Get an iterator to traverse over all claimed jobs in this queue
+ *
+ * Callers should be quick to iterate over it, or few results
+ * will be returned due to jobs being acknowledged and deleted
+ *
+ * @return Iterator
+ * @throws JobQueueError
+ * @since 1.26
+ */
+ public function getAllAcquiredJobs() {
+ return new ArrayIterator( [] ); // not implemented
+ }
+
+ /**
+ * Get an iterator to traverse over all abandoned jobs in this queue
+ *
+ * @return Iterator
+ * @throws JobQueueError
+ * @since 1.25
+ */
+ public function getAllAbandonedJobs() {
+ return new ArrayIterator( [] ); // not implemented
+ }
+
+ /**
+ * Do not use this function outside of JobQueue/JobQueueGroup
+ *
+ * @return string|null
+ * @since 1.22
+ */
+ public function getCoalesceLocationInternal() {
+ return null;
+ }
+
+ /**
+ * Check whether each of the given queues is empty.
+ * This is used for batching checks for queues stored at the same place.
+ *
+ * @param array $types List of queue types
+ * @return array|null (list of non-empty queue types) or null if unsupported
+ * @throws MWException
+ * @since 1.22
+ */
+ final public function getSiblingQueuesWithJobs( array $types ) {
+ return $this->doGetSiblingQueuesWithJobs( $types );
+ }
+
+ /**
+ * @see JobQueue::getSiblingQueuesWithJobs()
+ * @param array $types List of queue types
+ * @return array|null (list of queue types) or null if unsupported
+ */
+ protected function doGetSiblingQueuesWithJobs( array $types ) {
+ return null; // not supported
+ }
+
+ /**
+ * Check the size of each of the given queues.
+ * For queues not served by the same store as this one, 0 is returned.
+ * This is used for batching checks for queues stored at the same place.
+ *
+ * @param array $types List of queue types
+ * @return array|null (job type => size) or null if unsupported
+ * @throws MWException
+ * @since 1.22
+ */
+ final public function getSiblingQueueSizes( array $types ) {
+ return $this->doGetSiblingQueueSizes( $types );
+ }
+
+ /**
+ * @see JobQueue::getSiblingQueueSizes()
+ * @param array $types List of queue types
+ * @return array|null (job type => size) or null if unsupported
+ */
+ protected function doGetSiblingQueueSizes( array $types ) {
+ return null; // not supported
+ }
+
+ /**
+ * @throws JobQueueReadOnlyError
+ */
+ protected function assertNotReadOnly() {
+ if ( $this->readOnlyReason !== false ) {
+ throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" );
+ }
+ }
+
+ /**
+ * Call wfIncrStats() for the queue overall and for the queue type
+ *
+ * @param string $key Event type
+ * @param string $type Job type
+ * @param int $delta
+ * @since 1.22
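+ *
+ * @par Example (as used when a job is popped off this queue):
+ * @code
+ * JobQueue::incrStats( 'pops', 'refreshLinks' );
+ * @endcode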
+ */
+ public static function incrStats( $key, $type, $delta = 1 ) {
+ static $stats;
+ if ( !$stats ) {
+ $stats = MediaWikiServices::getInstance()->getStatsdDataFactory();
+ }
+ $stats->updateCount( "jobqueue.{$key}.all", $delta );
+ $stats->updateCount( "jobqueue.{$key}.{$type}", $delta );
+ }
+}
+
+/**
+ * @ingroup JobQueue
+ * @since 1.22
+ */
+class JobQueueError extends MWException {
+}
+
+class JobQueueConnectionError extends JobQueueError {
+}
+
+class JobQueueReadOnlyError extends JobQueueError {
+
+}
diff --git a/www/wiki/includes/jobqueue/JobQueueDB.php b/www/wiki/includes/jobqueue/JobQueueDB.php
new file mode 100644
index 00000000..f01ba63f
--- /dev/null
+++ b/www/wiki/includes/jobqueue/JobQueueDB.php
@@ -0,0 +1,851 @@
+<?php
+/**
+ * Database-backed job queue code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+use Wikimedia\Rdbms\IDatabase;
+use Wikimedia\Rdbms\DBConnRef;
+use Wikimedia\Rdbms\DBConnectionError;
+use Wikimedia\Rdbms\DBError;
+use MediaWiki\MediaWikiServices;
+use Wikimedia\ScopedCallback;
+
+/**
+ * Class to handle job queues stored in the DB
+ *
+ * @ingroup JobQueue
+ * @since 1.21
+ */
+class JobQueueDB extends JobQueue {
+ const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating
+ const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed
+ const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random
+ const MAX_OFFSET = 255; // integer; maximum number of rows to skip
+
+ /** @var WANObjectCache */
+ protected $cache;
+
+ /** @var bool|string Name of an external DB cluster. False if not set */
+ protected $cluster = false;
+
+ /**
+ * Additional parameters include:
+ * - cluster : The name of an external cluster registered via LBFactory.
+ * If not specified, the primary DB cluster for the wiki will be used.
+ * This can be overridden with a custom cluster so that DB handles will
+ * be retrieved via LBFactory::getExternalLB() and getConnection().
+ * @param array $params
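+ *
+ * @par Example (a sketch; the cluster name is hypothetical):
+ * @code
+ * $queue = JobQueue::factory( [
+ *     'class' => 'JobQueueDB',
+ *     'wiki' => wfWikiID(),
+ *     'type' => 'refreshLinks',
+ *     'cluster' => 'extension1',
+ * ] );
+ * @endcode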
+ */
+ protected function __construct( array $params ) {
+ parent::__construct( $params );
+
+ $this->cluster = isset( $params['cluster'] ) ? $params['cluster'] : false;
+ $this->cache = ObjectCache::getMainWANInstance();
+ }
+
+ protected function supportedOrders() {
+ return [ 'random', 'timestamp', 'fifo' ];
+ }
+
+ protected function optimalOrder() {
+ return 'random';
+ }
+
+ /**
+ * @see JobQueue::doIsEmpty()
+ * @return bool
+ */
+ protected function doIsEmpty() {
+ $dbr = $this->getReplicaDB();
+ try {
+ $found = $dbr->selectField( // unclaimed job
+ 'job', '1', [ 'job_cmd' => $this->type, 'job_token' => '' ], __METHOD__
+ );
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
+
+ return !$found;
+ }
+
+ /**
+ * @see JobQueue::doGetSize()
+ * @return int
+ */
+ protected function doGetSize() {
+ $key = $this->getCacheKey( 'size' );
+
+ $size = $this->cache->get( $key );
+ if ( is_int( $size ) ) {
+ return $size;
+ }
+
+ try {
+ $dbr = $this->getReplicaDB();
+ $size = (int)$dbr->selectField( 'job', 'COUNT(*)',
+ [ 'job_cmd' => $this->type, 'job_token' => '' ],
+ __METHOD__
+ );
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
+ $this->cache->set( $key, $size, self::CACHE_TTL_SHORT );
+
+ return $size;
+ }
+
+ /**
+ * @see JobQueue::doGetAcquiredCount()
+ * @return int
+ */
+ protected function doGetAcquiredCount() {
+ if ( $this->claimTTL <= 0 ) {
+ return 0; // no acknowledgements
+ }
+
+ $key = $this->getCacheKey( 'acquiredcount' );
+
+ $count = $this->cache->get( $key );
+ if ( is_int( $count ) ) {
+ return $count;
+ }
+
+ $dbr = $this->getReplicaDB();
+ try {
+ $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
+ [ 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ],
+ __METHOD__
+ );
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
+ $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
+
+ return $count;
+ }
+
+ /**
+ * @see JobQueue::doGetAbandonedCount()
+ * @return int
+ * @throws MWException
+ */
+ protected function doGetAbandonedCount() {
+ if ( $this->claimTTL <= 0 ) {
+ return 0; // no acknowledgements
+ }
+
+ $key = $this->getCacheKey( 'abandonedcount' );
+
+ $count = $this->cache->get( $key );
+ if ( is_int( $count ) ) {
+ return $count;
+ }
+
+ $dbr = $this->getReplicaDB();
+ try {
+ $count = (int)$dbr->selectField( 'job', 'COUNT(*)',
+ [
+ 'job_cmd' => $this->type,
+ "job_token != {$dbr->addQuotes( '' )}",
+ "job_attempts >= " . $dbr->addQuotes( $this->maxTries )
+ ],
+ __METHOD__
+ );
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
+
+ $this->cache->set( $key, $count, self::CACHE_TTL_SHORT );
+
+ return $count;
+ }
+
+ /**
+ * @see JobQueue::doBatchPush()
+ * @param IJobSpecification[] $jobs
+ * @param int $flags
+ * @throws DBError|Exception
+ * @return void
+ */
+ protected function doBatchPush( array $jobs, $flags ) {
+ $dbw = $this->getMasterDB();
+ // In general, there will be two cases here:
+ // a) sqlite; DB connection is probably a regular round-aware handle.
+ // If the connection is busy with a transaction, then defer the job writes
+ // until right before the main round commit step. Any errors that bubble
+ // up will rollback the main commit round.
+ // b) mysql/postgres; DB connection is generally a separate CONN_TRX_AUTOCOMMIT handle.
+ // No transaction is active nor will be started by writes, so enqueue the jobs
+ // now so that any errors will show up immediately as the interface expects.
+ $fname = __METHOD__;
+ $dbw->onTransactionPreCommitOrIdle(
+ function () use ( $dbw, $jobs, $flags, $fname ) {
+ $this->doBatchPushInternal( $dbw, $jobs, $flags, $fname );
+ },
+ $fname
+ );
+ }
+
+ /**
+ * This function should *not* be called outside of JobQueueDB
+ *
+ * @param IDatabase $dbw
+ * @param IJobSpecification[] $jobs
+ * @param int $flags
+ * @param string $method
+ * @throws DBError
+ * @return void
+ */
+ public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) {
+ if ( !count( $jobs ) ) {
+ return;
+ }
+
+ $rowSet = []; // (sha1 => job) map for jobs that are de-duplicated
+ $rowList = []; // list of jobs for jobs that are not de-duplicated
+ foreach ( $jobs as $job ) {
+ $row = $this->insertFields( $job );
+ if ( $job->ignoreDuplicates() ) {
+ $rowSet[$row['job_sha1']] = $row;
+ } else {
+ $rowList[] = $row;
+ }
+ }
+
+ if ( $flags & self::QOS_ATOMIC ) {
+ $dbw->startAtomic( $method ); // wrap all the job additions in one transaction
+ }
+ try {
+ // Strip out any duplicate jobs that are already in the queue...
+ if ( count( $rowSet ) ) {
+ $res = $dbw->select( 'job', 'job_sha1',
+ [
+ // No job_type condition since it's part of the job_sha1 hash
+ 'job_sha1' => array_keys( $rowSet ),
+ 'job_token' => '' // unclaimed
+ ],
+ $method
+ );
+ foreach ( $res as $row ) {
+ wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" );
+ unset( $rowSet[$row->job_sha1] ); // already enqueued
+ }
+ }
+ // Build the full list of job rows to insert
+ $rows = array_merge( $rowList, array_values( $rowSet ) );
+ // Insert the job rows in chunks to avoid replica DB lag...
+ foreach ( array_chunk( $rows, 50 ) as $rowBatch ) {
+ $dbw->insert( 'job', $rowBatch, $method );
+ }
+ JobQueue::incrStats( 'inserts', $this->type, count( $rows ) );
+ JobQueue::incrStats( 'dupe_inserts', $this->type,
+ count( $rowSet ) + count( $rowList ) - count( $rows )
+ );
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
+ if ( $flags & self::QOS_ATOMIC ) {
+ $dbw->endAtomic( $method );
+ }
+
+ return;
+ }
+
+ /**
+ * @see JobQueue::doPop()
+ * @return Job|bool
+ */
+ protected function doPop() {
+ $dbw = $this->getMasterDB();
+ try {
+ $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
+ $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
+ $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
+ $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
+ } );
+
+ $uuid = wfRandomString( 32 ); // pop attempt
+ $job = false; // job popped off
+ do { // retry when our row is invalid or deleted as a duplicate
+ // Try to reserve a row in the DB...
+ if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) {
+ $row = $this->claimOldest( $uuid );
+ } else { // random first
+ $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs
+ $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand
+ $row = $this->claimRandom( $uuid, $rand, $gte );
+ }
+ // Check if we found a row to reserve...
+ if ( !$row ) {
+ break; // nothing to do
+ }
+ JobQueue::incrStats( 'pops', $this->type );
+ // Get the job object from the row...
+ $title = Title::makeTitle( $row->job_namespace, $row->job_title );
+ $job = Job::factory( $row->job_cmd, $title,
+ self::extractBlob( $row->job_params ), $row->job_id );
+ $job->metadata['id'] = $row->job_id;
+ $job->metadata['timestamp'] = $row->job_timestamp;
+ break; // done
+ } while ( true );
+
+ if ( !$job || mt_rand( 0, 9 ) == 0 ) {
+ // Handle jobs that need to be recycled/deleted;
+ // any recycled jobs will be picked up on the next attempt
+ $this->recycleAndDeleteStaleJobs();
+ }
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
+
+ return $job;
+ }
+
+ /**
+ * Reserve a row with a single UPDATE without holding row locks over RTTs...
+ *
+ * @param string $uuid 32 char hex string
+ * @param int $rand Random unsigned integer (31 bits)
+ * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random)
+ * @return stdClass|bool Row|false
+ */
+ protected function claimRandom( $uuid, $rand, $gte ) {
+ $dbw = $this->getMasterDB();
+ // Check cache to see if the queue has <= MAX_OFFSET items
+ $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) );
+
+ $row = false; // the row acquired
+ $invertedDirection = false; // whether one job_random direction was already scanned
+ // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT
+ // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is
+ // not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot
+ // be used here with MySQL.
+ do {
+ if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows
+ // For small queues, using OFFSET will overshoot and return no rows more often.
+ // Instead, this uses job_random to pick a row (possibly checking both directions).
+ $ineq = $gte ? '>=' : '<=';
+ $dir = $gte ? 'ASC' : 'DESC';
+ $row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
+ [
+ 'job_cmd' => $this->type,
+ 'job_token' => '', // unclaimed
+ "job_random {$ineq} {$dbw->addQuotes( $rand )}" ],
+ __METHOD__,
+ [ 'ORDER BY' => "job_random {$dir}" ]
+ );
+ if ( !$row && !$invertedDirection ) {
+ $gte = !$gte;
+ $invertedDirection = true;
+ continue; // try the other direction
+ }
+ } else { // table *may* have >= MAX_OFFSET rows
+ // T44614: "ORDER BY job_random" with a job_random inequality causes high CPU
+ // in MySQL if there are many rows for some reason. This uses a small OFFSET
+ // instead of job_random for reducing excess claim retries.
+ $row = $dbw->selectRow( 'job', self::selectFields(), // find a random job
+ [
+ 'job_cmd' => $this->type,
+ 'job_token' => '', // unclaimed
+ ],
+ __METHOD__,
+ [ 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) ]
+ );
+ if ( !$row ) {
+ $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows
+ $this->cache->set( $this->getCacheKey( 'small' ), 1, 30 );
+ continue; // use job_random
+ }
+ }
+
+ if ( $row ) { // claim the job
+ $dbw->update( 'job', // update by PK
+ [
+ 'job_token' => $uuid,
+ 'job_token_timestamp' => $dbw->timestamp(),
+ 'job_attempts = job_attempts+1' ],
+ [ 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ],
+ __METHOD__
+ );
+ // This might get raced out by another runner when claiming the previously
+ // selected row. The use of job_random should minimize this problem, however.
+ if ( !$dbw->affectedRows() ) {
+ $row = false; // raced out
+ }
+ } else {
+ break; // nothing to do
+ }
+ } while ( !$row );
+
+ return $row;
+ }
+
+ /**
+ * Reserve a row with a single UPDATE without holding row locks over RTTs...
+ *
+ * @param string $uuid 32 char hex string
+ * @return stdClass|bool Row|false
+ */
+ protected function claimOldest( $uuid ) {
+ $dbw = $this->getMasterDB();
+
+ $row = false; // the row acquired
+ do {
+ if ( $dbw->getType() === 'mysql' ) {
+ // Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the
+ // same table being changed in an UPDATE query in MySQL (gives Error: 1093).
+ // Oracle and PostgreSQL have no such limitation. However, MySQL offers an
+ // alternative here by supporting ORDER BY + LIMIT for UPDATE queries.
+ $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " .
+ "SET " .
+ "job_token = {$dbw->addQuotes( $uuid ) }, " .
+ "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " .
+ "job_attempts = job_attempts+1 " .
+ "WHERE ( " .
+ "job_cmd = {$dbw->addQuotes( $this->type )} " .
+ "AND job_token = {$dbw->addQuotes( '' )} " .
+ ") ORDER BY job_id ASC LIMIT 1",
+ __METHOD__
+ );
+ } else {
+ // Use a subquery to find the job, within an UPDATE to claim it.
+ // This uses as much of the DB wrapper functions as possible.
+ $dbw->update( 'job',
+ [
+ 'job_token' => $uuid,
+ 'job_token_timestamp' => $dbw->timestamp(),
+ 'job_attempts = job_attempts+1' ],
+ [ 'job_id = (' .
+ $dbw->selectSQLText( 'job', 'job_id',
+ [ 'job_cmd' => $this->type, 'job_token' => '' ],
+ __METHOD__,
+ [ 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ] ) .
+ ')'
+ ],
+ __METHOD__
+ );
+ }
+ // Fetch any row that we just reserved...
+ if ( $dbw->affectedRows() ) {
+ $row = $dbw->selectRow( 'job', self::selectFields(),
+ [ 'job_cmd' => $this->type, 'job_token' => $uuid ], __METHOD__
+ );
+ if ( !$row ) { // raced out by duplicate job removal
+ wfDebug( "Row deleted as duplicate by another process.\n" );
+ }
+ } else {
+ break; // nothing to do
+ }
+ } while ( !$row );
+
+ return $row;
+ }
+
+ /**
+ * @see JobQueue::doAck()
+ * @param Job $job
+ * @throws MWException
+ */
+ protected function doAck( Job $job ) {
+ if ( !isset( $job->metadata['id'] ) ) {
+ throw new MWException( "Job of type '{$job->getType()}' has no ID." );
+ }
+
+ $dbw = $this->getMasterDB();
+ try {
+ $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting
+ $dbw->clearFlag( DBO_TRX ); // make each query its own transaction
+ $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) {
+ $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting
+ } );
+
+ // Delete a row with a single DELETE without holding row locks over RTTs...
+ $dbw->delete( 'job',
+ [ 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ], __METHOD__ );
+
+ JobQueue::incrStats( 'acks', $this->type );
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
+ }
+
+ /**
+ * @see JobQueue::doDeduplicateRootJob()
+ * @param IJobSpecification $job
+ * @throws MWException
+ * @return bool
+ */
+ protected function doDeduplicateRootJob( IJobSpecification $job ) {
+ $params = $job->getParams();
+ if ( !isset( $params['rootJobSignature'] ) ) {
+ throw new MWException( "Cannot register root job; missing 'rootJobSignature'." );
+ } elseif ( !isset( $params['rootJobTimestamp'] ) ) {
+ throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." );
+ }
+ $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
+ // Callers should call batchInsert() and then this function so that if the insert
+ // fails, the de-duplication registration will be aborted. Since the insert is
+ // deferred till "transaction idle", do the same here, so that the ordering is
+ // maintained. Having only the de-duplication registration succeed would cause
+ // jobs to become no-ops without any actual jobs that made them redundant.
+ $dbw = $this->getMasterDB();
+ $cache = $this->dupCache;
+ $dbw->onTransactionIdle(
+ function () use ( $cache, $params, $key, $dbw ) {
+ $timestamp = $cache->get( $key ); // current last timestamp of this job
+ if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
+ return true; // a newer version of this root job was enqueued
+ }
+
+ // Update the timestamp of the last root job started at the location...
+ return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL );
+ },
+ __METHOD__
+ );
+
+ return true;
+ }
+
+ /**
+ * @see JobQueue::doDelete()
+ * @return bool
+ */
+ protected function doDelete() {
+ $dbw = $this->getMasterDB();
+ try {
+ $dbw->delete( 'job', [ 'job_cmd' => $this->type ], __METHOD__ );
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
+
+ return true;
+ }
+
+ /**
+ * @see JobQueue::doWaitForBackups()
+ * @return void
+ */
+ protected function doWaitForBackups() {
+ $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+ $lbFactory->waitForReplication( [ 'wiki' => $this->wiki, 'cluster' => $this->cluster ] );
+ }
+
+ /**
+ * @return void
+ */
+ protected function doFlushCaches() {
+ foreach ( [ 'size', 'acquiredcount' ] as $type ) {
+ $this->cache->delete( $this->getCacheKey( $type ) );
+ }
+ }
+
+ /**
+ * @see JobQueue::getAllQueuedJobs()
+ * @return Iterator
+ */
+ public function getAllQueuedJobs() {
+ return $this->getJobIterator( [ 'job_cmd' => $this->getType(), 'job_token' => '' ] );
+ }
+
+ /**
+ * @see JobQueue::getAllAcquiredJobs()
+ * @return Iterator
+ */
+ public function getAllAcquiredJobs() {
+ return $this->getJobIterator( [ 'job_cmd' => $this->getType(), "job_token > ''" ] );
+ }
+
+ /**
+ * @param array $conds Query conditions
+ * @return Iterator
+ */
+ protected function getJobIterator( array $conds ) {
+ $dbr = $this->getReplicaDB();
+ try {
+ return new MappedIterator(
+ $dbr->select( 'job', self::selectFields(), $conds ),
+ function ( $row ) {
+ $job = Job::factory(
+ $row->job_cmd,
+ Title::makeTitle( $row->job_namespace, $row->job_title ),
+ strlen( $row->job_params ) ? unserialize( $row->job_params ) : []
+ );
+ $job->metadata['id'] = $row->job_id;
+ $job->metadata['timestamp'] = $row->job_timestamp;
+
+ return $job;
+ }
+ );
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
+ }
+
+ public function getCoalesceLocationInternal() {
+ return $this->cluster
+ ? "DBCluster:{$this->cluster}:{$this->wiki}"
+ : "LBFactory:{$this->wiki}";
+ }
+
+ protected function doGetSiblingQueuesWithJobs( array $types ) {
+ $dbr = $this->getReplicaDB();
+ // @note: this does not check whether the jobs are claimed or not.
+ // This is useful so JobQueueGroup::pop() also sees queues that only
+ // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue
+ // failed jobs so that they can be popped again for that edge case.
+ $res = $dbr->select( 'job', 'DISTINCT job_cmd',
+ [ 'job_cmd' => $types ], __METHOD__ );
+
+ $types = [];
+ foreach ( $res as $row ) {
+ $types[] = $row->job_cmd;
+ }
+
+ return $types;
+ }
+
+ protected function doGetSiblingQueueSizes( array $types ) {
+ $dbr = $this->getReplicaDB();
+ $res = $dbr->select( 'job', [ 'job_cmd', 'COUNT(*) AS count' ],
+ [ 'job_cmd' => $types ], __METHOD__, [ 'GROUP BY' => 'job_cmd' ] );
+
+ $sizes = [];
+ foreach ( $res as $row ) {
+ $sizes[$row->job_cmd] = (int)$row->count;
+ }
+
+ return $sizes;
+ }
+
+ /**
+ * Recycle or destroy any jobs that have been claimed for too long
+ *
+ * @return int Number of jobs recycled/deleted
+ */
+ public function recycleAndDeleteStaleJobs() {
+ $now = time();
+ $count = 0; // affected rows
+ $dbw = $this->getMasterDB();
+
+ try {
+ if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) {
+ return $count; // already in progress
+ }
+
+ // Remove claims on jobs acquired for too long if enabled...
+ if ( $this->claimTTL > 0 ) {
+ $claimCutoff = $dbw->timestamp( $now - $this->claimTTL );
+ // Get the IDs of jobs that have been claimed but not finished for too long.
+ // These jobs can be recycled into the queue by expiring the claim. Selecting
+ // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
+ $res = $dbw->select( 'job', 'job_id',
+ [
+ 'job_cmd' => $this->type,
+ "job_token != {$dbw->addQuotes( '' )}", // was acquired
+ "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale
+ "job_attempts < {$dbw->addQuotes( $this->maxTries )}" ], // retries left
+ __METHOD__
+ );
+ $ids = array_map(
+ function ( $o ) {
+ return $o->job_id;
+ }, iterator_to_array( $res )
+ );
+ if ( count( $ids ) ) {
+ // Reset job_token for these jobs so that other runners will pick them up.
+					// Set the timestamp to the current time, as it is useful to know that the job
+ // was already tried before (the timestamp becomes the "released" time).
+ $dbw->update( 'job',
+ [
+ 'job_token' => '',
+ 'job_token_timestamp' => $dbw->timestamp( $now ) ], // time of release
+ [
+ 'job_id' => $ids ],
+ __METHOD__
+ );
+ $affected = $dbw->affectedRows();
+ $count += $affected;
+ JobQueue::incrStats( 'recycles', $this->type, $affected );
+ $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type );
+ }
+ }
+
+ // Just destroy any stale jobs...
+ $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE );
+ $conds = [
+ 'job_cmd' => $this->type,
+ "job_token != {$dbw->addQuotes( '' )}", // was acquired
+ "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale
+ ];
+ if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times...
+ $conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}";
+ }
+ // Get the IDs of jobs that are considered stale and should be removed. Selecting
+ // the IDs first means that the UPDATE can be done by primary key (less deadlocks).
+ $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ );
+ $ids = array_map(
+ function ( $o ) {
+ return $o->job_id;
+ }, iterator_to_array( $res )
+ );
+ if ( count( $ids ) ) {
+ $dbw->delete( 'job', [ 'job_id' => $ids ], __METHOD__ );
+ $affected = $dbw->affectedRows();
+ $count += $affected;
+ JobQueue::incrStats( 'abandons', $this->type, $affected );
+ }
+
+ $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ );
+ } catch ( DBError $e ) {
+ $this->throwDBException( $e );
+ }
+
+ return $count;
+ }
+
+ /**
+ * @param IJobSpecification $job
+ * @return array
+ */
+ protected function insertFields( IJobSpecification $job ) {
+ $dbw = $this->getMasterDB();
+
+ return [
+ // Fields that describe the nature of the job
+ 'job_cmd' => $job->getType(),
+ 'job_namespace' => $job->getTitle()->getNamespace(),
+ 'job_title' => $job->getTitle()->getDBkey(),
+ 'job_params' => self::makeBlob( $job->getParams() ),
+ // Additional job metadata
+ 'job_timestamp' => $dbw->timestamp(),
+ 'job_sha1' => Wikimedia\base_convert(
+ sha1( serialize( $job->getDeduplicationInfo() ) ),
+ 16, 36, 31
+ ),
+ 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM )
+ ];
+ }
+
+ /**
+ * @throws JobQueueConnectionError
+ * @return DBConnRef
+ */
+ protected function getReplicaDB() {
+ try {
+ return $this->getDB( DB_REPLICA );
+ } catch ( DBConnectionError $e ) {
+ throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
+ }
+ }
+
+ /**
+ * @throws JobQueueConnectionError
+ * @return DBConnRef
+ */
+ protected function getMasterDB() {
+ try {
+ return $this->getDB( DB_MASTER );
+ } catch ( DBConnectionError $e ) {
+ throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() );
+ }
+ }
+
+ /**
+ * @param int $index (DB_REPLICA/DB_MASTER)
+ * @return DBConnRef
+ */
+ protected function getDB( $index ) {
+ $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+ $lb = ( $this->cluster !== false )
+ ? $lbFactory->getExternalLB( $this->cluster )
+ : $lbFactory->getMainLB( $this->wiki );
+
+ return ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' )
+ // Keep a separate connection to avoid contention and deadlocks;
+ // However, SQLite has the opposite behavior due to DB-level locking.
+ ? $lb->getConnectionRef( $index, [], $this->wiki, $lb::CONN_TRX_AUTOCOMMIT )
+			// Job insertions will be deferred until the PRESEND stage to reduce contention.
+ : $lb->getConnectionRef( $index, [], $this->wiki );
+ }
+
+ /**
+ * @param string $property
+ * @return string
+ */
+ private function getCacheKey( $property ) {
+ list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+ $cluster = is_string( $this->cluster ) ? $this->cluster : 'main';
+
+ return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property );
+ }
+
+ /**
+ * @param array|bool $params
+ * @return string
+ */
+ protected static function makeBlob( $params ) {
+ if ( $params !== false ) {
+ return serialize( $params );
+ } else {
+ return '';
+ }
+ }
+
+ /**
+ * @param string $blob
+ * @return bool|mixed
+ */
+ protected static function extractBlob( $blob ) {
+ if ( (string)$blob !== '' ) {
+ return unserialize( $blob );
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * @param DBError $e
+ * @throws JobQueueError
+ */
+ protected function throwDBException( DBError $e ) {
+ throw new JobQueueError( get_class( $e ) . ": " . $e->getMessage() );
+ }
+
+ /**
+ * Return the list of job fields that should be selected.
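+	 *
+	 * @par Example (a usage sketch; $dbr and $conds are assumed to be a DB
+	 * handle and a condition array, as in getJobIterator() above):
+	 * @code
+	 * $res = $dbr->select( 'job', JobQueueDB::selectFields(), $conds, __METHOD__ );
+	 * @endcode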
+ * @since 1.23
+ * @return array
+ */
+ public static function selectFields() {
+ return [
+ 'job_id',
+ 'job_cmd',
+ 'job_namespace',
+ 'job_title',
+ 'job_timestamp',
+ 'job_params',
+ 'job_random',
+ 'job_attempts',
+ 'job_token',
+ 'job_token_timestamp',
+ 'job_sha1',
+ ];
+ }
+}
diff --git a/www/wiki/includes/jobqueue/JobQueueFederated.php b/www/wiki/includes/jobqueue/JobQueueFederated.php
new file mode 100644
index 00000000..7f3b2b1d
--- /dev/null
+++ b/www/wiki/includes/jobqueue/JobQueueFederated.php
@@ -0,0 +1,496 @@
+<?php
+/**
+ * Job queue code for federated queues.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+
+/**
+ * Class to handle enqueueing and running of background jobs for federated queues
+ *
+ * This class allows for queues to be partitioned into smaller queues.
+ * A partition is defined by the configuration for a JobQueue instance.
+ * For example, one can set $wgJobTypeConf['refreshLinks'] to point to a
+ * JobQueueFederated instance, which itself would consist of three JobQueueRedis
+ * instances, each using their own redis server. This would allow for the jobs
+ * to be split (evenly or based on weights) across multiple servers if a single
+ * server becomes impractical or expensive. Different JobQueue classes can be mixed.
+ *
+ * The basic queue configuration (e.g. "order", "claimTTL") of a federated queue
+ * is inherited by the partition queues. Additional configuration defines what
+ * section each wiki is in, what partition queues each section uses (and their weight),
+ * and the JobQueue configuration for each partition. Some sections might only need a
+ * single queue partition, like the sections for groups of small wikis.
+ *
+ * If used for performance, then $wgMainCacheType should be set to memcached/redis.
+ * Note that "fifo" cannot be used for the ordering, since the data is distributed.
+ * One can still use "timestamp" instead, as in "roughly timestamp ordered". Also,
+ * queue classes used by this should ignore down servers (with TTL) to avoid slowness.
+ *
+ * @ingroup JobQueue
+ * @since 1.22
+ */
+class JobQueueFederated extends JobQueue {
+ /** @var HashRing */
+ protected $partitionRing;
+ /** @var JobQueue[] (partition name => JobQueue) reverse sorted by weight */
+ protected $partitionQueues = [];
+
+ /** @var int Maximum number of partitions to try */
+ protected $maxPartitionsTry;
+
+ /**
+ * @param array $params Possible keys:
+ * - sectionsByWiki : A map of wiki IDs to section names.
+ * Wikis will default to using the section "default".
+ * - partitionsBySection : Map of section names to maps of (partition name => weight).
+ * A section called 'default' must be defined if not all wikis
+ * have explicitly defined sections.
+ * - configByPartition : Map of queue partition names to configuration arrays.
+ * These configuration arrays are passed to JobQueue::factory().
+	 *                         The options set here are overridden by those passed to
+ * the federated queue itself (e.g. 'order' and 'claimTTL').
+ * - maxPartitionsTry : Maximum number of times to attempt job insertion using
+ * different partition queues. This improves availability
+ * during failure, at the cost of added latency and somewhat
+ * less reliable job de-duplication mechanisms.
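+	 *
+	 * @par Example (a hypothetical two-partition setup; the section, partition,
+	 * and server names are illustrative only):
+	 * @code
+	 * $wgJobTypeConf['refreshLinks'] = [
+	 *     'class' => 'JobQueueFederated',
+	 *     'sectionsByWiki' => [ 'enwiki' => 'large' ],
+	 *     'partitionsBySection' => [
+	 *         'default' => [ 'rdb1' => 1 ],
+	 *         'large' => [ 'rdb1' => 2, 'rdb2' => 1 ],
+	 *     ],
+	 *     'configByPartition' => [
+	 *         'rdb1' => [ 'class' => 'JobQueueRedis',
+	 *             'redisServer' => 'rdb1.example:6379', 'redisConfig' => [], 'daemonized' => true ],
+	 *         'rdb2' => [ 'class' => 'JobQueueRedis',
+	 *             'redisServer' => 'rdb2.example:6379', 'redisConfig' => [], 'daemonized' => true ],
+	 *     ],
+	 * ];
+	 * @endcode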
+ * @throws MWException
+ */
+ protected function __construct( array $params ) {
+ parent::__construct( $params );
+ $section = isset( $params['sectionsByWiki'][$this->wiki] )
+ ? $params['sectionsByWiki'][$this->wiki]
+ : 'default';
+ if ( !isset( $params['partitionsBySection'][$section] ) ) {
+ throw new MWException( "No configuration for section '$section'." );
+ }
+ $this->maxPartitionsTry = isset( $params['maxPartitionsTry'] )
+ ? $params['maxPartitionsTry']
+ : 2;
+ // Get the full partition map
+ $partitionMap = $params['partitionsBySection'][$section];
+ arsort( $partitionMap, SORT_NUMERIC );
+ // Get the config to pass to merge into each partition queue config
+ $baseConfig = $params;
+ foreach ( [ 'class', 'sectionsByWiki', 'maxPartitionsTry',
+ 'partitionsBySection', 'configByPartition', ] as $o
+ ) {
+ unset( $baseConfig[$o] ); // partition queue doesn't care about this
+ }
+ // The class handles all aggregator calls already
+ unset( $baseConfig['aggregator'] );
+ // Get the partition queue objects
+ foreach ( $partitionMap as $partition => $w ) {
+ if ( !isset( $params['configByPartition'][$partition] ) ) {
+ throw new MWException( "No configuration for partition '$partition'." );
+ }
+ $this->partitionQueues[$partition] = JobQueue::factory(
+ $baseConfig + $params['configByPartition'][$partition] );
+ }
+ // Ring of all partitions
+ $this->partitionRing = new HashRing( $partitionMap );
+ }
+
+ protected function supportedOrders() {
+ // No FIFO due to partitioning, though "rough timestamp order" is supported
+ return [ 'undefined', 'random', 'timestamp' ];
+ }
+
+ protected function optimalOrder() {
+ return 'undefined'; // defer to the partitions
+ }
+
+ protected function supportsDelayedJobs() {
+ foreach ( $this->partitionQueues as $queue ) {
+ if ( !$queue->supportsDelayedJobs() ) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ protected function doIsEmpty() {
+ $empty = true;
+ $failed = 0;
+ foreach ( $this->partitionQueues as $queue ) {
+ try {
+ $empty = $empty && $queue->doIsEmpty();
+ } catch ( JobQueueError $e ) {
+ ++$failed;
+ $this->logException( $e );
+ }
+ }
+ $this->throwErrorIfAllPartitionsDown( $failed );
+
+ return $empty;
+ }
+
+ protected function doGetSize() {
+ return $this->getCrossPartitionSum( 'size', 'doGetSize' );
+ }
+
+ protected function doGetAcquiredCount() {
+ return $this->getCrossPartitionSum( 'acquiredcount', 'doGetAcquiredCount' );
+ }
+
+ protected function doGetDelayedCount() {
+ return $this->getCrossPartitionSum( 'delayedcount', 'doGetDelayedCount' );
+ }
+
+ protected function doGetAbandonedCount() {
+ return $this->getCrossPartitionSum( 'abandonedcount', 'doGetAbandonedCount' );
+ }
+
+ /**
+ * @param string $type
+ * @param string $method
+ * @return int
+ */
+ protected function getCrossPartitionSum( $type, $method ) {
+ $count = 0;
+ $failed = 0;
+ foreach ( $this->partitionQueues as $queue ) {
+ try {
+ $count += $queue->$method();
+ } catch ( JobQueueError $e ) {
+ ++$failed;
+ $this->logException( $e );
+ }
+ }
+ $this->throwErrorIfAllPartitionsDown( $failed );
+
+ return $count;
+ }
+
+ protected function doBatchPush( array $jobs, $flags ) {
+ // Local ring variable that may be changed to point to a new ring on failure
+ $partitionRing = $this->partitionRing;
+		// Try to insert the jobs, ejecting failing partitions from the local ring.
+		// Retry inserting any remaining jobs, ignoring the bad partitions.
+ $jobsLeft = $jobs;
+ // phpcs:ignore Generic.CodeAnalysis.ForLoopWithTestFunctionCall
+ for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) {
+ try {
+ $partitionRing->getLiveLocationWeights();
+ } catch ( UnexpectedValueException $e ) {
+ break; // all servers down; nothing to insert to
+ }
+ $jobsLeft = $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags );
+ }
+ if ( count( $jobsLeft ) ) {
+ throw new JobQueueError(
+ "Could not insert job(s), {$this->maxPartitionsTry} partitions tried." );
+ }
+ }
+
+ /**
+ * @param array $jobs
+ * @param HashRing &$partitionRing
+ * @param int $flags
+ * @throws JobQueueError
+	 * @return array List of Job objects that could not be inserted
+ */
+ protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) {
+ $jobsLeft = [];
+
+ // Because jobs are spread across partitions, per-job de-duplication needs
+ // to use a consistent hash to avoid allowing duplicate jobs per partition.
+ // When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded.
+ $uJobsByPartition = []; // (partition name => job list)
+ /** @var Job $job */
+ foreach ( $jobs as $key => $job ) {
+ if ( $job->ignoreDuplicates() ) {
+ $sha1 = sha1( serialize( $job->getDeduplicationInfo() ) );
+ $uJobsByPartition[$partitionRing->getLiveLocation( $sha1 )][] = $job;
+ unset( $jobs[$key] );
+ }
+ }
+ // Get the batches of jobs that are not de-duplicated
+ if ( $flags & self::QOS_ATOMIC ) {
+ $nuJobBatches = [ $jobs ]; // all or nothing
+ } else {
+ // Split the jobs into batches and spread them out over servers if there
+ // are many jobs. This helps keep the partitions even. Otherwise, send all
+			// the jobs to a single partition queue to avoid the extra connections.
+ $nuJobBatches = array_chunk( $jobs, 300 );
+ }
+
+ // Insert the de-duplicated jobs into the queues...
+ foreach ( $uJobsByPartition as $partition => $jobBatch ) {
+ /** @var JobQueue $queue */
+ $queue = $this->partitionQueues[$partition];
+ try {
+ $ok = true;
+ $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
+ } catch ( JobQueueError $e ) {
+ $ok = false;
+ $this->logException( $e );
+ }
+ if ( !$ok ) {
+ if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist
+ throw new JobQueueError( "Could not insert job(s), no partitions available." );
+ }
+ $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
+ }
+ }
+
+ // Insert the jobs that are not de-duplicated into the queues...
+ foreach ( $nuJobBatches as $jobBatch ) {
+ $partition = ArrayUtils::pickRandom( $partitionRing->getLiveLocationWeights() );
+ $queue = $this->partitionQueues[$partition];
+ try {
+ $ok = true;
+ $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC );
+ } catch ( JobQueueError $e ) {
+ $ok = false;
+ $this->logException( $e );
+ }
+ if ( !$ok ) {
+ if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist
+ throw new JobQueueError( "Could not insert job(s), no partitions available." );
+ }
+ $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted
+ }
+ }
+
+ return $jobsLeft;
+ }
+
+ protected function doPop() {
+ $partitionsTry = $this->partitionRing->getLiveLocationWeights(); // (partition => weight)
+
+ $failed = 0;
+ while ( count( $partitionsTry ) ) {
+ $partition = ArrayUtils::pickRandom( $partitionsTry );
+ if ( $partition === false ) {
+ break; // all partitions at 0 weight
+ }
+
+ /** @var JobQueue $queue */
+ $queue = $this->partitionQueues[$partition];
+ try {
+ $job = $queue->pop();
+ } catch ( JobQueueError $e ) {
+ ++$failed;
+ $this->logException( $e );
+ $job = false;
+ }
+ if ( $job ) {
+ $job->metadata['QueuePartition'] = $partition;
+
+ return $job;
+ } else {
+ unset( $partitionsTry[$partition] ); // blacklist partition
+ }
+ }
+ $this->throwErrorIfAllPartitionsDown( $failed );
+
+ return false;
+ }
+
+ protected function doAck( Job $job ) {
+ if ( !isset( $job->metadata['QueuePartition'] ) ) {
+ throw new MWException( "The given job has no defined partition name." );
+ }
+
+ $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job );
+ }
+
+ protected function doIsRootJobOldDuplicate( Job $job ) {
+ $signature = $job->getRootJobParams()['rootJobSignature'];
+ $partition = $this->partitionRing->getLiveLocation( $signature );
+ try {
+ return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
+ } catch ( JobQueueError $e ) {
+ if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
+ $partition = $this->partitionRing->getLiveLocation( $signature );
+ return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job );
+ }
+ }
+
+ return false;
+ }
+
+ protected function doDeduplicateRootJob( IJobSpecification $job ) {
+ $signature = $job->getRootJobParams()['rootJobSignature'];
+ $partition = $this->partitionRing->getLiveLocation( $signature );
+ try {
+ return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
+ } catch ( JobQueueError $e ) {
+ if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) {
+ $partition = $this->partitionRing->getLiveLocation( $signature );
+ return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job );
+ }
+ }
+
+ return false;
+ }
+
+ protected function doDelete() {
+ $failed = 0;
+ /** @var JobQueue $queue */
+ foreach ( $this->partitionQueues as $queue ) {
+ try {
+ $queue->doDelete();
+ } catch ( JobQueueError $e ) {
+ ++$failed;
+ $this->logException( $e );
+ }
+ }
+ $this->throwErrorIfAllPartitionsDown( $failed );
+ return true;
+ }
+
+ protected function doWaitForBackups() {
+ $failed = 0;
+ /** @var JobQueue $queue */
+ foreach ( $this->partitionQueues as $queue ) {
+ try {
+ $queue->waitForBackups();
+ } catch ( JobQueueError $e ) {
+ ++$failed;
+ $this->logException( $e );
+ }
+ }
+ $this->throwErrorIfAllPartitionsDown( $failed );
+ }
+
+ protected function doFlushCaches() {
+ /** @var JobQueue $queue */
+ foreach ( $this->partitionQueues as $queue ) {
+ $queue->doFlushCaches();
+ }
+ }
+
+ public function getAllQueuedJobs() {
+ $iterator = new AppendIterator();
+
+ /** @var JobQueue $queue */
+ foreach ( $this->partitionQueues as $queue ) {
+ $iterator->append( $queue->getAllQueuedJobs() );
+ }
+
+ return $iterator;
+ }
+
+ public function getAllDelayedJobs() {
+ $iterator = new AppendIterator();
+
+ /** @var JobQueue $queue */
+ foreach ( $this->partitionQueues as $queue ) {
+ $iterator->append( $queue->getAllDelayedJobs() );
+ }
+
+ return $iterator;
+ }
+
+ public function getAllAcquiredJobs() {
+ $iterator = new AppendIterator();
+
+ /** @var JobQueue $queue */
+ foreach ( $this->partitionQueues as $queue ) {
+ $iterator->append( $queue->getAllAcquiredJobs() );
+ }
+
+ return $iterator;
+ }
+
+ public function getAllAbandonedJobs() {
+ $iterator = new AppendIterator();
+
+ /** @var JobQueue $queue */
+ foreach ( $this->partitionQueues as $queue ) {
+ $iterator->append( $queue->getAllAbandonedJobs() );
+ }
+
+ return $iterator;
+ }
+
+ public function getCoalesceLocationInternal() {
+ return "JobQueueFederated:wiki:{$this->wiki}" .
+ sha1( serialize( array_keys( $this->partitionQueues ) ) );
+ }
+
+ protected function doGetSiblingQueuesWithJobs( array $types ) {
+ $result = [];
+
+ $failed = 0;
+ /** @var JobQueue $queue */
+ foreach ( $this->partitionQueues as $queue ) {
+ try {
+ $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types );
+ if ( is_array( $nonEmpty ) ) {
+ $result = array_unique( array_merge( $result, $nonEmpty ) );
+ } else {
+ return null; // not supported on all partitions; bail
+ }
+ if ( count( $result ) == count( $types ) ) {
+ break; // short-circuit
+ }
+ } catch ( JobQueueError $e ) {
+ ++$failed;
+ $this->logException( $e );
+ }
+ }
+ $this->throwErrorIfAllPartitionsDown( $failed );
+
+ return array_values( $result );
+ }
+
+ protected function doGetSiblingQueueSizes( array $types ) {
+ $result = [];
+ $failed = 0;
+ /** @var JobQueue $queue */
+ foreach ( $this->partitionQueues as $queue ) {
+ try {
+ $sizes = $queue->doGetSiblingQueueSizes( $types );
+ if ( is_array( $sizes ) ) {
+ foreach ( $sizes as $type => $size ) {
+ $result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size;
+ }
+ } else {
+ return null; // not supported on all partitions; bail
+ }
+ } catch ( JobQueueError $e ) {
+ ++$failed;
+ $this->logException( $e );
+ }
+ }
+ $this->throwErrorIfAllPartitionsDown( $failed );
+
+ return $result;
+ }
+
+ protected function logException( Exception $e ) {
+ wfDebugLog( 'JobQueueFederated', $e->getMessage() . "\n" . $e->getTraceAsString() );
+ }
+
+ /**
+ * Throw an error if no partitions available
+ *
+	 * @param int $down The number of partitions that are down
+ * @return void
+ * @throws JobQueueError
+ */
+ protected function throwErrorIfAllPartitionsDown( $down ) {
+ if ( $down >= count( $this->partitionQueues ) ) {
+ throw new JobQueueError( 'No queue partitions available.' );
+ }
+ }
+}
diff --git a/www/wiki/includes/jobqueue/JobQueueGroup.php b/www/wiki/includes/jobqueue/JobQueueGroup.php
new file mode 100644
index 00000000..df55fc57
--- /dev/null
+++ b/www/wiki/includes/jobqueue/JobQueueGroup.php
@@ -0,0 +1,480 @@
+<?php
+/**
+ * Job queue base code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+
+/**
+ * Class to handle enqueueing of background jobs
+ *
+ * @ingroup JobQueue
+ * @since 1.21
+ */
+class JobQueueGroup {
+ /** @var JobQueueGroup[] */
+ protected static $instances = [];
+
+ /** @var ProcessCacheLRU */
+ protected $cache;
+
+ /** @var string Wiki DB domain ID */
+ protected $domain;
+ /** @var string|bool Read only rationale (or false if r/w) */
+ protected $readOnlyReason;
+ /** @var bool Whether the wiki is not recognized in configuration */
+ protected $invalidWiki = false;
+
+	/** @var array Map of (bucket => (queue => JobQueue, types => list of types)) */
+ protected $coalescedQueues;
+
+ /** @var Job[] */
+ protected $bufferedJobs = [];
+
+ const TYPE_DEFAULT = 1; // integer; jobs popped by default
+ const TYPE_ANY = 2; // integer; any job
+
+ const USE_CACHE = 1; // integer; use process or persistent cache
+
+ const PROC_CACHE_TTL = 15; // integer; seconds
+
+ const CACHE_VERSION = 1; // integer; cache version
+
+ /**
+ * @param string $domain Wiki DB domain ID
+ * @param string|bool $readOnlyReason Read-only reason or false
+ */
+ protected function __construct( $domain, $readOnlyReason ) {
+ $this->domain = $domain;
+ $this->readOnlyReason = $readOnlyReason;
+ $this->cache = new ProcessCacheLRU( 10 );
+ }
+
+ /**
+ * @param bool|string $domain Wiki domain ID
+ * @return JobQueueGroup
+ */
+ public static function singleton( $domain = false ) {
+ global $wgLocalDatabases;
+
+ if ( $domain === false ) {
+ $domain = WikiMap::getCurrentWikiDbDomain()->getId();
+ }
+
+ if ( !isset( self::$instances[$domain] ) ) {
+ self::$instances[$domain] = new self( $domain, wfConfiguredReadOnlyReason() );
+ // Make sure jobs are not getting pushed to bogus wikis. This can confuse
+ // the job runner system into spawning endless RPC requests that fail (T171371).
+ $wikiId = WikiMap::getWikiIdFromDomain( $domain );
+ if (
+ !WikiMap::isCurrentWikiDbDomain( $domain ) &&
+ !in_array( $wikiId, $wgLocalDatabases )
+ ) {
+ self::$instances[$domain]->invalidWiki = true;
+ }
+ }
+
+ return self::$instances[$domain];
+ }
+
+ /**
+ * Destroy the singleton instances
+ *
+ * @return void
+ */
+ public static function destroySingletons() {
+ self::$instances = [];
+ }
+
+ /**
+ * Get the job queue object for a given queue type
+ *
+ * @param string $type
+ * @return JobQueue
+ */
+ public function get( $type ) {
+ global $wgJobTypeConf;
+
+ $conf = [ 'wiki' => $this->domain, 'type' => $type ];
+ if ( isset( $wgJobTypeConf[$type] ) ) {
+ $conf = $conf + $wgJobTypeConf[$type];
+ } else {
+ $conf = $conf + $wgJobTypeConf['default'];
+ }
+ $conf['aggregator'] = JobQueueAggregator::singleton();
+ if ( $this->readOnlyReason !== false ) {
+ $conf['readOnlyReason'] = $this->readOnlyReason;
+ }
+
+ return JobQueue::factory( $conf );
+ }
+
+ /**
+	 * Insert jobs into the respective queues to which they belong
+ *
+ * This inserts the jobs into the queue specified by $wgJobTypeConf
+ * and updates the aggregate job queue information cache as needed.
+ *
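+	 * @par Example (a minimal usage sketch; the 'null' job type is illustrative):
+	 * @code
+	 * $spec = new JobSpecification( 'null', [ 'lives' => 1 ], [], Title::newMainPage() );
+	 * JobQueueGroup::singleton()->push( $spec );
+	 * @endcode
+	 *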
+ * @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs
+ * @throws InvalidArgumentException
+ * @return void
+ */
+ public function push( $jobs ) {
+ global $wgJobTypesExcludedFromDefaultQueue;
+
+ if ( $this->invalidWiki ) {
+ // Do not enqueue job that cannot be run (T171371)
+ $e = new LogicException( "Domain '{$this->domain}' is not recognized." );
+ MWExceptionHandler::logException( $e );
+ return;
+ }
+
+ $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
+ if ( !count( $jobs ) ) {
+ return;
+ }
+
+ $this->assertValidJobs( $jobs );
+
+ $jobsByType = []; // (job type => list of jobs)
+ foreach ( $jobs as $job ) {
+ $jobsByType[$job->getType()][] = $job;
+ }
+
+ foreach ( $jobsByType as $type => $jobs ) {
+ $this->get( $type )->push( $jobs );
+ }
+
+ if ( $this->cache->has( 'queues-ready', 'list' ) ) {
+ $list = $this->cache->get( 'queues-ready', 'list' );
+ if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) {
+ $this->cache->clear( 'queues-ready' );
+ }
+ }
+
+ $cache = ObjectCache::getLocalClusterInstance();
+ $cache->set(
+ $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_ANY ),
+ 'true',
+ 15
+ );
+ if ( array_diff( array_keys( $jobsByType ), $wgJobTypesExcludedFromDefaultQueue ) ) {
+ $cache->set(
+ $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_DEFAULT ),
+ 'true',
+ 15
+ );
+ }
+ }
+
+ /**
+	 * Buffer jobs for insertion via push(), or insert them immediately if in CLI mode
+ *
+ * Note that pushLazyJobs() is registered as a deferred update just before
+ * DeferredUpdates::doUpdates() in MediaWiki and JobRunner classes in order
+ * to be executed as the very last deferred update (T100085, T154425).
+ *
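+	 * @par Example (a sketch; buffered jobs are flushed later via pushLazyJobs()):
+	 * @code
+	 * JobQueueGroup::singleton()->lazyPush(
+	 *     new JobSpecification( 'null', [ 'lives' => 1 ], [], Title::newMainPage() )
+	 * );
+	 * @endcode
+	 *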
+ * @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs
+ * @return void
+ * @since 1.26
+ */
+ public function lazyPush( $jobs ) {
+ if ( $this->invalidWiki ) {
+ // Do not enqueue job that cannot be run (T171371)
+ throw new LogicException( "Domain '{$this->domain}' is not recognized." );
+ }
+
+ if ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' ) {
+ $this->push( $jobs );
+ return;
+ }
+
+ $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
+
+ // Throw errors now instead of on push(), when other jobs may be buffered
+ $this->assertValidJobs( $jobs );
+
+ $this->bufferedJobs = array_merge( $this->bufferedJobs, $jobs );
+ }
+
+ /**
+ * Push all jobs buffered via lazyPush() into their respective queues
+ *
+ * @return void
+ * @since 1.26
+ */
+ public static function pushLazyJobs() {
+ foreach ( self::$instances as $group ) {
+ try {
+ $group->push( $group->bufferedJobs );
+ $group->bufferedJobs = [];
+ } catch ( Exception $e ) {
+ // Get in as many jobs as possible and let other post-send updates happen
+ MWExceptionHandler::logException( $e );
+ }
+ }
+ }
+
+ /**
+ * Pop a job off one of the job queues
+ *
+ * This pops a job off a queue as specified by $wgJobTypeConf and
+ * updates the aggregate job queue information cache as needed.
+ *
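+	 * @par Example (a simplified worker-loop sketch; production runners should
+	 * use JobRunner instead):
+	 * @code
+	 * $group = JobQueueGroup::singleton();
+	 * $job = $group->pop( JobQueueGroup::TYPE_DEFAULT );
+	 * if ( $job ) {
+	 *     $job->run();
+	 *     $group->ack( $job );
+	 * }
+	 * @endcode
+	 *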
+ * @param int|string $qtype JobQueueGroup::TYPE_* constant or job type string
+ * @param int $flags Bitfield of JobQueueGroup::USE_* constants
+ * @param array $blacklist List of job types to ignore
+ * @return Job|bool Returns false on failure
+ */
+ public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = [] ) {
+ $job = false;
+
+ if ( is_string( $qtype ) ) { // specific job type
+ if ( !in_array( $qtype, $blacklist ) ) {
+ $job = $this->get( $qtype )->pop();
+ }
+ } else { // any job in the "default" jobs types
+ if ( $flags & self::USE_CACHE ) {
+ if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) {
+ $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() );
+ }
+ $types = $this->cache->get( 'queues-ready', 'list' );
+ } else {
+ $types = $this->getQueuesWithJobs();
+ }
+
+ if ( $qtype == self::TYPE_DEFAULT ) {
+ $types = array_intersect( $types, $this->getDefaultQueueTypes() );
+ }
+
+ $types = array_diff( $types, $blacklist ); // avoid selected types
+ shuffle( $types ); // avoid starvation
+
+ foreach ( $types as $type ) { // for each queue...
+ $job = $this->get( $type )->pop();
+ if ( $job ) { // found
+ break;
+ } else { // not found
+ $this->cache->clear( 'queues-ready' );
+ }
+ }
+ }
+
+ return $job;
+ }
+
+ /**
+ * Acknowledge that a job was completed
+ *
+ * @param Job $job
+ * @return void
+ */
+ public function ack( Job $job ) {
+ $this->get( $job->getType() )->ack( $job );
+ }
+
+ /**
+ * Register the "root job" of a given job into the queue for de-duplication.
+ * This should only be called right *after* all the new jobs have been inserted.
+ *
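+	 * @par Example (a sketch with a typical root-job producer; assumes $title
+	 * is a Title and that the job carries root-job parameters):
+	 * @code
+	 * $job = HTMLCacheUpdateJob::newForBacklinks( $title, 'templatelinks' );
+	 * JobQueueGroup::singleton()->push( $job );
+	 * JobQueueGroup::singleton()->deduplicateRootJob( $job );
+	 * @endcode
+	 *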
+ * @param Job $job
+ * @return bool
+ */
+ public function deduplicateRootJob( Job $job ) {
+ return $this->get( $job->getType() )->deduplicateRootJob( $job );
+ }
+
+ /**
+ * Wait for any replica DBs or backup queue servers to catch up.
+ *
+ * This does nothing for certain queue classes.
+ *
+ * @return void
+ */
+ public function waitForBackups() {
+ global $wgJobTypeConf;
+
+ // Try to avoid doing this more than once per queue storage medium
+ foreach ( $wgJobTypeConf as $type => $conf ) {
+ $this->get( $type )->waitForBackups();
+ }
+ }
+
+ /**
+ * Get the list of queue types
+ *
+ * @return array List of strings
+ */
+ public function getQueueTypes() {
+ return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) );
+ }
+
+ /**
+ * Get the list of default queue types
+ *
+ * @return array List of strings
+ */
+ public function getDefaultQueueTypes() {
+ global $wgJobTypesExcludedFromDefaultQueue;
+
+ return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue );
+ }
+
+ /**
+ * Check if there are any queues with jobs (this is cached)
+ *
+ * @param int $type JobQueueGroup::TYPE_* constant
+ * @return bool
+ * @since 1.23
+ */
+ public function queuesHaveJobs( $type = self::TYPE_ANY ) {
+ $cache = ObjectCache::getLocalClusterInstance();
+ $key = $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', $type );
+
+ $value = $cache->get( $key );
+ if ( $value === false ) {
+ $queues = $this->getQueuesWithJobs();
+ if ( $type == self::TYPE_DEFAULT ) {
+ $queues = array_intersect( $queues, $this->getDefaultQueueTypes() );
+ }
+ $value = count( $queues ) ? 'true' : 'false';
+ $cache->add( $key, $value, 15 );
+ }
+
+ return ( $value === 'true' );
+ }
+
+ /**
+ * Get the list of job types that have non-empty queues
+ *
+ * @return array List of job types that have non-empty queues
+ */
+ public function getQueuesWithJobs() {
+ $types = [];
+ foreach ( $this->getCoalescedQueues() as $info ) {
+ $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() );
+ if ( is_array( $nonEmpty ) ) { // batching features supported
+ $types = array_merge( $types, $nonEmpty );
+ } else { // we have to go through the queues in the bucket one-by-one
+ foreach ( $info['types'] as $type ) {
+ if ( !$this->get( $type )->isEmpty() ) {
+ $types[] = $type;
+ }
+ }
+ }
+ }
+
+ return $types;
+ }
+
+ /**
+	 * Get the sizes of the queues for a list of job types
+ *
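+	 * @par Example return value (illustrative types and counts):
+	 * @code
+	 * [ 'refreshLinks' => 12, 'htmlCacheUpdate' => 0 ]
+	 * @endcode
+	 *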
+ * @return array Map of (job type => size)
+ */
+ public function getQueueSizes() {
+ $sizeMap = [];
+ foreach ( $this->getCoalescedQueues() as $info ) {
+ $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() );
+ if ( is_array( $sizes ) ) { // batching features supported
+ $sizeMap = $sizeMap + $sizes;
+ } else { // we have to go through the queues in the bucket one-by-one
+ foreach ( $info['types'] as $type ) {
+ $sizeMap[$type] = $this->get( $type )->getSize();
+ }
+ }
+ }
+
+ return $sizeMap;
+ }
+
+ /**
+ * @return array
+ */
+ protected function getCoalescedQueues() {
+ global $wgJobTypeConf;
+
+ if ( $this->coalescedQueues === null ) {
+ $this->coalescedQueues = [];
+ foreach ( $wgJobTypeConf as $type => $conf ) {
+ $queue = JobQueue::factory(
+ [ 'wiki' => $this->domain, 'type' => 'null' ] + $conf );
+ $loc = $queue->getCoalesceLocationInternal();
+ if ( !isset( $this->coalescedQueues[$loc] ) ) {
+ $this->coalescedQueues[$loc]['queue'] = $queue;
+ $this->coalescedQueues[$loc]['types'] = [];
+ }
+ if ( $type === 'default' ) {
+ $this->coalescedQueues[$loc]['types'] = array_merge(
+ $this->coalescedQueues[$loc]['types'],
+ array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) )
+ );
+ } else {
+ $this->coalescedQueues[$loc]['types'][] = $type;
+ }
+ }
+ }
+
+ return $this->coalescedQueues;
+ }
+
+ /**
+ * @param string $name
+ * @return mixed
+ */
+ private function getCachedConfigVar( $name ) {
+ // @TODO: cleanup this whole method with a proper config system
+ if ( WikiMap::isCurrentWikiDbDomain( $this->domain ) ) {
+ return $GLOBALS[$name]; // common case
+ } else {
+ $wiki = WikiMap::getWikiIdFromDomain( $this->domain );
+ $cache = ObjectCache::getMainWANInstance();
+ $value = $cache->getWithSetCallback(
+ $cache->makeGlobalKey( 'jobqueue', 'configvalue', $this->domain, $name ),
+ $cache::TTL_DAY + mt_rand( 0, $cache::TTL_DAY ),
+ function () use ( $wiki, $name ) {
+ global $wgConf;
+ // @TODO: use the full domain ID here
+ return [ 'v' => $wgConf->getConfig( $wiki, $name ) ];
+ },
+ [ 'pcTTL' => WANObjectCache::TTL_PROC_LONG ]
+ );
+ return $value['v'];
+ }
+ }
+
+ /**
+ * @param array $jobs
+ * @throws InvalidArgumentException
+ */
+ private function assertValidJobs( array $jobs ) {
+ foreach ( $jobs as $job ) { // sanity checks
+ if ( !( $job instanceof IJobSpecification ) ) {
+ throw new InvalidArgumentException( "Expected IJobSpecification objects" );
+ }
+ }
+ }
+
+ function __destruct() {
+ $n = count( $this->bufferedJobs );
+ if ( $n > 0 ) {
+ $type = implode( ', ', array_unique( array_map( 'get_class', $this->bufferedJobs ) ) );
+ trigger_error( __METHOD__ . ": $n buffered job(s) of type(s) $type never inserted." );
+ }
+ }
+}
diff --git a/www/wiki/includes/jobqueue/JobQueueMemory.php b/www/wiki/includes/jobqueue/JobQueueMemory.php
new file mode 100644
index 00000000..f9e2c3dc
--- /dev/null
+++ b/www/wiki/includes/jobqueue/JobQueueMemory.php
@@ -0,0 +1,230 @@
+<?php
+/**
+ * PHP memory-backed job queue code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+
+/**
+ * Class to handle job queues stored in PHP memory for testing
+ *
+ * JobQueueGroup does not remember every queue instance, so the queue data is tracked statically here
+ *
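+ * @par Example (a test-setup sketch; the 'null' job type is illustrative):
+ * @code
+ * $queue = JobQueue::factory( [
+ *     'class' => 'JobQueueMemory',
+ *     'wiki' => wfWikiID(),
+ *     'type' => 'null',
+ * ] );
+ * $queue->push( new NullJob( Title::newMainPage(), [ 'lives' => 1 ] ) );
+ * @endcode
+ *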
+ * @ingroup JobQueue
+ * @since 1.27
+ */
+class JobQueueMemory extends JobQueue {
+ /** @var array[] */
+ protected static $data = [];
+
+ /**
+ * @see JobQueue::doBatchPush
+ *
+ * @param IJobSpecification[] $jobs
+ * @param int $flags
+ */
+ protected function doBatchPush( array $jobs, $flags ) {
+ $unclaimed =& $this->getQueueData( 'unclaimed', [] );
+
+ foreach ( $jobs as $job ) {
+ if ( $job->ignoreDuplicates() ) {
+ $sha1 = Wikimedia\base_convert(
+ sha1( serialize( $job->getDeduplicationInfo() ) ),
+ 16, 36, 31
+ );
+ if ( !isset( $unclaimed[$sha1] ) ) {
+ $unclaimed[$sha1] = $job;
+ }
+ } else {
+ $unclaimed[] = $job;
+ }
+ }
+ }
+
+ /**
+ * @see JobQueue::supportedOrders
+ *
+ * @return string[]
+ */
+ protected function supportedOrders() {
+ return [ 'random', 'timestamp', 'fifo' ];
+ }
+
+ /**
+ * @see JobQueue::optimalOrder
+ *
+ * @return string
+ */
+ protected function optimalOrder() {
+ return 'fifo';
+ }
+
+ /**
+ * @see JobQueue::doIsEmpty
+ *
+ * @return bool
+ */
+ protected function doIsEmpty() {
+ return ( $this->doGetSize() == 0 );
+ }
+
+ /**
+ * @see JobQueue::doGetSize
+ *
+ * @return int
+ */
+ protected function doGetSize() {
+ $unclaimed = $this->getQueueData( 'unclaimed' );
+
+ return $unclaimed ? count( $unclaimed ) : 0;
+ }
+
+ /**
+ * @see JobQueue::doGetAcquiredCount
+ *
+ * @return int
+ */
+ protected function doGetAcquiredCount() {
+ $claimed = $this->getQueueData( 'claimed' );
+
+ return $claimed ? count( $claimed ) : 0;
+ }
+
+ /**
+ * @see JobQueue::doPop
+ *
+ * @return Job|bool
+ */
+ protected function doPop() {
+ if ( $this->doGetSize() == 0 ) {
+ return false;
+ }
+
+ $unclaimed =& $this->getQueueData( 'unclaimed' );
+ $claimed =& $this->getQueueData( 'claimed', [] );
+
+ if ( $this->order === 'random' ) {
+ $key = array_rand( $unclaimed );
+ } else {
+ reset( $unclaimed );
+ $key = key( $unclaimed );
+ }
+
+ $spec = $unclaimed[$key];
+ unset( $unclaimed[$key] );
+ $claimed[] = $spec;
+
+ $job = $this->jobFromSpecInternal( $spec );
+
+ end( $claimed );
+ $job->metadata['claimId'] = key( $claimed );
+
+ return $job;
+ }
+
+ /**
+ * @see JobQueue::doAck
+ *
+ * @param Job $job
+ */
+ protected function doAck( Job $job ) {
+ if ( $this->getAcquiredCount() == 0 ) {
+ return;
+ }
+
+ $claimed =& $this->getQueueData( 'claimed' );
+ unset( $claimed[$job->metadata['claimId']] );
+ }
+
+ /**
+ * @see JobQueue::doDelete
+ */
+ protected function doDelete() {
+ if ( isset( self::$data[$this->type][$this->wiki] ) ) {
+ unset( self::$data[$this->type][$this->wiki] );
+ if ( !self::$data[$this->type] ) {
+ unset( self::$data[$this->type] );
+ }
+ }
+ }
+
+ /**
+ * @see JobQueue::getAllQueuedJobs
+ *
+ * @return Iterator of Job objects.
+ */
+ public function getAllQueuedJobs() {
+ $unclaimed = $this->getQueueData( 'unclaimed' );
+ if ( !$unclaimed ) {
+ return new ArrayIterator( [] );
+ }
+
+ return new MappedIterator(
+ $unclaimed,
+ function ( $value ) {
+ return $this->jobFromSpecInternal( $value );
+ }
+ );
+ }
+
+ /**
+ * @see JobQueue::getAllAcquiredJobs
+ *
+ * @return Iterator of Job objects.
+ */
+ public function getAllAcquiredJobs() {
+ $claimed = $this->getQueueData( 'claimed' );
+ if ( !$claimed ) {
+ return new ArrayIterator( [] );
+ }
+
+ return new MappedIterator(
+ $claimed,
+ function ( $value ) {
+ return $this->jobFromSpecInternal( $value );
+ }
+ );
+ }
+
+ /**
+ * @param IJobSpecification $spec
+ *
+ * @return Job
+ */
+ public function jobFromSpecInternal( IJobSpecification $spec ) {
+ return Job::factory( $spec->getType(), $spec->getTitle(), $spec->getParams() );
+ }
+
+ /**
+ * @param string $field
+ * @param mixed $init
+ *
+ * @return mixed
+ */
+ private function &getQueueData( $field, $init = null ) {
+ if ( !isset( self::$data[$this->type][$this->wiki][$field] ) ) {
+ if ( $init !== null ) {
+ self::$data[$this->type][$this->wiki][$field] = $init;
+ } else {
+ return $init;
+ }
+ }
+
+ return self::$data[$this->type][$this->wiki][$field];
+ }
+}
diff --git a/www/wiki/includes/jobqueue/JobQueueRedis.php b/www/wiki/includes/jobqueue/JobQueueRedis.php
new file mode 100644
index 00000000..7dad014e
--- /dev/null
+++ b/www/wiki/includes/jobqueue/JobQueueRedis.php
@@ -0,0 +1,820 @@
+<?php
+/**
+ * Redis-backed job queue code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+use Psr\Log\LoggerInterface;
+
+/**
+ * Class to handle job queues stored in Redis
+ *
+ * This is a faster and less resource-intensive job queue than JobQueueDB.
+ * All data for a queue using this class is placed into one redis server.
+ * The mediawiki/services/jobrunner background service must be set up and running.
+ *
+ * There are eight main redis keys (per queue) used to track jobs:
+ * - l-unclaimed : A list of job IDs used for ready unclaimed jobs
+ * - z-claimed : A sorted set of (job ID, UNIX timestamp as score) used for job retries
+ * - z-abandoned : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs
+ * - z-delayed : A sorted set of (job ID, UNIX timestamp as score) used for delayed jobs
+ * - h-idBySha1 : A hash of (SHA1 => job ID) for unclaimed jobs used for de-duplication
+ * - h-sha1ById : A hash of (job ID => SHA1) for unclaimed jobs used for de-duplication
+ * - h-attempts : A hash of (job ID => attempt count) used for job claiming/retries
+ * - h-data : A hash of (job ID => serialized blobs) for job storage
+ * A job ID can be in only one of z-delayed, l-unclaimed, z-claimed, and z-abandoned.
+ * If an ID appears in any of those lists, it should have a h-data entry for its ID.
+ * If a job has a SHA1 de-duplication value and its ID is in l-unclaimed or z-delayed, then
+ * there should be no other such jobs with that SHA1. Every h-idBySha1 entry has an h-sha1ById
+ * entry and every h-sha1ById must refer to an ID that is l-unclaimed. If a job has its
+ * ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID.
+ *
+ * The following keys are used to track queue states:
+ * - s-queuesWithJobs : A set of all queues with non-abandoned jobs
+ *
+ * The background service takes care of undelaying, recycling, and pruning jobs as well as
+ * removing s-queuesWithJobs entries as queues empty.
+ *
+ * Additionally, "rootjob:* keys track "root jobs" used for additional de-duplication.
+ * Aside from root job keys, all keys have no expiry, and are only removed when jobs are run.
+ * All the keys are prefixed with the relevant wiki ID information.
+ *
+ * This class requires Redis 2.6 as it makes use of Lua scripts for fast atomic operations.
+ * Additionally, it should be noted that redis has different persistence modes, such
+ * as rdb snapshots, journaling, and no persistence. Appropriate configuration should be
+ * made on the servers based on what queues are using it and what tolerance they have.
+ *
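+ * @par Example (a hypothetical configuration; the server address is illustrative):
+ * @code
+ * $wgJobTypeConf['default'] = [
+ *     'class' => 'JobQueueRedis',
+ *     'redisServer' => 'localhost:6379',
+ *     'redisConfig' => [],
+ *     'daemonized' => true,
+ * ];
+ * @endcode
+ *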
+ * @ingroup JobQueue
+ * @ingroup Redis
+ * @since 1.22
+ */
+class JobQueueRedis extends JobQueue {
+ /** @var RedisConnectionPool */
+ protected $redisPool;
+ /** @var LoggerInterface */
+ protected $logger;
+
+ /** @var string Server address */
+ protected $server;
+ /** @var string Compression method to use */
+ protected $compression;
+
+ const MAX_PUSH_SIZE = 25; // avoid tying up the server
+
+ /**
+ * @param array $params Possible keys:
+ * - redisConfig : An array of parameters to RedisConnectionPool::__construct().
+ * Note that the serializer option is ignored as "none" is always used.
+ * - redisServer : A hostname/port combination or the absolute path of a UNIX socket.
+ * If a hostname is specified but no port, the standard port number
+ * 6379 will be used. Required.
+ * - compression : The type of compression to use; one of (none,gzip).
+ * - daemonized : Set to true if the redisJobRunnerService runs in the background.
+ * This will disable job recycling/undelaying from the MediaWiki side
+	 *                  to avoid redundancy and out-of-sync configuration.
+ * @throws InvalidArgumentException
+ */
+ public function __construct( array $params ) {
+ parent::__construct( $params );
+ $params['redisConfig']['serializer'] = 'none'; // make it easy to use Lua
+ $this->server = $params['redisServer'];
+ $this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none';
+ $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
+ if ( empty( $params['daemonized'] ) ) {
+ throw new InvalidArgumentException(
+ "Non-daemonized mode is no longer supported. Please install the " .
+ "mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." );
+ }
+ $this->logger = \MediaWiki\Logger\LoggerFactory::getInstance( 'redis' );
+ }
+
+ protected function supportedOrders() {
+ return [ 'timestamp', 'fifo' ];
+ }
+
+ protected function optimalOrder() {
+ return 'fifo';
+ }
+
+ protected function supportsDelayedJobs() {
+ return true;
+ }
+
+ /**
+ * @see JobQueue::doIsEmpty()
+ * @return bool
+ * @throws JobQueueError
+ */
+ protected function doIsEmpty() {
+ return $this->doGetSize() == 0;
+ }
+
+ /**
+ * @see JobQueue::doGetSize()
+ * @return int
+ * @throws JobQueueError
+ */
+ protected function doGetSize() {
+ $conn = $this->getConnection();
+ try {
+ return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+ }
+
+ /**
+ * @see JobQueue::doGetAcquiredCount()
+ * @return int
+ * @throws JobQueueError
+ */
+ protected function doGetAcquiredCount() {
+ $conn = $this->getConnection();
+ try {
+ $conn->multi( Redis::PIPELINE );
+ $conn->zSize( $this->getQueueKey( 'z-claimed' ) );
+ $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
+
+ return array_sum( $conn->exec() );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+ }
+
+ /**
+ * @see JobQueue::doGetDelayedCount()
+ * @return int
+ * @throws JobQueueError
+ */
+ protected function doGetDelayedCount() {
+ $conn = $this->getConnection();
+ try {
+ return $conn->zSize( $this->getQueueKey( 'z-delayed' ) );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+ }
+
+ /**
+ * @see JobQueue::doGetAbandonedCount()
+ * @return int
+ * @throws JobQueueError
+ */
+ protected function doGetAbandonedCount() {
+ $conn = $this->getConnection();
+ try {
+ return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+ }
+
+ /**
+ * @see JobQueue::doBatchPush()
+ * @param IJobSpecification[] $jobs
+ * @param int $flags
+ * @return void
+ * @throws JobQueueError
+ */
+ protected function doBatchPush( array $jobs, $flags ) {
+ // Convert the jobs into field maps (de-duplicated against each other)
+ $items = []; // (job ID => job fields map)
+ foreach ( $jobs as $job ) {
+ $item = $this->getNewJobFields( $job );
+ if ( strlen( $item['sha1'] ) ) { // hash identifier => de-duplicate
+ $items[$item['sha1']] = $item;
+ } else {
+ $items[$item['uuid']] = $item;
+ }
+ }
+
+ if ( !count( $items ) ) {
+ return; // nothing to do
+ }
+
+ $conn = $this->getConnection();
+ try {
+ // Actually push the non-duplicate jobs into the queue...
+ if ( $flags & self::QOS_ATOMIC ) {
+ $batches = [ $items ]; // all or nothing
+ } else {
+ $batches = array_chunk( $items, self::MAX_PUSH_SIZE );
+ }
+ $failed = 0;
+ $pushed = 0;
+ foreach ( $batches as $itemBatch ) {
+ $added = $this->pushBlobs( $conn, $itemBatch );
+ if ( is_int( $added ) ) {
+ $pushed += $added;
+ } else {
+ $failed += count( $itemBatch );
+ }
+ }
+ JobQueue::incrStats( 'inserts', $this->type, count( $items ) );
+ JobQueue::incrStats( 'inserts_actual', $this->type, $pushed );
+ JobQueue::incrStats( 'dupe_inserts', $this->type,
+ count( $items ) - $failed - $pushed );
+ if ( $failed > 0 ) {
+ $err = "Could not insert {$failed} {$this->type} job(s).";
+ wfDebugLog( 'JobQueueRedis', $err );
+ throw new RedisException( $err );
+ }
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+ }
+
+ /**
+ * @param RedisConnRef $conn
+ * @param array $items List of results from JobQueueRedis::getNewJobFields()
+ * @return int Number of jobs inserted (duplicates are ignored)
+ * @throws RedisException
+ */
+ protected function pushBlobs( RedisConnRef $conn, array $items ) {
+ $args = [ $this->encodeQueueName() ];
+ // Next args come in 4s ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] )
+ foreach ( $items as $item ) {
+ $args[] = (string)$item['uuid'];
+ $args[] = (string)$item['sha1'];
+ $args[] = (string)$item['rtimestamp'];
+ $args[] = (string)$this->serialize( $item );
+ }
+ static $script =
+ /** @lang Lua */
+<<<LUA
+ local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData, kQwJobs = unpack(KEYS)
+ -- First argument is the queue ID
+ local queueId = ARGV[1]
+ -- Next arguments all come in 4s (one per job)
+ local variadicArgCount = #ARGV - 1
+ if variadicArgCount % 4 ~= 0 then
+ return redis.error_reply('Unmatched arguments')
+ end
+ -- Insert each job into this queue as needed
+ local pushed = 0
+ for i = 2,#ARGV,4 do
+ local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3]
+ if sha1 == '' or redis.call('hExists',kIdBySha1,sha1) == 0 then
+ if 1*rtimestamp > 0 then
+ -- Insert into delayed queue (release time as score)
+ redis.call('zAdd',kDelayed,rtimestamp,id)
+ else
+ -- Insert into unclaimed queue
+ redis.call('lPush',kUnclaimed,id)
+ end
+ if sha1 ~= '' then
+ redis.call('hSet',kSha1ById,id,sha1)
+ redis.call('hSet',kIdBySha1,sha1,id)
+ end
+ redis.call('hSet',kData,id,blob)
+ pushed = pushed + 1
+ end
+ end
+ -- Mark this queue as having jobs
+ redis.call('sAdd',kQwJobs,queueId)
+ return pushed
+LUA;
+ return $conn->luaEval( $script,
+ array_merge(
+ [
+ $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
+ $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
+ $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
+ $this->getQueueKey( 'z-delayed' ), # KEYS[4]
+ $this->getQueueKey( 'h-data' ), # KEYS[5]
+ $this->getGlobalKey( 's-queuesWithJobs' ), # KEYS[6]
+ ],
+ $args
+ ),
+ 6 # number of first argument(s) that are keys
+ );
+ }
+
+ /**
+ * @see JobQueue::doPop()
+ * @return Job|bool
+ * @throws JobQueueError
+ */
+ protected function doPop() {
+ $job = false;
+
+ $conn = $this->getConnection();
+ try {
+ do {
+ $blob = $this->popAndAcquireBlob( $conn );
+ if ( !is_string( $blob ) ) {
+ break; // no jobs; nothing to do
+ }
+
+ JobQueue::incrStats( 'pops', $this->type );
+ $item = $this->unserialize( $blob );
+ if ( $item === false ) {
+ wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." );
+ continue;
+ }
+
+				// If $item is invalid, the runner loop recycling will clean up as needed
+ $job = $this->getJobFromFields( $item ); // may be false
+ } while ( !$job ); // job may be false if invalid
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+
+ return $job;
+ }
+
+ /**
+ * @param RedisConnRef $conn
+	 * @return string|bool Serialized job blob, or false if the queue is empty
+ * @throws RedisException
+ */
+ protected function popAndAcquireBlob( RedisConnRef $conn ) {
+ static $script =
+ /** @lang Lua */
+<<<LUA
+ local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS)
+ local rTime = unpack(ARGV)
+ -- Pop an item off the queue
+ local id = redis.call('rPop',kUnclaimed)
+ if not id then
+ return false
+ end
+ -- Allow new duplicates of this job
+ local sha1 = redis.call('hGet',kSha1ById,id)
+ if sha1 then redis.call('hDel',kIdBySha1,sha1) end
+ redis.call('hDel',kSha1ById,id)
+		-- Mark the job as claimed and return it
+ redis.call('zAdd',kClaimed,rTime,id)
+ redis.call('hIncrBy',kAttempts,id,1)
+ return redis.call('hGet',kData,id)
+LUA;
+ return $conn->luaEval( $script,
+ [
+ $this->getQueueKey( 'l-unclaimed' ), # KEYS[1]
+ $this->getQueueKey( 'h-sha1ById' ), # KEYS[2]
+ $this->getQueueKey( 'h-idBySha1' ), # KEYS[3]
+ $this->getQueueKey( 'z-claimed' ), # KEYS[4]
+ $this->getQueueKey( 'h-attempts' ), # KEYS[5]
+ $this->getQueueKey( 'h-data' ), # KEYS[6]
+ time(), # ARGV[1] (injected to be replication-safe)
+ ],
+ 6 # number of first argument(s) that are keys
+ );
+ }
+
+ /**
+ * @see JobQueue::doAck()
+ * @param Job $job
+ * @return Job|bool
+ * @throws UnexpectedValueException
+ * @throws JobQueueError
+ */
+ protected function doAck( Job $job ) {
+ if ( !isset( $job->metadata['uuid'] ) ) {
+ throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no UUID." );
+ }
+
+ $uuid = $job->metadata['uuid'];
+ $conn = $this->getConnection();
+ try {
+ static $script =
+ /** @lang Lua */
+<<<LUA
+ local kClaimed, kAttempts, kData = unpack(KEYS)
+ local id = unpack(ARGV)
+ -- Unmark the job as claimed
+ local removed = redis.call('zRem',kClaimed,id)
+ -- Check if the job was recycled
+ if removed == 0 then
+ return 0
+ end
+ -- Delete the retry data
+ redis.call('hDel',kAttempts,id)
+ -- Delete the job data itself
+ return redis.call('hDel',kData,id)
+LUA;
+ $res = $conn->luaEval( $script,
+ [
+ $this->getQueueKey( 'z-claimed' ), # KEYS[1]
+ $this->getQueueKey( 'h-attempts' ), # KEYS[2]
+ $this->getQueueKey( 'h-data' ), # KEYS[3]
+ $uuid # ARGV[1]
+ ],
+ 3 # number of first argument(s) that are keys
+ );
+
+ if ( !$res ) {
+ wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job $uuid." );
+
+ return false;
+ }
+
+ JobQueue::incrStats( 'acks', $this->type );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+
+ return true;
+ }
+
+ /**
+ * @see JobQueue::doDeduplicateRootJob()
+ * @param IJobSpecification $job
+ * @return bool
+ * @throws JobQueueError
+ * @throws LogicException
+ */
+ protected function doDeduplicateRootJob( IJobSpecification $job ) {
+ if ( !$job->hasRootJobParams() ) {
+ throw new LogicException( "Cannot register root job; missing parameters." );
+ }
+ $params = $job->getRootJobParams();
+
+ $key = $this->getRootJobCacheKey( $params['rootJobSignature'] );
+
+ $conn = $this->getConnection();
+ try {
+ $timestamp = $conn->get( $key ); // current last timestamp of this job
+ if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) {
+ return true; // a newer version of this root job was enqueued
+ }
+
+ // Update the timestamp of the last root job started at the location...
+ return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+ }
+
+ /**
+ * @see JobQueue::doIsRootJobOldDuplicate()
+ * @param Job $job
+ * @return bool
+ * @throws JobQueueError
+ */
+ protected function doIsRootJobOldDuplicate( Job $job ) {
+ if ( !$job->hasRootJobParams() ) {
+			return false; // job has no de-duplication info
+ }
+ $params = $job->getRootJobParams();
+
+ $conn = $this->getConnection();
+ try {
+ // Get the last time this root job was enqueued
+ $timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) );
+ } catch ( RedisException $e ) {
+ $timestamp = false;
+ $this->throwRedisException( $conn, $e );
+ }
+
+ // Check if a new root job was started at the location after this one's...
+ return ( $timestamp && $timestamp > $params['rootJobTimestamp'] );
+ }
+
+ /**
+ * @see JobQueue::doDelete()
+ * @return bool
+ * @throws JobQueueError
+ */
+ protected function doDelete() {
+ static $props = [ 'l-unclaimed', 'z-claimed', 'z-abandoned',
+ 'z-delayed', 'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data' ];
+
+ $conn = $this->getConnection();
+ try {
+ $keys = [];
+ foreach ( $props as $prop ) {
+ $keys[] = $this->getQueueKey( $prop );
+ }
+
+ $ok = ( $conn->delete( $keys ) !== false );
+ $conn->sRem( $this->getGlobalKey( 's-queuesWithJobs' ), $this->encodeQueueName() );
+
+ return $ok;
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+ }
+
+ /**
+ * @see JobQueue::getAllQueuedJobs()
+ * @return Iterator
+ * @throws JobQueueError
+ */
+ public function getAllQueuedJobs() {
+ $conn = $this->getConnection();
+ try {
+ $uids = $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+
+ return $this->getJobIterator( $conn, $uids );
+ }
+
+ /**
+ * @see JobQueue::getAllDelayedJobs()
+ * @return Iterator
+ * @throws JobQueueError
+ */
+ public function getAllDelayedJobs() {
+ $conn = $this->getConnection();
+ try {
+ $uids = $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+
+ return $this->getJobIterator( $conn, $uids );
+ }
+
+ /**
+ * @see JobQueue::getAllAcquiredJobs()
+ * @return Iterator
+ * @throws JobQueueError
+ */
+ public function getAllAcquiredJobs() {
+ $conn = $this->getConnection();
+ try {
+ $uids = $conn->zRange( $this->getQueueKey( 'z-claimed' ), 0, -1 );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+
+ return $this->getJobIterator( $conn, $uids );
+ }
+
+ /**
+ * @see JobQueue::getAllAbandonedJobs()
+ * @return Iterator
+ * @throws JobQueueError
+ */
+ public function getAllAbandonedJobs() {
+ $conn = $this->getConnection();
+ try {
+ $uids = $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 );
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+
+ return $this->getJobIterator( $conn, $uids );
+ }
+
+ /**
+ * @param RedisConnRef $conn
+ * @param array $uids List of job UUIDs
+ * @return MappedIterator
+ */
+ protected function getJobIterator( RedisConnRef $conn, array $uids ) {
+ return new MappedIterator(
+ $uids,
+ function ( $uid ) use ( $conn ) {
+ return $this->getJobFromUidInternal( $uid, $conn );
+ },
+ [ 'accept' => function ( $job ) {
+ return is_object( $job );
+ } ]
+ );
+ }
+
+ public function getCoalesceLocationInternal() {
+ return "RedisServer:" . $this->server;
+ }
+
+ protected function doGetSiblingQueuesWithJobs( array $types ) {
+ return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) );
+ }
+
+ protected function doGetSiblingQueueSizes( array $types ) {
+ $sizes = []; // (type => size)
+ $types = array_values( $types ); // reindex
+ $conn = $this->getConnection();
+ try {
+ $conn->multi( Redis::PIPELINE );
+ foreach ( $types as $type ) {
+ $conn->lSize( $this->getQueueKey( 'l-unclaimed', $type ) );
+ }
+ $res = $conn->exec();
+ if ( is_array( $res ) ) {
+ foreach ( $res as $i => $size ) {
+ $sizes[$types[$i]] = $size;
+ }
+ }
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+
+ return $sizes;
+ }
+
+ /**
+ * This function should not be called outside JobQueueRedis
+ *
+ * @param string $uid
+ * @param RedisConnRef $conn
+ * @return Job|bool Returns false if the job does not exist
+ * @throws JobQueueError
+ * @throws UnexpectedValueException
+ */
+ public function getJobFromUidInternal( $uid, RedisConnRef $conn ) {
+ try {
+ $data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid );
+ if ( $data === false ) {
+ return false; // not found
+ }
+ $item = $this->unserialize( $data );
+ if ( !is_array( $item ) ) { // this shouldn't happen
+ throw new UnexpectedValueException( "Could not find job with ID '$uid'." );
+ }
+ $title = Title::makeTitle( $item['namespace'], $item['title'] );
+ $job = Job::factory( $item['type'], $title, $item['params'] );
+ $job->metadata['uuid'] = $item['uuid'];
+ $job->metadata['timestamp'] = $item['timestamp'];
+ // Add in attempt count for debugging at showJobs.php
+ $job->metadata['attempts'] = $conn->hGet( $this->getQueueKey( 'h-attempts' ), $uid );
+
+ return $job;
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+ }
+
+ /**
+ * @return array List of (wiki,type) tuples for queues with non-abandoned jobs
+ * @throws JobQueueConnectionError
+ * @throws JobQueueError
+ */
+ public function getServerQueuesWithJobs() {
+ $queues = [];
+
+ $conn = $this->getConnection();
+ try {
+ $set = $conn->sMembers( $this->getGlobalKey( 's-queuesWithJobs' ) );
+ foreach ( $set as $queue ) {
+ $queues[] = $this->decodeQueueName( $queue );
+ }
+ } catch ( RedisException $e ) {
+ $this->throwRedisException( $conn, $e );
+ }
+
+ return $queues;
+ }
+
+ /**
+ * @param IJobSpecification $job
+ * @return array
+ */
+ protected function getNewJobFields( IJobSpecification $job ) {
+ return [
+ // Fields that describe the nature of the job
+ 'type' => $job->getType(),
+ 'namespace' => $job->getTitle()->getNamespace(),
+ 'title' => $job->getTitle()->getDBkey(),
+ 'params' => $job->getParams(),
+ // Some jobs cannot run until a "release timestamp"
+ 'rtimestamp' => $job->getReleaseTimestamp() ?: 0,
+ // Additional job metadata
+ 'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ),
+ 'sha1' => $job->ignoreDuplicates()
+ ? Wikimedia\base_convert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 )
+ : '',
+ 'timestamp' => time() // UNIX timestamp
+ ];
+ }
+
+ /**
+ * @param array $fields
+ * @return Job|bool
+ */
+ protected function getJobFromFields( array $fields ) {
+ $title = Title::makeTitle( $fields['namespace'], $fields['title'] );
+ $job = Job::factory( $fields['type'], $title, $fields['params'] );
+ $job->metadata['uuid'] = $fields['uuid'];
+ $job->metadata['timestamp'] = $fields['timestamp'];
+
+ return $job;
+ }
+
+ /**
+ * @param array $fields
+ * @return string Serialized and possibly compressed version of $fields
+ */
+ protected function serialize( array $fields ) {
+ $blob = serialize( $fields );
+ if ( $this->compression === 'gzip'
+ && strlen( $blob ) >= 1024
+ && function_exists( 'gzdeflate' )
+ ) {
+ $object = (object)[ 'blob' => gzdeflate( $blob ), 'enc' => 'gzip' ];
+ $blobz = serialize( $object );
+
+ return ( strlen( $blobz ) < strlen( $blob ) ) ? $blobz : $blob;
+ } else {
+ return $blob;
+ }
+ }
+
+ /**
+ * @param string $blob
+ * @return array|bool Unserialized version of $blob or false
+ */
+ protected function unserialize( $blob ) {
+ $fields = unserialize( $blob );
+ if ( is_object( $fields ) ) {
+ if ( $fields->enc === 'gzip' && function_exists( 'gzinflate' ) ) {
+ $fields = unserialize( gzinflate( $fields->blob ) );
+ } else {
+ $fields = false;
+ }
+ }
+
+ return is_array( $fields ) ? $fields : false;
+ }
+
+ /**
+ * Get a connection to the server that handles all sub-queues for this queue
+ *
+ * @return RedisConnRef
+ * @throws JobQueueConnectionError
+ */
+ protected function getConnection() {
+ $conn = $this->redisPool->getConnection( $this->server, $this->logger );
+ if ( !$conn ) {
+ throw new JobQueueConnectionError(
+ "Unable to connect to redis server {$this->server}." );
+ }
+
+ return $conn;
+ }
+
+ /**
+ * @param RedisConnRef $conn
+ * @param RedisException $e
+ * @throws JobQueueError
+ */
+ protected function throwRedisException( RedisConnRef $conn, $e ) {
+ $this->redisPool->handleError( $conn, $e );
+ throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" );
+ }
+
+ /**
+ * @return string JSON
+ */
+ private function encodeQueueName() {
+ return json_encode( [ $this->type, $this->wiki ] );
+ }
+
+ /**
+ * @param string $name JSON
+ * @return array (type, wiki)
+ */
+ private function decodeQueueName( $name ) {
+ return json_decode( $name );
+ }
+
+ /**
+ * @param string $name
+ * @return string
+ */
+ private function getGlobalKey( $name ) {
+ $parts = [ 'global', 'jobqueue', $name ];
+ foreach ( $parts as $part ) {
+ if ( !preg_match( '/^[a-zA-Z0-9_-]+$/', $part ) ) {
+ throw new InvalidArgumentException( "Key part characters are out of range." );
+ }
+ }
+
+ return implode( ':', $parts );
+ }
+
+ /**
+ * @param string $prop
+ * @param string|null $type Override this for sibling queues
+ * @return string
+ */
+ private function getQueueKey( $prop, $type = null ) {
+ $type = is_string( $type ) ? $type : $this->type;
+ list( $db, $prefix ) = wfSplitWikiID( $this->wiki );
+ $keyspace = $prefix ? "$db-$prefix" : $db;
+
+ $parts = [ $keyspace, 'jobqueue', $type, $prop ];
+
+ // Parts are typically ASCII, but encode for sanity to escape ":"
+ return implode( ':', array_map( 'rawurlencode', $parts ) );
+ }
+}
diff --git a/www/wiki/includes/jobqueue/JobQueueSecondTestQueue.php b/www/wiki/includes/jobqueue/JobQueueSecondTestQueue.php
new file mode 100644
index 00000000..01f467f2
--- /dev/null
+++ b/www/wiki/includes/jobqueue/JobQueueSecondTestQueue.php
@@ -0,0 +1,290 @@
+<?php
+
+/**
+ * A wrapper for the JobQueue that delegates all method calls to a single
+ * main queue while also pushing all jobs to a second job queue that is
+ * being debugged.
+ *
+ * This class was temporarily added to test the transition to JobQueueEventBus
+ * and will be removed once the transition is complete. It is only needed while
+ * the new infrastructure is being tested, so that the two queue implementations
+ * can be compared: that they process the same jobs, deduplicate correctly, have
+ * comparable delays and backlogs, and lose no jobs. Once the new infrastructure
+ * is well tested, this class will no longer be needed.
+ *
+ * @deprecated since 1.30
+ * @since 1.30
+ */
+class JobQueueSecondTestQueue extends JobQueue {
+
+ /**
+ * @var JobQueue
+ */
+ private $mainQueue;
+
+ /**
+ * @var JobQueue
+ */
+ private $debugQueue;
+
+ /**
+ * @var bool
+ */
+ private $onlyWriteToDebugQueue;
+
+ protected function __construct( array $params ) {
+ if ( !isset( $params['mainqueue'] ) ) {
+ throw new MWException( "mainqueue parameter must be provided to the debug queue" );
+ }
+
+ if ( !isset( $params['debugqueue'] ) ) {
+ throw new MWException( "debugqueue parameter must be provided to the debug queue" );
+ }
+
+ $conf = [ 'wiki' => $params['wiki'], 'type' => $params['type'] ];
+ $this->mainQueue = JobQueue::factory( $params['mainqueue'] + $conf );
+ $this->debugQueue = JobQueue::factory( $params['debugqueue'] + $conf );
+ $this->onlyWriteToDebugQueue = isset( $params['readonly'] ) ? $params['readonly'] : false;
+
+ // We need to construct the parent after creating the main and debug queues,
+ // because the super constructor calls some methods that we delegate to the main queue.
+ parent::__construct( $params );
+ }
+
+ /**
+ * Get the allowed queue orders for configuration validation
+ *
+ * @return array Subset of (random, timestamp, fifo, undefined)
+ */
+ protected function supportedOrders() {
+ return $this->mainQueue->supportedOrders();
+ }
+
+ /**
+ * Get the default queue order to use if configuration does not specify one
+ *
+ * @return string One of (random, timestamp, fifo, undefined)
+ */
+ protected function optimalOrder() {
+ return $this->mainQueue->optimalOrder();
+ }
+
+ /**
+ * Find out if delayed jobs are supported for configuration validation
+ *
+ * @return bool Whether delayed jobs are supported
+ */
+ protected function supportsDelayedJobs() {
+ return $this->mainQueue->supportsDelayedJobs();
+ }
+
+ /**
+ * @see JobQueue::isEmpty()
+ * @return bool
+ */
+ protected function doIsEmpty() {
+ return $this->mainQueue->doIsEmpty();
+ }
+
+ /**
+ * @see JobQueue::getSize()
+ * @return int
+ */
+ protected function doGetSize() {
+ return $this->mainQueue->doGetSize();
+ }
+
+ /**
+ * @see JobQueue::getAcquiredCount()
+ * @return int
+ */
+ protected function doGetAcquiredCount() {
+ return $this->mainQueue->doGetAcquiredCount();
+ }
+
+ /**
+ * @see JobQueue::getDelayedCount()
+ * @return int
+ */
+ protected function doGetDelayedCount() {
+ return $this->mainQueue->doGetDelayedCount();
+ }
+
+ /**
+ * @see JobQueue::getAbandonedCount()
+ * @return int
+ */
+ protected function doGetAbandonedCount() {
+ return $this->mainQueue->doGetAbandonedCount();
+ }
+
+ /**
+ * @see JobQueue::batchPush()
+ * @param IJobSpecification[] $jobs
+ * @param int $flags
+ */
+ protected function doBatchPush( array $jobs, $flags ) {
+ if ( !$this->onlyWriteToDebugQueue ) {
+ $this->mainQueue->doBatchPush( $jobs, $flags );
+ }
+
+ try {
+ $this->debugQueue->doBatchPush( $jobs, $flags );
+ } catch ( Exception $exception ) {
+ MWExceptionHandler::logException( $exception );
+ }
+ }
+
+ /**
+ * @see JobQueue::pop()
+ * @return Job|bool
+ */
+ protected function doPop() {
+ return $this->mainQueue->doPop();
+ }
+
+ /**
+ * @see JobQueue::ack()
+ * @param Job $job
+ * @return Job|bool
+ */
+ protected function doAck( Job $job ) {
+ return $this->mainQueue->doAck( $job );
+ }
+
+ /**
+ * @see JobQueue::deduplicateRootJob()
+ * @param IJobSpecification $job
+ * @throws MWException
+ * @return bool
+ */
+ protected function doDeduplicateRootJob( IJobSpecification $job ) {
+ return $this->mainQueue->doDeduplicateRootJob( $job );
+ }
+
+ /**
+ * @see JobQueue::isRootJobOldDuplicate()
+ * @param Job $job
+ * @return bool
+ */
+ protected function doIsRootJobOldDuplicate( Job $job ) {
+ return $this->mainQueue->doIsRootJobOldDuplicate( $job );
+ }
+
+ /**
+ * @param string $signature Hash identifier of the root job
+ * @return string
+ */
+ protected function getRootJobCacheKey( $signature ) {
+ return $this->mainQueue->getRootJobCacheKey( $signature );
+ }
+
+ /**
+ * @see JobQueue::delete()
+ * @return bool
+ * @throws MWException
+ */
+ protected function doDelete() {
+ return $this->mainQueue->doDelete();
+ }
+
+ /**
+ * @see JobQueue::waitForBackups()
+ * @return void
+ */
+ protected function doWaitForBackups() {
+ $this->mainQueue->doWaitForBackups();
+ }
+
+ /**
+ * @see JobQueue::flushCaches()
+ * @return void
+ */
+ protected function doFlushCaches() {
+ $this->mainQueue->doFlushCaches();
+ }
+
+ /**
+ * Get an iterator to traverse over all available jobs in this queue.
+ * This does not include jobs that are currently acquired or delayed.
+ * Note: results may be stale if the queue is concurrently modified.
+ *
+ * @return Iterator
+ * @throws JobQueueError
+ */
+ public function getAllQueuedJobs() {
+ return $this->mainQueue->getAllQueuedJobs();
+ }
+
+ /**
+ * Get an iterator to traverse over all delayed jobs in this queue.
+ * Note: results may be stale if the queue is concurrently modified.
+ *
+ * @return Iterator
+ * @throws JobQueueError
+ * @since 1.22
+ */
+ public function getAllDelayedJobs() {
+ return $this->mainQueue->getAllDelayedJobs();
+ }
+
+ /**
+ * Get an iterator to traverse over all claimed jobs in this queue
+ *
+ * Callers should iterate over it quickly, or few results will be
+ * returned as jobs are acknowledged and deleted
+ *
+ * @return Iterator
+ * @throws JobQueueError
+ * @since 1.26
+ */
+ public function getAllAcquiredJobs() {
+ return $this->mainQueue->getAllAcquiredJobs();
+ }
+
+ /**
+ * Get an iterator to traverse over all abandoned jobs in this queue
+ *
+ * @return Iterator
+ * @throws JobQueueError
+ * @since 1.25
+ */
+ public function getAllAbandonedJobs() {
+ return $this->mainQueue->getAllAbandonedJobs();
+ }
+
+ /**
+ * Do not use this function outside of JobQueue/JobQueueGroup
+ *
+ * @return string
+ * @since 1.22
+ */
+ public function getCoalesceLocationInternal() {
+ return $this->mainQueue->getCoalesceLocationInternal();
+ }
+
+ /**
+ * @see JobQueue::getSiblingQueuesWithJobs()
+ * @param array $types List of queue types
+ * @return array|null (list of queue types) or null if unsupported
+ */
+ protected function doGetSiblingQueuesWithJobs( array $types ) {
+ return $this->mainQueue->doGetSiblingQueuesWithJobs( $types );
+ }
+
+ /**
+ * @see JobQueue::getSiblingQueuesSize()
+ * @param array $types List of queue types
+ * @return array|null Map of (queue type => size) or null if unsupported
+ */
+ protected function doGetSiblingQueueSizes( array $types ) {
+ return $this->mainQueue->doGetSiblingQueueSizes( $types );
+ }
+
+ /**
+ * @throws JobQueueReadOnlyError
+ */
+ protected function assertNotReadOnly() {
+ $this->mainQueue->assertNotReadOnly();
+ }
+}
diff --git a/www/wiki/includes/jobqueue/JobRunner.php b/www/wiki/includes/jobqueue/JobRunner.php
new file mode 100644
index 00000000..977fbdaa
--- /dev/null
+++ b/www/wiki/includes/jobqueue/JobRunner.php
@@ -0,0 +1,607 @@
+<?php
+/**
+ * Job queue runner utility methods
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
+use MediaWiki\MediaWikiServices;
+use MediaWiki\Logger\LoggerFactory;
+use Liuggio\StatsdClient\Factory\StatsdDataFactory;
+use Psr\Log\LoggerAwareInterface;
+use Psr\Log\LoggerInterface;
+use Wikimedia\ScopedCallback;
+use Wikimedia\Rdbms\LBFactory;
+use Wikimedia\Rdbms\DBError;
+use Wikimedia\Rdbms\DBReplicationWaitError;
+
+/**
+ * Job queue runner utility methods
+ *
+ * @ingroup JobQueue
+ * @since 1.24
+ */
+class JobRunner implements LoggerAwareInterface {
+ /** @var Config */
+ protected $config;
+ /** @var callable|null Debug output handler */
+ protected $debug;
+
+ /**
+ * @var LoggerInterface $logger
+ */
+ protected $logger;
+
+ const MAX_ALLOWED_LAG = 3; // abort if more than this much DB lag is present
+ const LAG_CHECK_PERIOD = 1.0; // interval in seconds between replica DB lag checks
+ const ERROR_BACKOFF_TTL = 1; // seconds to back off a queue due to errors
+ const READONLY_BACKOFF_TTL = 30; // seconds to back off a queue due to read-only errors
+
+ /**
+ * @param callable $debug Optional debug output handler
+ */
+ public function setDebugHandler( $debug ) {
+ $this->debug = $debug;
+ }
+
+ /**
+ * @param LoggerInterface $logger
+ * @return void
+ */
+ public function setLogger( LoggerInterface $logger ) {
+ $this->logger = $logger;
+ }
+
+ /**
+ * @param LoggerInterface $logger
+ */
+ public function __construct( LoggerInterface $logger = null ) {
+ if ( $logger === null ) {
+ $logger = LoggerFactory::getInstance( 'runJobs' );
+ }
+ $this->setLogger( $logger );
+ $this->config = MediaWikiServices::getInstance()->getMainConfig();
+ }
+
+ /**
+ * Run jobs of the specified number/type for the specified time
+ *
+ * The response map has a 'job' field that lists status of each job, including:
+ * - type : the job type
+ * - status : ok/failed
+ * - error : any error message string
+ * - time : the job run time in ms
+ * The response map also has:
+ * - backoffs : the (job type => seconds) map of backoff times
+ * - elapsed : the total time spent running tasks in ms
+ * - reached : the reason the script finished, one of (none-ready, job-limit, time-limit,
+ * memory-limit)
+ *
+ * This method outputs status information only if a debug handler was set.
+ * Any exceptions are caught and logged, but are not reported as output.
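+ *
+ * An illustrative invocation (the option values shown are examples only,
+ * not required settings):
+ * @code
+ * $runner = new JobRunner();
+ * $response = $runner->run( [ 'type' => 'refreshLinks', 'maxJobs' => 100, 'maxTime' => 30 ] );
+ * @endcode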
+ *
+ * @param array $options Map of parameters:
+ * - type : the job type (or false for the default types)
+ * - maxJobs : maximum number of jobs to run
+ * - maxTime : maximum time in seconds before stopping
+ * - throttle : whether to respect job backoff configuration
+ * @return array Summary response that can easily be JSON serialized
+ */
+ public function run( array $options ) {
+ $jobClasses = $this->config->get( 'JobClasses' );
+ $profilerLimits = $this->config->get( 'TrxProfilerLimits' );
+
+ $response = [ 'jobs' => [], 'reached' => 'none-ready' ];
+
+ $type = isset( $options['type'] ) ? $options['type'] : false;
+ $maxJobs = isset( $options['maxJobs'] ) ? $options['maxJobs'] : false;
+ $maxTime = isset( $options['maxTime'] ) ? $options['maxTime'] : false;
+ $noThrottle = isset( $options['throttle'] ) && !$options['throttle'];
+
+ // Bail if job type is invalid
+ if ( $type !== false && !isset( $jobClasses[$type] ) ) {
+ $response['reached'] = 'none-possible';
+ return $response;
+ }
+
+ // Bail out if DB is in read-only mode
+ if ( wfReadOnly() ) {
+ $response['reached'] = 'read-only';
+ return $response;
+ }
+
+ $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+ if ( $lbFactory->hasTransactionRound() ) {
+ throw new LogicException( __METHOD__ . ' called with an active transaction round.' );
+ }
+ // Bail out if there is too much DB lag.
+ // This check should not block as we want to try other wiki queues.
+ list( , $maxLag ) = $lbFactory->getMainLB( wfWikiID() )->getMaxLag();
+ if ( $maxLag >= self::MAX_ALLOWED_LAG ) {
+ $response['reached'] = 'replica-lag-limit';
+ return $response;
+ }
+
+ // Catch huge single updates that lead to replica DB lag
+ $trxProfiler = Profiler::instance()->getTransactionProfiler();
+ $trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) );
+ $trxProfiler->setExpectations( $profilerLimits['JobRunner'], __METHOD__ );
+
+ // Some jobs types should not run until a certain timestamp
+ $backoffs = []; // map of (type => UNIX expiry)
+ $backoffDeltas = []; // map of (type => seconds)
+ $wait = 'wait'; // block to read backoffs the first time
+
+ $group = JobQueueGroup::singleton();
+ $stats = MediaWikiServices::getInstance()->getStatsdDataFactory();
+ $jobsPopped = 0;
+ $timeMsTotal = 0;
+ $startTime = microtime( true ); // time since jobs started running
+ $lastCheckTime = 1; // timestamp of last replica DB check
+ do {
+ // Sync the persistent backoffs with concurrent runners
+ $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
+ $blacklist = $noThrottle ? [] : array_keys( $backoffs );
+ $wait = 'nowait'; // less important now
+
+ if ( $type === false ) {
+ $job = $group->pop(
+ JobQueueGroup::TYPE_DEFAULT,
+ JobQueueGroup::USE_CACHE,
+ $blacklist
+ );
+ } elseif ( in_array( $type, $blacklist ) ) {
+ $job = false; // requested queue in backoff state
+ } else {
+ $job = $group->pop( $type ); // job from a single queue
+ }
+
+ if ( $job ) { // found a job
+ ++$jobsPopped;
+ $popTime = time();
+ $jType = $job->getType();
+
+ WebRequest::overrideRequestId( $job->getRequestId() );
+
+ // Back off of certain jobs for a while (for throttling and for errors)
+ $ttw = $this->getBackoffTimeToWait( $job );
+ if ( $ttw > 0 ) {
+ // Always add the delta for other runners in case the time running the
+ // job negated the backoff for each individually but not collectively.
+ $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
+ ? $backoffDeltas[$jType] + $ttw
+ : $ttw;
+ $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait );
+ }
+
+ $info = $this->executeJob( $job, $lbFactory, $stats, $popTime );
+ if ( $info['status'] !== false || !$job->allowRetries() ) {
+ $group->ack( $job ); // succeeded or job cannot be retried
+ }
+
+ // Back off of certain jobs for a while (for throttling and for errors)
+ if ( $info['status'] === false && mt_rand( 0, 49 ) == 0 ) {
+ $ttw = max( $ttw, $this->getErrorBackoffTTL( $info['error'] ) );
+ $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] )
+ ? $backoffDeltas[$jType] + $ttw
+ : $ttw;
+ }
+
+ $response['jobs'][] = [
+ 'type' => $jType,
+ 'status' => ( $info['status'] === false ) ? 'failed' : 'ok',
+ 'error' => $info['error'],
+ 'time' => $info['timeMs']
+ ];
+ $timeMsTotal += $info['timeMs'];
+
+ // Break out if we hit the job count or wall time limits...
+ if ( $maxJobs && $jobsPopped >= $maxJobs ) {
+ $response['reached'] = 'job-limit';
+ break;
+ } elseif ( $maxTime && ( microtime( true ) - $startTime ) > $maxTime ) {
+ $response['reached'] = 'time-limit';
+ break;
+ }
+
+ // Don't let any of the main DB replica DBs get backed up.
+ // This only waits for so long before exiting and letting
+ // other wikis in the farm (on different masters) get a chance.
+ $timePassed = microtime( true ) - $lastCheckTime;
+ if ( $timePassed >= self::LAG_CHECK_PERIOD || $timePassed < 0 ) {
+ try {
+ $lbFactory->waitForReplication( [
+ 'ifWritesSince' => $lastCheckTime,
+ 'timeout' => self::MAX_ALLOWED_LAG
+ ] );
+ } catch ( DBReplicationWaitError $e ) {
+ $response['reached'] = 'replica-lag-limit';
+ break;
+ }
+ $lastCheckTime = microtime( true );
+ }
+ // Don't let any queue replica DBs/backups fall behind
+ if ( $jobsPopped > 0 && ( $jobsPopped % 100 ) == 0 ) {
+ $group->waitForBackups();
+ }
+
+ // Bail if near-OOM instead of in a job
+ if ( !$this->checkMemoryOK() ) {
+ $response['reached'] = 'memory-limit';
+ break;
+ }
+ }
+ } while ( $job ); // stop when there are no jobs
+
+ // Sync the persistent backoffs for the next runJobs.php pass
+ if ( $backoffDeltas ) {
+ $this->syncBackoffDeltas( $backoffs, $backoffDeltas, 'wait' );
+ }
+
+ $response['backoffs'] = $backoffs;
+ $response['elapsed'] = $timeMsTotal;
+
+ return $response;
+ }
+
+ /**
+ * @param string $error
+ * @return int TTL in seconds
+ */
+ private function getErrorBackoffTTL( $error ) {
+ return strpos( $error, 'DBReadOnlyError' ) !== false
+ ? self::READONLY_BACKOFF_TTL
+ : self::ERROR_BACKOFF_TTL;
+ }
+
+ /**
+ * @param Job $job
+ * @param LBFactory $lbFactory
+ * @param StatsdDataFactory $stats
+ * @param float $popTime
+ * @return array Map of status/error/timeMs
+ */
+ private function executeJob( Job $job, LBFactory $lbFactory, $stats, $popTime ) {
+ $jType = $job->getType();
+ $msg = $job->toString() . " STARTING";
+ $this->logger->debug( $msg, [
+ 'job_type' => $job->getType(),
+ ] );
+ $this->debugCallback( $msg );
+
+ // Run the job...
+ $rssStart = $this->getMaxRssKb();
+ $jobStartTime = microtime( true );
+ try {
+ $fnameTrxOwner = get_class( $job ) . '::run'; // give run() outer scope
+ if ( !$job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_ROUND ) ) {
+ $lbFactory->beginMasterChanges( $fnameTrxOwner );
+ }
+ $status = $job->run();
+ $error = $job->getLastError();
+ $this->commitMasterChanges( $lbFactory, $job, $fnameTrxOwner );
+ // Important: this must be the last deferred update added (T100085, T154425)
+ DeferredUpdates::addCallableUpdate( [ JobQueueGroup::class, 'pushLazyJobs' ] );
+ // Run any deferred update tasks; doUpdates() manages transactions itself
+ DeferredUpdates::doUpdates();
+ } catch ( Exception $e ) {
+ MWExceptionHandler::rollbackMasterChangesAndLog( $e );
+ $status = false;
+ $error = get_class( $e ) . ': ' . $e->getMessage();
+ }
+ // Always attempt to call teardown() even if Job throws exception.
+ try {
+ $job->teardown( $status );
+ } catch ( Exception $e ) {
+ MWExceptionHandler::logException( $e );
+ }
+
+ // Commit all outstanding connections that are in a transaction
+ // to get a fresh repeatable read snapshot on every connection.
+ // Note that jobs are still responsible for handling replica DB lag.
+ $lbFactory->flushReplicaSnapshots( __METHOD__ );
+ // Clear out title cache data from prior snapshots
+ MediaWikiServices::getInstance()->getLinkCache()->clear();
+ $timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 );
+ $rssEnd = $this->getMaxRssKb();
+
+ // Record how long jobs wait before getting popped
+ $readyTs = $job->getReadyTimestamp();
+ if ( $readyTs ) {
+ $pickupDelay = max( 0, $popTime - $readyTs );
+ $stats->timing( 'jobqueue.pickup_delay.all', 1000 * $pickupDelay );
+ $stats->timing( "jobqueue.pickup_delay.$jType", 1000 * $pickupDelay );
+ }
+ // Record root job age for jobs being run
+ $rootTimestamp = $job->getRootJobParams()['rootJobTimestamp'];
+ if ( $rootTimestamp ) {
+ $age = max( 0, $popTime - wfTimestamp( TS_UNIX, $rootTimestamp ) );
+ $stats->timing( "jobqueue.pickup_root_age.$jType", 1000 * $age );
+ }
+ // Track the execution time for jobs
+ $stats->timing( "jobqueue.run.$jType", $timeMs );
+ // Track RSS increases for jobs (in case of memory leaks)
+ if ( $rssStart && $rssEnd ) {
+ $stats->updateCount( "jobqueue.rss_delta.$jType", $rssEnd - $rssStart );
+ }
+
+ if ( $status === false ) {
+ $msg = $job->toString() . " t={job_duration} error={job_error}";
+ $this->logger->error( $msg, [
+ 'job_type' => $job->getType(),
+ 'job_duration' => $timeMs,
+ 'job_error' => $error,
+ ] );
+
+ $msg = $job->toString() . " t=$timeMs error={$error}";
+ $this->debugCallback( $msg );
+ } else {
+ $msg = $job->toString() . " t={job_duration} good";
+ $this->logger->info( $msg, [
+ 'job_type' => $job->getType(),
+ 'job_duration' => $timeMs,
+ ] );
+
+ $msg = $job->toString() . " t=$timeMs good";
+ $this->debugCallback( $msg );
+ }
+
+ return [ 'status' => $status, 'error' => $error, 'timeMs' => $timeMs ];
+ }
+
+ /**
+ * @return int|null Max memory RSS in kilobytes
+ */
+ private function getMaxRssKb() {
+ $info = wfGetRusage() ?: [];
+ // see https://linux.die.net/man/2/getrusage
+ return isset( $info['ru_maxrss'] ) ? (int)$info['ru_maxrss'] : null;
+ }
+
+ /**
+ * @param Job $job
+ * @return int Seconds for this runner to avoid doing more jobs of this type
+ * @see $wgJobBackoffThrottling
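+ *
+ * For illustration, a throttling configuration might look like this
+ * (the job type and rate are examples only):
+ * @code
+ * $wgJobBackoffThrottling = [ 'enotifNotify' => 20 ]; // max work items per second
+ * @endcode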
+ */
+ private function getBackoffTimeToWait( Job $job ) {
+ $throttling = $this->config->get( 'JobBackoffThrottling' );
+
+ if ( !isset( $throttling[$job->getType()] ) || $job instanceof DuplicateJob ) {
+ return 0; // not throttled
+ }
+
+ $itemsPerSecond = $throttling[$job->getType()];
+ if ( $itemsPerSecond <= 0 ) {
+ return 0; // not throttled
+ }
+
+ $seconds = 0;
+ if ( $job->workItemCount() > 0 ) {
+ $exactSeconds = $job->workItemCount() / $itemsPerSecond;
+ // use randomized rounding
+ $seconds = floor( $exactSeconds );
+ $remainder = $exactSeconds - $seconds;
+ $seconds += ( mt_rand() / mt_getrandmax() < $remainder ) ? 1 : 0;
+ }
+
+ return (int)$seconds;
+ }
+
+ /**
+ * Get the previous backoff expiries from persistent storage.
+ * On I/O or lock acquisition failure this returns the original $backoffs.
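+ *
+ * The backoff file is a JSON map of job types to UNIX expiry timestamps,
+ * for example (values illustrative):
+ * @code
+ * {"enotifNotify": 1591272882.2, "htmlCacheUpdate": 1591272900.5}
+ * @endcode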
+ *
+ * @param array $backoffs Map of (job type => UNIX timestamp)
+ * @param string $mode Lock wait mode - "wait" or "nowait"
+ * @return array Map of (job type => backoff expiry timestamp)
+ */
+ private function loadBackoffs( array $backoffs, $mode = 'wait' ) {
+ $file = wfTempDir() . '/mw-runJobs-backoffs.json';
+ if ( is_file( $file ) ) {
+ $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
+ $handle = fopen( $file, 'rb' );
+ if ( !flock( $handle, LOCK_SH | $noblock ) ) {
+ fclose( $handle );
+ return $backoffs; // don't wait on lock
+ }
+ $content = stream_get_contents( $handle );
+ flock( $handle, LOCK_UN );
+ fclose( $handle );
+ $ctime = microtime( true );
+ $cBackoffs = json_decode( $content, true ) ?: [];
+ foreach ( $cBackoffs as $type => $timestamp ) {
+ if ( $timestamp < $ctime ) {
+ unset( $cBackoffs[$type] );
+ }
+ }
+ } else {
+ $cBackoffs = [];
+ }
+
+ return $cBackoffs;
+ }
+
+ /**
+ * Merge the current backoff expiries from persistent storage
+ *
+ * The $deltas map is set to an empty array on success.
+ * On I/O or lock acquisition failure this returns the original $backoffs.
+ *
+ * @param array $backoffs Map of (job type => UNIX timestamp)
+ * @param array $deltas Map of (job type => seconds)
+ * @param string $mode Lock wait mode - "wait" or "nowait"
+ * @return array The new backoffs, accounting for $backoffs and the latest file data
+ */
+ private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) {
+ if ( !$deltas ) {
+ return $this->loadBackoffs( $backoffs, $mode );
+ }
+
+ $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0;
+ $file = wfTempDir() . '/mw-runJobs-backoffs.json';
+ $handle = fopen( $file, 'wb+' );
+ if ( !flock( $handle, LOCK_EX | $noblock ) ) {
+ fclose( $handle );
+ return $backoffs; // don't wait on lock
+ }
+ $ctime = microtime( true );
+ $content = stream_get_contents( $handle );
+ $cBackoffs = json_decode( $content, true ) ?: [];
+ foreach ( $deltas as $type => $seconds ) {
+ $cBackoffs[$type] = isset( $cBackoffs[$type] ) && $cBackoffs[$type] >= $ctime
+ ? $cBackoffs[$type] + $seconds
+ : $ctime + $seconds;
+ }
+ foreach ( $cBackoffs as $type => $timestamp ) {
+ if ( $timestamp < $ctime ) {
+ unset( $cBackoffs[$type] );
+ }
+ }
+ ftruncate( $handle, 0 );
+ fwrite( $handle, json_encode( $cBackoffs ) );
+ flock( $handle, LOCK_UN );
+ fclose( $handle );
+
+ $deltas = [];
+
+ return $cBackoffs;
+ }
+
+ /**
+ * Make sure that this script is not too close to the memory usage limit.
+ * It is better to die in between jobs than OOM right in the middle of one.
+ * @return bool
+ */
+ private function checkMemoryOK() {
+ static $maxBytes = null;
+ if ( $maxBytes === null ) {
+ $m = [];
+ if ( preg_match( '!^(\d+)(k|m|g|)$!i', ini_get( 'memory_limit' ), $m ) ) {
+ list( , $num, $unit ) = $m;
+ $conv = [ 'g' => 1073741824, 'm' => 1048576, 'k' => 1024, '' => 1 ];
+ $maxBytes = $num * $conv[strtolower( $unit )];
+ } else {
+ $maxBytes = 0;
+ }
+ }
+ $usedBytes = memory_get_usage();
+ if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) {
+ $msg = "Detected excessive memory usage ({used_bytes}/{max_bytes}).";
+ $this->logger->error( $msg, [
+ 'used_bytes' => $usedBytes,
+ 'max_bytes' => $maxBytes,
+ ] );
+
+ $msg = "Detected excessive memory usage ($usedBytes/$maxBytes).";
+ $this->debugCallback( $msg );
+
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
+ * Log the job message
+ * @param string $msg The message to log
+ */
+ private function debugCallback( $msg ) {
+ if ( $this->debug ) {
+ call_user_func_array( $this->debug, [ wfTimestamp( TS_DB ) . " $msg\n" ] );
+ }
+ }
+
+ /**
+ * Issue a commit on all masters that are currently in a transaction and have
+ * made changes to the database. It may also wait for the local wiki's replica
+ * DBs to catch up in some cases. See the documentation for
+ * $wgJobSerialCommitThreshold for more.
+ *
+ * @param LBFactory $lbFactory
+ * @param Job $job
+ * @param string $fnameTrxOwner
+ * @throws DBError
+ */
+ private function commitMasterChanges( LBFactory $lbFactory, Job $job, $fnameTrxOwner ) {
+ $syncThreshold = $this->config->get( 'JobSerialCommitThreshold' );
+
+ $time = false;
+ $lb = $lbFactory->getMainLB( wfWikiID() );
+ if ( $syncThreshold !== false && $lb->getServerCount() > 1 ) {
+ // Generally, there is one master connection to the local DB
+ $dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() );
+ // We need natively blocking fast locks
+ if ( $dbwSerial && $dbwSerial->namedLocksEnqueue() ) {
+ $time = $dbwSerial->pendingWriteQueryDuration( $dbwSerial::ESTIMATE_DB_APPLY );
+ if ( $time < $syncThreshold ) {
+ $dbwSerial = false;
+ }
+ } else {
+ $dbwSerial = false;
+ }
+ } else {
+ // There are no replica DBs or writes are all to foreign DB (we don't handle that)
+ $dbwSerial = false;
+ }
+
+ if ( !$dbwSerial ) {
+ $lbFactory->commitMasterChanges(
+ $fnameTrxOwner,
+ // Abort if any transaction was too big
+ [ 'maxWriteDuration' => $this->config->get( 'MaxJobDBWriteDuration' ) ]
+ );
+
+ return;
+ }
+
+ $ms = intval( 1000 * $time );
+
+ $msg = $job->toString() . " COMMIT ENQUEUED [{job_commit_write_ms}ms of writes]";
+ $this->logger->info( $msg, [
+ 'job_type' => $job->getType(),
+ 'job_commit_write_ms' => $ms,
+ ] );
+
+ $msg = $job->toString() . " COMMIT ENQUEUED [{$ms}ms of writes]";
+ $this->debugCallback( $msg );
+
+ // Wait for an exclusive lock to commit
+ if ( !$dbwSerial->lock( 'jobrunner-serial-commit', __METHOD__, 30 ) ) {
+ // This will trigger a rollback in the main loop
+ throw new DBError( $dbwSerial, "Timed out waiting on commit queue." );
+ }
+ $unlocker = new ScopedCallback( function () use ( $dbwSerial ) {
+ $dbwSerial->unlock( 'jobrunner-serial-commit', __METHOD__ );
+ } );
+
+ // Wait for the replica DBs to catch up
+ $pos = $lb->getMasterPos();
+ if ( $pos ) {
+ $lb->waitForAll( $pos );
+ }
+
+ // Actually commit the DB master changes
+ $lbFactory->commitMasterChanges(
+ $fnameTrxOwner,
+ // Abort if any transaction was too big
+ [ 'maxWriteDuration' => $this->config->get( 'MaxJobDBWriteDuration' ) ]
+ );
+ ScopedCallback::consume( $unlocker );
+ }
+}
diff --git a/www/wiki/includes/jobqueue/JobSpecification.php b/www/wiki/includes/jobqueue/JobSpecification.php
new file mode 100644
index 00000000..b62b83c6
--- /dev/null
+++ b/www/wiki/includes/jobqueue/JobSpecification.php
@@ -0,0 +1,233 @@
+<?php
+/**
+ * Job queue task description base code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+
+/**
+ * Job queue task description interface
+ *
+ * @ingroup JobQueue
+ * @since 1.23
+ */
+interface IJobSpecification {
+ /**
+ * @return string Job type
+ */
+ public function getType();
+
+ /**
+ * @return array
+ */
+ public function getParams();
+
+ /**
+ * @return int|null UNIX timestamp to delay running this job until, otherwise null
+ */
+ public function getReleaseTimestamp();
+
+ /**
+ * @return bool Whether only one of each identical set of jobs should be run
+ */
+ public function ignoreDuplicates();
+
+ /**
+ * Subclasses may need to override this to make duplication detection work.
+ * The resulting map conveys everything that makes the job unique. This is
+ * only checked if ignoreDuplicates() returns true, meaning that duplicate
+ * jobs are supposed to be ignored.
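+ *
+ * As a sketch, an override in a Job subclass might drop a parameter that
+ * should not affect job identity (the parameter name here is hypothetical):
+ * @code
+ * public function getDeduplicationInfo() {
+ *     $info = parent::getDeduplicationInfo();
+ *     unset( $info['params']['requestId'] ); // not part of what makes the job unique
+ *     return $info;
+ * }
+ * @endcode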
+ *
+ * @return array Map of key/values
+ */
+ public function getDeduplicationInfo();
+
+ /**
+ * @see JobQueue::deduplicateRootJob()
+ * @return array
+ * @since 1.26
+ */
+ public function getRootJobParams();
+
+ /**
+ * @see JobQueue::deduplicateRootJob()
+ * @return bool
+ * @since 1.22
+ */
+ public function hasRootJobParams();
+
+ /**
+ * @see JobQueue::deduplicateRootJob()
+ * @return bool Whether this job is a root job
+ */
+ public function isRootJob();
+
+ /**
+ * @return Title Descriptive title (this can simply be informative)
+ */
+ public function getTitle();
+}
+
+/**
+ * Job queue task description base code
+ *
+ * Example usage:
+ * @code
+ * $job = new JobSpecification(
+ * 'null',
+ * [ 'lives' => 1, 'usleep' => 100, 'pi' => 3.141569 ],
+ * [ 'removeDuplicates' => 1 ],
+ * Title::makeTitle( NS_SPECIAL, 'nullity' )
+ * );
+ * JobQueueGroup::singleton()->push( $job );
+ * @endcode
+ *
+ * @ingroup JobQueue
+ * @since 1.23
+ */
+class JobSpecification implements IJobSpecification {
+ /** @var string */
+ protected $type;
+
+ /** @var array Array of job parameters */
+ protected $params;
+
+ /** @var Title */
+ protected $title;
+
+ /** @var array */
+ protected $opts;
+
+ /**
+ * @param string $type
+ * @param array $params Map of key/values
+ * @param array $opts Map of key/values; includes 'removeDuplicates'
+ * @param Title $title Optional descriptive title
+ */
+ public function __construct(
+ $type, array $params, array $opts = [], Title $title = null
+ ) {
+ $this->validateParams( $params );
+ $this->validateParams( $opts );
+
+ $this->type = $type;
+ $this->params = $params;
+ $this->title = $title ?: Title::makeTitle( NS_SPECIAL, 'Badtitle/' . static::class );
+ $this->opts = $opts;
+ }
+
+ /**
+ * @param array $params
+ */
+ protected function validateParams( array $params ) {
+ foreach ( $params as $p => $v ) {
+ if ( is_array( $v ) ) {
+ $this->validateParams( $v );
+ } elseif ( !is_scalar( $v ) && $v !== null ) {
+ throw new UnexpectedValueException( "Job parameter $p is not JSON serializable." );
+ }
+ }
+ }
+
+ public function getType() {
+ return $this->type;
+ }
+
+ public function getTitle() {
+ return $this->title;
+ }
+
+ public function getParams() {
+ return $this->params;
+ }
+
+ public function getReleaseTimestamp() {
+ return isset( $this->params['jobReleaseTimestamp'] )
+ ? wfTimestampOrNull( TS_UNIX, $this->params['jobReleaseTimestamp'] )
+ : null;
+ }
+
+ public function ignoreDuplicates() {
+ return !empty( $this->opts['removeDuplicates'] );
+ }
+
+ public function getDeduplicationInfo() {
+ $info = [
+ 'type' => $this->getType(),
+ 'namespace' => $this->getTitle()->getNamespace(),
+ 'title' => $this->getTitle()->getDBkey(),
+ 'params' => $this->getParams()
+ ];
+ if ( is_array( $info['params'] ) ) {
+ // Identical jobs with different "root" jobs should count as duplicates
+ unset( $info['params']['rootJobSignature'] );
+ unset( $info['params']['rootJobTimestamp'] );
+ // Likewise for jobs with different delay times
+ unset( $info['params']['jobReleaseTimestamp'] );
+ }
+
+ return $info;
+ }
+
+ public function getRootJobParams() {
+ return [
+ 'rootJobSignature' => isset( $this->params['rootJobSignature'] )
+ ? $this->params['rootJobSignature']
+ : null,
+ 'rootJobTimestamp' => isset( $this->params['rootJobTimestamp'] )
+ ? $this->params['rootJobTimestamp']
+ : null
+ ];
+ }
+
+ public function hasRootJobParams() {
+ return isset( $this->params['rootJobSignature'] )
+ && isset( $this->params['rootJobTimestamp'] );
+ }
+
+ public function isRootJob() {
+ return $this->hasRootJobParams() && !empty( $this->params['rootJobIsSelf'] );
+ }
+
+ /**
+ * @return array Field/value map that can immediately be serialized
+ * @since 1.25
+ */
+ public function toSerializableArray() {
+ return [
+ 'type' => $this->type,
+ 'params' => $this->params,
+ 'opts' => $this->opts,
+ 'title' => [
+ 'ns' => $this->title->getNamespace(),
+ 'key' => $this->title->getDBkey()
+ ]
+ ];
+ }
+
+ /**
+ * @param array $map Field/value map
+ * @return JobSpecification
+ * @since 1.25
+ */
+ public static function newFromArray( array $map ) {
+ $title = Title::makeTitle( $map['title']['ns'], $map['title']['key'] );
+
+ return new self( $map['type'], $map['params'], $map['opts'], $title );
+ }
+}
diff --git a/www/wiki/includes/jobqueue/README b/www/wiki/includes/jobqueue/README
new file mode 100644
index 00000000..2073b0f8
--- /dev/null
+++ b/www/wiki/includes/jobqueue/README
@@ -0,0 +1,80 @@
+/*!
+\ingroup JobQueue
+\page jobqueue_design Job queue design
+
+Notes on the Job queuing system architecture.
+
+\section intro Introduction
+
+The data model consist of the following main components:
+* The Job object represents a particular deferred task that happens in the
+ background. All jobs subclass the Job object and put the main logic in the
+ function called run().
+* The JobQueue object represents a particular queue of jobs of a certain type.
+ For example there may be a queue for email jobs and a queue for CDN purge
+ jobs.
+
+\section jobqueue Job queues
+
+Each job type has its own queue and is associated with a storage medium. One
+queue might store its jobs in redis while another would use a database.
+
+The storage medium is determined by the queue class. Before a queue can be used,
+you must define in $wgJobTypeConf a mapping of the job type to a queue class.
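+
+For illustration, a minimal mapping might look like this (the queue options
+shown are examples, not required settings):
+
+\code
+$wgJobTypeConf['default'] = [
+	'class' => 'JobQueueDB',
+	'order' => 'random',
+];
+$wgJobTypeConf['enotifNotify'] = [
+	'class' => 'JobQueueRedis',
+	'redisServer' => '127.0.0.1:6379',
+	'redisConfig' => [],
+];
+\endcode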
+
+The factory class JobQueueGroup provides helper functions for:
+- getting the queue for a given job type
+- routing new job insertions to the proper queue
+
+The following queue classes are available:
+* JobQueueDB (stores jobs in the `job` table in a database)
+* JobQueueRedis (stores jobs in a redis server)
+
+All queue classes support some basic operations (though some may be no-ops):
+* enqueueing a batch of jobs
+* dequeueing a single job
+* acknowledging a job is completed
+* checking if the queue is empty
+
+Some queue classes (like JobQueueDB) may dequeue jobs in random order while other
+queues might dequeue jobs in exact FIFO order. Callers should thus not assume jobs
+are executed in FIFO order.
+
+Also note that not all queue classes will have the same reliability guarantees.
+In-memory queues may lose data when restarted depending on snapshot and journal
+settings (including journal fsync() frequency). Some queue types may totally remove
+jobs when dequeued while leaving the ack() function as a no-op; if a job is
+dequeued by a job runner that crashes before completion, the job will be
+lost. Some jobs, like purging CDN caches after a template change, may not
+require durable queues, whereas other jobs might be more important.
+
+\section aggregator Job queue aggregator
+
+The aggregators are used by nextJobDB.php, a script that returns a random ready
+queue (on any wiki in the farm) that can be used with runJobs.php. This can be
+used in conjunction with any scripts that handle wiki farm job queues. Note that
+$wgLocalDatabases defines which wikis are in the wiki farm.
+
+Since each job type has its own queue, and wiki farms may have many wikis,
+there might be a large number of queues to keep track of. To avoid wasting
+large amounts of time polling empty queues, aggregators exist to keep track
+of which queues are ready.
+
+The following queue aggregator classes are available:
+* JobQueueAggregatorRedis (uses a redis server to track ready queues)
+
+Some aggregators cache data for a few minutes while others may always be up to date.
+This can be an important factor for jobs that need a low pickup time (or latency).
+
+\section jobs Jobs
+
+Callers should try to make jobs maintain correctness when executed twice, i.e.
+be idempotent (see the sketch below). This is useful for queues that actually
+implement ack(), since they may recycle dequeued but un-acknowledged jobs back
+into the queue to be attempted again. If a runner dequeues a job, runs it, but
+then crashes before calling ack(), the job may be returned to the queue and run
+a second time. Jobs like cache purging can happen several times without any
+correctness problems. However, a pathological case arises when a bug causes the
+failure to repeat systematically; for example, a job may always throw a DB error
+at the end of run(). This is trickier to solve and more obnoxious for things
+like email jobs. For such jobs, it might be useful to use a queue that does not
+retry jobs.
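+
+As a hedged sketch, here is a hypothetical job whose run() stays correct when
+repeated (the class, table, and parameter names are invented for this example):
+
+\code
+class ExamplePurgeJob extends Job {
+	public function run() {
+		// Deleting by key is idempotent: a second run matches zero rows
+		// and changes nothing.
+		$dbw = wfGetDB( DB_MASTER );
+		$dbw->delete( 'example_cache', [ 'ec_key' => $this->params['key'] ], __METHOD__ );
+		return true;
+	}
+}
+\endcode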
diff --git a/www/wiki/includes/jobqueue/aggregator/JobQueueAggregator.php b/www/wiki/includes/jobqueue/aggregator/JobQueueAggregator.php
new file mode 100644
index 00000000..433de93a
--- /dev/null
+++ b/www/wiki/includes/jobqueue/aggregator/JobQueueAggregator.php
@@ -0,0 +1,180 @@
+<?php
+/**
+ * Job queue aggregator code.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+
+/**
+ * Class to handle tracking information about all queues
+ *
+ * @ingroup JobQueue
+ * @since 1.21
+ */
+abstract class JobQueueAggregator {
+ /** @var JobQueueAggregator */
+ protected static $instance = null;
+
+ /**
+ * @param array $params
+ */
+ public function __construct( array $params ) {
+ }
+
+ /**
+ * @throws MWException
+ * @return JobQueueAggregator
+ */
+ final public static function singleton() {
+ global $wgJobQueueAggregator;
+
+ if ( !isset( self::$instance ) ) {
+ $class = $wgJobQueueAggregator['class'];
+ $obj = new $class( $wgJobQueueAggregator );
+ if ( !( $obj instanceof JobQueueAggregator ) ) {
+ throw new MWException( "Class '$class' is not a JobQueueAggregator class." );
+ }
+ self::$instance = $obj;
+ }
+
+ return self::$instance;
+ }
+
+ /**
+ * Destroy the singleton instance
+ *
+ * @return void
+ */
+ final public static function destroySingleton() {
+ self::$instance = null;
+ }
+
+ /**
+ * Mark a queue as being empty
+ *
+ * @param string $wiki
+ * @param string $type
+ * @return bool Success
+ */
+ final public function notifyQueueEmpty( $wiki, $type ) {
+ $ok = $this->doNotifyQueueEmpty( $wiki, $type );
+
+ return $ok;
+ }
+
+ /**
+ * @see JobQueueAggregator::notifyQueueEmpty()
+ * @param string $wiki
+ * @param string $type
+ * @return bool
+ */
+ abstract protected function doNotifyQueueEmpty( $wiki, $type );
+
+ /**
+ * Mark a queue as being non-empty
+ *
+ * @param string $wiki
+ * @param string $type
+ * @return bool Success
+ */
+ final public function notifyQueueNonEmpty( $wiki, $type ) {
+ $ok = $this->doNotifyQueueNonEmpty( $wiki, $type );
+
+ return $ok;
+ }
+
+ /**
+ * @see JobQueueAggregator::notifyQueueNonEmpty()
+ * @param string $wiki
+ * @param string $type
+ * @return bool
+ */
+ abstract protected function doNotifyQueueNonEmpty( $wiki, $type );
+
+ /**
+ * Get the list of all of the queues with jobs
+ *
+ * @return array (job type => (list of wiki IDs))
+ */
+ final public function getAllReadyWikiQueues() {
+ $res = $this->doGetAllReadyWikiQueues();
+
+ return $res;
+ }
+
+ /**
+ * @see JobQueueAggregator::getAllReadyWikiQueues()
+ */
+ abstract protected function doGetAllReadyWikiQueues();
+
+ /**
+ * Purge all of the aggregator information
+ *
+ * @return bool Success
+ */
+ final public function purge() {
+ $res = $this->doPurge();
+
+ return $res;
+ }
+
+ /**
+ * @see JobQueueAggregator::purge()
+ */
+ abstract protected function doPurge();
+
+ /**
+ * Get all databases that have a pending job.
+ * This polls all the queues and is thus expensive.
+ *
+ * @return array (job type => (list of wiki IDs))
+ */
+ protected function findPendingWikiQueues() {
+ global $wgLocalDatabases;
+
+ $pendingDBs = []; // (job type => (db list))
+ foreach ( $wgLocalDatabases as $db ) {
+ foreach ( JobQueueGroup::singleton( $db )->getQueuesWithJobs() as $type ) {
+ $pendingDBs[$type][] = $db;
+ }
+ }
+
+ return $pendingDBs;
+ }
+}
+
+/**
+ * @ingroup JobQueue
+ */
+class JobQueueAggregatorNull extends JobQueueAggregator {
+ protected function doNotifyQueueEmpty( $wiki, $type ) {
+ return true;
+ }
+
+ protected function doNotifyQueueNonEmpty( $wiki, $type ) {
+ return true;
+ }
+
+ protected function doGetAllReadyWikiQueues() {
+ return [];
+ }
+
+ protected function doPurge() {
+ return true;
+ }
+}
diff --git a/www/wiki/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php b/www/wiki/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php
new file mode 100644
index 00000000..db07086f
--- /dev/null
+++ b/www/wiki/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php
@@ -0,0 +1,135 @@
+<?php
+/**
+ * Job queue aggregator code that uses PhpRedis.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+use Psr\Log\LoggerInterface;
+
+/**
+ * Class to handle tracking information about all queues using PhpRedis
+ *
+ * The mediawiki/services/jobrunner background service must be set up and running.
+ *
+ * @ingroup JobQueue
+ * @ingroup Redis
+ * @since 1.21
+ */
+class JobQueueAggregatorRedis extends JobQueueAggregator {
+ /** @var RedisConnectionPool */
+ protected $redisPool;
+ /** @var LoggerInterface */
+ protected $logger;
+ /** @var array List of Redis server addresses */
+ protected $servers;
+
+ /**
+ * @param array $params Possible keys:
+ * - redisConfig : An array of parameters to RedisConnectionPool::__construct().
+ * - redisServers : Array of server entries, the first being the primary and the
+ * others being fallback servers. Each entry is either a hostname/port
+ * combination or the absolute path of a UNIX socket.
+ * If a hostname is specified but no port, the standard port number
+ * 6379 will be used. Required.
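+ *
+ * An illustrative configuration (the server addresses are placeholders):
+ * @code
+ * $wgJobQueueAggregator = [
+ *     'class' => 'JobQueueAggregatorRedis',
+ *     'redisServers' => [ 'rdb1.example.org:6379', 'rdb2.example.org' ],
+ *     'redisConfig' => [ 'connectTimeout' => 2 ],
+ * ];
+ * @endcode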
+ */
+ public function __construct( array $params ) {
+ parent::__construct( $params );
+ $this->servers = isset( $params['redisServers'] )
+ ? $params['redisServers']
+ : [ $params['redisServer'] ]; // backwards compatibility
+ $params['redisConfig']['serializer'] = 'none';
+ $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] );
+ $this->logger = \MediaWiki\Logger\LoggerFactory::getInstance( 'redis' );
+ }
+
+ protected function doNotifyQueueEmpty( $wiki, $type ) {
+ return true; // managed by the service
+ }
+
+ protected function doNotifyQueueNonEmpty( $wiki, $type ) {
+ return true; // managed by the service
+ }
+
+ protected function doGetAllReadyWikiQueues() {
+ $conn = $this->getConnection();
+ if ( !$conn ) {
+ return [];
+ }
+ try {
+ $map = $conn->hGetAll( $this->getReadyQueueKey() );
+
+ if ( is_array( $map ) && isset( $map['_epoch'] ) ) {
+ unset( $map['_epoch'] ); // ignore
+ $pendingDBs = []; // (type => list of wikis)
+ foreach ( $map as $key => $time ) {
+ list( $type, $wiki ) = $this->decodeQueueName( $key );
+ $pendingDBs[$type][] = $wiki;
+ }
+ } else {
+ throw new UnexpectedValueException(
+ "No queue listing found; make sure redisJobChronService is running."
+ );
+ }
+
+ return $pendingDBs;
+ } catch ( RedisException $e ) {
+ $this->redisPool->handleError( $conn, $e );
+
+ return [];
+ }
+ }
+
+ protected function doPurge() {
+ return true; // fully and only refreshed by the service
+ }
+
+ /**
+ * Get a connection to the server that handles all sub-queues for this queue
+ *
+ * @return RedisConnRef|bool Returns false on failure
+ * @throws MWException
+ */
+ protected function getConnection() {
+ $conn = false;
+ foreach ( $this->servers as $server ) {
+ $conn = $this->redisPool->getConnection( $server, $this->logger );
+ if ( $conn ) {
+ break;
+ }
+ }
+
+ return $conn;
+ }
+
+ /**
+ * @return string
+ */
+ private function getReadyQueueKey() {
+ return "jobqueue:aggregator:h-ready-queues:v2"; // global
+ }
+
+ /**
+ * @param string $name
+ * @return string[]
+ */
+ private function decodeQueueName( $name ) {
+ list( $type, $wiki ) = explode( '/', $name, 2 );
+
+ return [ rawurldecode( $type ), rawurldecode( $wiki ) ];
+ }
+}
diff --git a/www/wiki/includes/jobqueue/jobs/ActivityUpdateJob.php b/www/wiki/includes/jobqueue/jobs/ActivityUpdateJob.php
new file mode 100644
index 00000000..8cc14e51
--- /dev/null
+++ b/www/wiki/includes/jobqueue/jobs/ActivityUpdateJob.php
@@ -0,0 +1,82 @@
+<?php
+/**
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
+/**
+ * Job for updating user activity like "last viewed" timestamps
+ *
+ * Job parameters include:
+ * - type: one of (updateWatchlistNotification) [required]
+ * - userid: affected user ID [required]
+ * - notifTime: timestamp to set watchlist entries to [required]
+ * - curTime: UNIX timestamp of the event that triggered this job [required]
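+ *
+ * A sketch of how such a job might be queued (the variable names and
+ * values are illustrative):
+ * @code
+ * JobQueueGroup::singleton()->push( new ActivityUpdateJob(
+ *     $watchedTitle,
+ *     [
+ *         'type' => 'updateWatchlistNotification',
+ *         'userid' => $user->getId(),
+ *         'notifTime' => $notificationTimestamp,
+ *         'curTime' => time()
+ *     ]
+ * ) );
+ * @endcode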
+ *
+ * @ingroup JobQueue
+ * @since 1.26
+ */
+class ActivityUpdateJob extends Job {
+ public function __construct( Title $title, array $params ) {
+ parent::__construct( 'activityUpdateJob', $title, $params );
+
+ static $required = [ 'type', 'userid', 'notifTime', 'curTime' ];
+ $missing = implode( ', ', array_diff( $required, array_keys( $this->params ) ) );
+ if ( $missing != '' ) {
+ throw new InvalidArgumentException( "Missing paramter(s) $missing" );
+ }
+
+ $this->removeDuplicates = true;
+ }
+
+ public function run() {
+ if ( $this->params['type'] === 'updateWatchlistNotification' ) {
+ $this->updateWatchlistNotification();
+ } else {
+ throw new InvalidArgumentException( "Invalid 'type' '{$this->params['type']}'." );
+ }
+
+ return true;
+ }
+
+ protected function updateWatchlistNotification() {
+ $casTimestamp = ( $this->params['notifTime'] !== null )
+ ? $this->params['notifTime']
+ : $this->params['curTime'];
+
+ $dbw = wfGetDB( DB_MASTER );
+ $dbw->update( 'watchlist',
+ [
+ 'wl_notificationtimestamp' => $dbw->timestampOrNull( $this->params['notifTime'] )
+ ],
+ [
+ 'wl_user' => $this->params['userid'],
+ 'wl_namespace' => $this->title->getNamespace(),
+ 'wl_title' => $this->title->getDBkey(),
+ // Add a "check and set" style comparison to handle conflicts.
+ // The inequality always avoids updates when the current value
+ // is already NULL per ANSI SQL. This is desired since NULL means
+ // that the user is "caught up" on edits already. When the field
+ // is non-NULL, make sure not to set it back in time or set it to
+ // NULL when newer revisions were in fact added to the page.
+ 'wl_notificationtimestamp < ' . $dbw->addQuotes( $dbw->timestamp( $casTimestamp ) )
+ ],
+ __METHOD__
+ );
+ }
+}
diff --git a/www/wiki/includes/jobqueue/jobs/AssembleUploadChunksJob.php b/www/wiki/includes/jobqueue/jobs/AssembleUploadChunksJob.php
new file mode 100644
index 00000000..e2914be5
--- /dev/null
+++ b/www/wiki/includes/jobqueue/jobs/AssembleUploadChunksJob.php
@@ -0,0 +1,138 @@
+<?php
+/**
+ * Assemble the segments of a chunked upload.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup Upload
+ */
+use Wikimedia\ScopedCallback;
+
+/**
+ * Assemble the segments of a chunked upload.
+ *
+ * @ingroup Upload
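+ *
+ * @par Example:
+ * A sketch of the parameters the chunked-upload API normally supplies; the
+ * filekey and filename here are hypothetical:
+ * @code
+ * new AssembleUploadChunksJob( $title, [
+ *     'filekey' => '1a2b3c4d.jpg',
+ *     'filename' => 'Example.jpg',
+ *     'session' => RequestContext::getMain()->exportSession()
+ * ] );
+ * @endcode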
+ */
+class AssembleUploadChunksJob extends Job {
+ public function __construct( Title $title, array $params ) {
+ parent::__construct( 'AssembleUploadChunks', $title, $params );
+ $this->removeDuplicates = true;
+ }
+
+ public function run() {
+ $scope = RequestContext::importScopedSession( $this->params['session'] );
+ $this->addTeardownCallback( function () use ( &$scope ) {
+ ScopedCallback::consume( $scope ); // T126450
+ } );
+
+ $context = RequestContext::getMain();
+ $user = $context->getUser();
+ try {
+ if ( !$user->isLoggedIn() ) {
+ $this->setLastError( "Could not load the author user from session." );
+
+ return false;
+ }
+
+ UploadBase::setSessionStatus(
+ $user,
+ $this->params['filekey'],
+ [ 'result' => 'Poll', 'stage' => 'assembling', 'status' => Status::newGood() ]
+ );
+
+ $upload = new UploadFromChunks( $user );
+ $upload->continueChunks(
+ $this->params['filename'],
+ $this->params['filekey'],
+ new WebRequestUpload( $context->getRequest(), 'null' )
+ );
+
+ // Combine all of the chunks into a local file and upload that to a new stash file
+ $status = $upload->concatenateChunks();
+ if ( !$status->isGood() ) {
+ UploadBase::setSessionStatus(
+ $user,
+ $this->params['filekey'],
+ [ 'result' => 'Failure', 'stage' => 'assembling', 'status' => $status ]
+ );
+ $this->setLastError( $status->getWikiText( false, false, 'en' ) );
+
+ return false;
+ }
+
+ // We can only get warnings like 'duplicate' after concatenating the chunks
+ $status = Status::newGood();
+ $status->value = [ 'warnings' => $upload->checkWarnings() ];
+
+ // We have a new filekey for the fully concatenated file
+ $newFileKey = $upload->getStashFile()->getFileKey();
+
+ // Remove the old stash file row and first chunk file
+ $upload->stash->removeFileNoAuth( $this->params['filekey'] );
+
+ // Build the image info array while we have the local reference handy
+ $apiMain = new ApiMain(); // dummy object (XXX)
+ $imageInfo = $upload->getImageInfo( $apiMain->getResult() );
+
+ // Cleanup any temporary local file
+ $upload->cleanupTempFile();
+
+ // Cache the info so the user doesn't have to wait forever to get the final info
+ UploadBase::setSessionStatus(
+ $user,
+ $this->params['filekey'],
+ [
+ 'result' => 'Success',
+ 'stage' => 'assembling',
+ 'filekey' => $newFileKey,
+ 'imageinfo' => $imageInfo,
+ 'status' => $status
+ ]
+ );
+ } catch ( Exception $e ) {
+ UploadBase::setSessionStatus(
+ $user,
+ $this->params['filekey'],
+ [
+ 'result' => 'Failure',
+ 'stage' => 'assembling',
+ 'status' => Status::newFatal( 'api-error-stashfailed' )
+ ]
+ );
+ $this->setLastError( get_class( $e ) . ": " . $e->getMessage() );
+ // To be extra robust.
+ MWExceptionHandler::rollbackMasterChangesAndLog( $e );
+
+ return false;
+ }
+
+ return true;
+ }
+
+ public function getDeduplicationInfo() {
+ $info = parent::getDeduplicationInfo();
+ if ( is_array( $info['params'] ) ) {
+ $info['params'] = [ 'filekey' => $info['params']['filekey'] ];
+ }
+
+ return $info;
+ }
+
+ public function allowRetries() {
+ return false;
+ }
+}
diff --git a/www/wiki/includes/jobqueue/jobs/CategoryMembershipChangeJob.php b/www/wiki/includes/jobqueue/jobs/CategoryMembershipChangeJob.php
new file mode 100644
index 00000000..16640be4
--- /dev/null
+++ b/www/wiki/includes/jobqueue/jobs/CategoryMembershipChangeJob.php
@@ -0,0 +1,253 @@
+<?php
+/**
+ * Updater for link tracking tables after a page edit.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+use MediaWiki\MediaWikiServices;
+use Wikimedia\Rdbms\LBFactory;
+
+/**
+ * Job to add recent change entries mentioning category membership changes
+ *
+ * This allows users to easily scan categories for recent page membership changes
+ *
+ * Parameters include:
+ * - pageId : page ID
+ * - revTimestamp : timestamp of the triggering revision
+ *
+ * Category changes will be mentioned for revisions at/after the timestamp for this page
+ *
+ * @since 1.27
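+ *
+ * @par Example:
+ * A sketch of queueing this job for a page (assuming $title is a Title for
+ * an existing page; the timestamp would normally be the new revision's):
+ * @code
+ * JobQueueGroup::singleton()->lazyPush( new CategoryMembershipChangeJob(
+ *     $title,
+ *     [ 'pageId' => $title->getArticleID(), 'revTimestamp' => wfTimestampNow() ]
+ * ) );
+ * @endcode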
+ */
+class CategoryMembershipChangeJob extends Job {
+ /** @var int|null */
+ private $ticket;
+
+ const ENQUEUE_FUDGE_SEC = 60;
+
+ public function __construct( Title $title, array $params ) {
+ parent::__construct( 'categoryMembershipChange', $title, $params );
+ // Only need one job per page. Note that ENQUEUE_FUDGE_SEC handles races where an
+ // older revision job gets inserted while the newer revision job is de-duplicated.
+ $this->removeDuplicates = true;
+ }
+
+ public function run() {
+ $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+ $lb = $lbFactory->getMainLB();
+ $dbw = $lb->getConnection( DB_MASTER );
+
+ $this->ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
+
+ $page = WikiPage::newFromID( $this->params['pageId'], WikiPage::READ_LATEST );
+ if ( !$page ) {
+ $this->setLastError( "Could not find page #{$this->params['pageId']}" );
+ return false; // deleted?
+ }
+
+ // Cut down on the time spent in safeWaitForMasterPos() in the critical section
+ $dbr = $lb->getConnection( DB_REPLICA, [ 'recentchanges' ] );
+ if ( !$lb->safeWaitForMasterPos( $dbr ) ) {
+ $this->setLastError( "Timed out while pre-waiting for replica DB to catch up" );
+ return false;
+ }
+
+ // Use a named lock so that jobs for this page see each others' changes
+ $lockKey = "CategoryMembershipUpdates:{$page->getId()}";
+ $scopedLock = $dbw->getScopedLockAndFlush( $lockKey, __METHOD__, 3 );
+ if ( !$scopedLock ) {
+ $this->setLastError( "Could not acquire lock '$lockKey'" );
+ return false;
+ }
+
+ // Wait till replica DB is caught up so that jobs for this page see each others' changes
+ if ( !$lb->safeWaitForMasterPos( $dbr ) ) {
+ $this->setLastError( "Timed out while waiting for replica DB to catch up" );
+ return false;
+ }
+ // Clear any stale REPEATABLE-READ snapshot
+ $dbr->flushSnapshot( __METHOD__ );
+
+ $cutoffUnix = wfTimestamp( TS_UNIX, $this->params['revTimestamp'] );
+ // Using ENQUEUE_FUDGE_SEC handles jobs inserted out of revision order due to the delay
+ // between COMMIT and the actual enqueueing of the CategoryMembershipChangeJob.
+ $cutoffUnix -= self::ENQUEUE_FUDGE_SEC;
+
+ // Get the newest page revision that has a SRC_CATEGORIZE row.
+ // Assume that category changes before it were already handled.
+ $row = $dbr->selectRow(
+ 'revision',
+ [ 'rev_timestamp', 'rev_id' ],
+ [
+ 'rev_page' => $page->getId(),
+ 'rev_timestamp >= ' . $dbr->addQuotes( $dbr->timestamp( $cutoffUnix ) ),
+ 'EXISTS (' . $dbr->selectSQLText(
+ 'recentchanges',
+ '1',
+ [
+ 'rc_this_oldid = rev_id',
+ 'rc_source' => RecentChange::SRC_CATEGORIZE,
+ // Allow rc_cur_id or rc_timestamp index usage
+ 'rc_cur_id = rev_page',
+ 'rc_timestamp = rev_timestamp'
+ ]
+ ) . ')'
+ ],
+ __METHOD__,
+ [ 'ORDER BY' => 'rev_timestamp DESC, rev_id DESC' ]
+ );
+ // Only consider revisions newer than any such revision
+ if ( $row ) {
+ $cutoffUnix = wfTimestamp( TS_UNIX, $row->rev_timestamp );
+ $lastRevId = (int)$row->rev_id;
+ } else {
+ $lastRevId = 0;
+ }
+
+ // Find revisions to this page made around and after this revision which lack category
+ // notifications in recent changes. This lets jobs pick up where the last one left off.
+ $encCutoff = $dbr->addQuotes( $dbr->timestamp( $cutoffUnix ) );
+ $revQuery = Revision::getQueryInfo();
+ $res = $dbr->select(
+ $revQuery['tables'],
+ $revQuery['fields'],
+ [
+ 'rev_page' => $page->getId(),
+ "rev_timestamp > $encCutoff" .
+ " OR (rev_timestamp = $encCutoff AND rev_id > $lastRevId)"
+ ],
+ __METHOD__,
+ [ 'ORDER BY' => 'rev_timestamp ASC, rev_id ASC' ],
+ $revQuery['joins']
+ );
+
+ // Apply all category updates in revision timestamp order
+ foreach ( $res as $row ) {
+ $this->notifyUpdatesForRevision( $lbFactory, $page, Revision::newFromRow( $row ) );
+ }
+
+ return true;
+ }
+
+ /**
+ * @param LBFactory $lbFactory
+ * @param WikiPage $page
+ * @param Revision $newRev
+ * @throws MWException
+ */
+ protected function notifyUpdatesForRevision(
+ LBFactory $lbFactory, WikiPage $page, Revision $newRev
+ ) {
+ $config = RequestContext::getMain()->getConfig();
+ $title = $page->getTitle();
+
+ // Get the new revision
+ if ( !$newRev->getContent() ) {
+ return; // deleted?
+ }
+
+ // Get the prior revision (the same for null edits)
+ if ( $newRev->getParentId() ) {
+ $oldRev = Revision::newFromId( $newRev->getParentId(), Revision::READ_LATEST );
+ if ( !$oldRev->getContent() ) {
+ return; // deleted?
+ }
+ } else {
+ $oldRev = null;
+ }
+
+ // Parse the new revision and get the categories
+ $categoryChanges = $this->getExplicitCategoriesChanges( $title, $newRev, $oldRev );
+ list( $categoryInserts, $categoryDeletes ) = $categoryChanges;
+ if ( !$categoryInserts && !$categoryDeletes ) {
+ return; // nothing to do
+ }
+
+ $catMembChange = new CategoryMembershipChange( $title, $newRev );
+ $catMembChange->checkTemplateLinks();
+
+ $batchSize = $config->get( 'UpdateRowsPerQuery' );
+ $insertCount = 0;
+
+ foreach ( $categoryInserts as $categoryName ) {
+ $categoryTitle = Title::makeTitle( NS_CATEGORY, $categoryName );
+ $catMembChange->triggerCategoryAddedNotification( $categoryTitle );
+ if ( $insertCount++ && ( $insertCount % $batchSize ) == 0 ) {
+ $lbFactory->commitAndWaitForReplication( __METHOD__, $this->ticket );
+ }
+ }
+
+ foreach ( $categoryDeletes as $categoryName ) {
+ $categoryTitle = Title::makeTitle( NS_CATEGORY, $categoryName );
+ $catMembChange->triggerCategoryRemovedNotification( $categoryTitle );
+ if ( $insertCount++ && ( $insertCount % $batchSize ) == 0 ) {
+ $lbFactory->commitAndWaitForReplication( __METHOD__, $this->ticket );
+ }
+ }
+ }
+
+ private function getExplicitCategoriesChanges(
+ Title $title, Revision $newRev, Revision $oldRev = null
+ ) {
+ // Inject the same timestamp for both revision parses to avoid seeing category changes
+ // due to time-based parser functions. Inject the same page title for the parses too.
+ // Note that REPEATABLE-READ makes template/file pages appear unchanged between parses.
+ $parseTimestamp = $newRev->getTimestamp();
+ // Parse the old rev and get the categories. Do not use link tables as that
+ // assumes these updates are perfectly FIFO and that link tables are always
+ // up to date, neither of which are true.
+ $oldCategories = $oldRev
+ ? $this->getCategoriesAtRev( $title, $oldRev, $parseTimestamp )
+ : [];
+ // Parse the new revision and get the categories
+ $newCategories = $this->getCategoriesAtRev( $title, $newRev, $parseTimestamp );
+
+ $categoryInserts = array_values( array_diff( $newCategories, $oldCategories ) );
+ $categoryDeletes = array_values( array_diff( $oldCategories, $newCategories ) );
+
+ return [ $categoryInserts, $categoryDeletes ];
+ }
+
+ /**
+ * @param Title $title
+ * @param Revision $rev
+ * @param string $parseTimestamp TS_MW
+ *
+ * @return string[] category names
+ */
+ private function getCategoriesAtRev( Title $title, Revision $rev, $parseTimestamp ) {
+ $content = $rev->getContent();
+ $options = $content->getContentHandler()->makeParserOptions( 'canonical' );
+ $options->setTimestamp( $parseTimestamp );
+ // This could possibly use the parser cache if it checked the revision ID,
+ // but that's more complicated than it's worth.
+ $output = $content->getParserOutput( $title, $rev->getId(), $options );
+
+ // array keys will cast numeric category names to ints
+ // so we need to cast them back to strings to avoid breaking things!
+ return array_map( 'strval', array_keys( $output->getCategories() ) );
+ }
+
+ public function getDeduplicationInfo() {
+ $info = parent::getDeduplicationInfo();
+ unset( $info['params']['revTimestamp'] ); // first job wins
+
+ return $info;
+ }
+}
diff --git a/www/wiki/includes/jobqueue/jobs/CdnPurgeJob.php b/www/wiki/includes/jobqueue/jobs/CdnPurgeJob.php
new file mode 100644
index 00000000..356eebab
--- /dev/null
+++ b/www/wiki/includes/jobqueue/jobs/CdnPurgeJob.php
@@ -0,0 +1,46 @@
+<?php
+/**
+ * Job to purge a set of URLs from CDN
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
+/**
+ * Job to purge a set of URLs from CDN
+ *
+ * @ingroup JobQueue
+ * @since 1.27
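+ *
+ * @par Example:
+ * A minimal sketch of purging two URLs via the queue (the URLs are
+ * placeholders):
+ * @code
+ * JobQueueGroup::singleton()->push( new CdnPurgeJob(
+ *     Title::newMainPage(),
+ *     [ 'urls' => [ 'https://example.org/wiki/A', 'https://example.org/wiki/B' ] ]
+ * ) );
+ * @endcode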
+ */
+class CdnPurgeJob extends Job {
+ /**
+ * @param Title $title
+ * @param array $params Job parameters (urls)
+ */
+ function __construct( Title $title, array $params ) {
+ parent::__construct( 'cdnPurge', $title, $params );
+ $this->removeDuplicates = false; // delay semantics are critical
+ }
+
+ public function run() {
+ // Use purge() directly to avoid infinite recursion
+ CdnCacheUpdate::purge( $this->params['urls'] );
+
+ return true;
+ }
+}
diff --git a/www/wiki/includes/jobqueue/jobs/ClearUserWatchlistJob.php b/www/wiki/includes/jobqueue/jobs/ClearUserWatchlistJob.php
new file mode 100644
index 00000000..77adfa1a
--- /dev/null
+++ b/www/wiki/includes/jobqueue/jobs/ClearUserWatchlistJob.php
@@ -0,0 +1,118 @@
+<?php
+
+use MediaWiki\MediaWikiServices;
+
+/**
+ * Job to clear a user's watchlist in batches.
+ *
+ * @author Addshore
+ *
+ * @ingroup JobQueue
+ * @since 1.31
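+ *
+ * @par Example:
+ * The static factory below is the intended entry point; a sketch, assuming
+ * $user is a User and $maxWatchlistId was read from the watchlist table:
+ * @code
+ * JobQueueGroup::singleton()->push(
+ *     ClearUserWatchlistJob::newForUser( $user, $maxWatchlistId )
+ * );
+ * @endcode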
+ */
+class ClearUserWatchlistJob extends Job {
+
+ /**
+ * @param User $user User to clear the watchlist for.
+ * @param int $maxWatchlistId The maximum wl_id at the time the job was first created.
+ *
+ * @return ClearUserWatchlistJob
+ */
+ public static function newForUser( User $user, $maxWatchlistId ) {
+ return new self(
+ null,
+ [ 'userId' => $user->getId(), 'maxWatchlistId' => $maxWatchlistId ]
+ );
+ }
+
+ /**
+ * @param Title|null $title Not used by this job.
+ * @param array $params
+ * - userId, The ID for the user whose watchlist is being cleared.
+ * - maxWatchlistId, The maximum wl_id at the time the job was first created.
+ */
+ public function __construct( Title $title = null, array $params ) {
+ parent::__construct(
+ 'clearUserWatchlist',
+ SpecialPage::getTitleFor( 'EditWatchlist', 'clear' ),
+ $params
+ );
+
+ $this->removeDuplicates = true;
+ }
+
+ public function run() {
+ global $wgUpdateRowsPerQuery;
+ $userId = $this->params['userId'];
+ $maxWatchlistId = $this->params['maxWatchlistId'];
+ $batchSize = $wgUpdateRowsPerQuery;
+
+ $loadBalancer = MediaWikiServices::getInstance()->getDBLoadBalancer();
+ $dbw = $loadBalancer->getConnection( DB_MASTER );
+ $dbr = $loadBalancer->getConnection( DB_REPLICA, [ 'watchlist' ] );
+
+ // Wait before lock to try to reduce time waiting in the lock.
+ if ( !$loadBalancer->safeWaitForMasterPos( $dbr ) ) {
+ $this->setLastError( 'Timed out waiting for replica to catch up before lock' );
+ return false;
+ }
+
+ // Use a named lock so that jobs for this user see each others' changes
+ $lockKey = "ClearUserWatchlistJob:$userId";
+ $scopedLock = $dbw->getScopedLockAndFlush( $lockKey, __METHOD__, 10 );
+ if ( !$scopedLock ) {
+ $this->setLastError( "Could not acquire lock '$lockKey'" );
+ return false;
+ }
+
+ if ( !$loadBalancer->safeWaitForMasterPos( $dbr ) ) {
+ $this->setLastError( 'Timed out waiting for replica to catch up within lock' );
+ return false;
+ }
+
+ // Clear any stale REPEATABLE-READ snapshot
+ $dbr->flushSnapshot( __METHOD__ );
+
+ $watchlistIds = $dbr->selectFieldValues(
+ 'watchlist',
+ 'wl_id',
+ [
+ 'wl_user' => $userId,
+ 'wl_id <= ' . $maxWatchlistId
+ ],
+ __METHOD__,
+ [
+ 'ORDER BY' => 'wl_id ASC',
+ 'LIMIT' => $batchSize,
+ ]
+ );
+
+ if ( count( $watchlistIds ) == 0 ) {
+ return true;
+ }
+
+ $dbw->delete( 'watchlist', [ 'wl_id' => $watchlistIds ], __METHOD__ );
+
+ // Commit changes and remove lock before inserting next job.
+ $lbf = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+ $lbf->commitMasterChanges( __METHOD__ );
+ unset( $scopedLock );
+
+ if ( count( $watchlistIds ) === (int)$batchSize ) {
+ // Until we get fewer results than the limit, recursively push
+ // the same job again.
+ JobQueueGroup::singleton()->push( new self( $this->getTitle(), $this->getParams() ) );
+ }
+
+ return true;
+ }
+
+ public function getDeduplicationInfo() {
+ $info = parent::getDeduplicationInfo();
+ // This job never has a namespace or title so we can't use it for deduplication
+ unset( $info['namespace'] );
+ unset( $info['title'] );
+ return $info;
+ }
+
+}
diff --git a/www/wiki/includes/jobqueue/jobs/ClearWatchlistNotificationsJob.php b/www/wiki/includes/jobqueue/jobs/ClearWatchlistNotificationsJob.php
new file mode 100644
index 00000000..94c1351a
--- /dev/null
+++ b/www/wiki/includes/jobqueue/jobs/ClearWatchlistNotificationsJob.php
@@ -0,0 +1,79 @@
+<?php
+/**
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
+use MediaWiki\MediaWikiServices;
+
+/**
+ * Job for clearing all of the "last viewed" timestamps for a user's watchlist
+ *
+ * Job parameters include:
+ * - userId: affected user ID [required]
+ * - casTime: UNIX timestamp of the event that triggered this job [required]
+ *
+ * @ingroup JobQueue
+ * @since 1.31
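+ *
+ * @par Example:
+ * A sketch of queueing this job for a user (assuming $user is a User object;
+ * casTime should be the time of the triggering event):
+ * @code
+ * JobQueueGroup::singleton()->push( new ClearWatchlistNotificationsJob(
+ *     $user->getUserPage(),
+ *     [ 'userId' => $user->getId(), 'casTime' => time() ]
+ * ) );
+ * @endcode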
+ */
+class ClearWatchlistNotificationsJob extends Job {
+ function __construct( Title $title, array $params ) {
+ parent::__construct( 'clearWatchlistNotifications', $title, $params );
+
+ static $required = [ 'userId', 'casTime' ];
+ $missing = implode( ', ', array_diff( $required, array_keys( $this->params ) ) );
+ if ( $missing != '' ) {
+ throw new InvalidArgumentException( "Missing paramter(s) $missing" );
+ }
+
+ $this->removeDuplicates = true;
+ }
+
+ public function run() {
+ $services = MediaWikiServices::getInstance();
+ $lbFactory = $services->getDBLoadBalancerFactory();
+ $rowsPerQuery = $services->getMainConfig()->get( 'UpdateRowsPerQuery' );
+
+ $dbw = $lbFactory->getMainLB()->getConnection( DB_MASTER );
+ $ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
+
+ $asOfTimes = array_unique( $dbw->selectFieldValues(
+ 'watchlist',
+ 'wl_notificationtimestamp',
+ [ 'wl_user' => $this->params['userId'], 'wl_notificationtimestamp IS NOT NULL' ],
+ __METHOD__,
+ [ 'ORDER BY' => 'wl_notificationtimestamp DESC' ]
+ ) );
+
+ foreach ( array_chunk( $asOfTimes, $rowsPerQuery ) as $asOfTimeBatch ) {
+ $dbw->update(
+ 'watchlist',
+ [ 'wl_notificationtimestamp' => null ],
+ [
+ 'wl_user' => $this->params['userId'],
+ 'wl_notificationtimestamp' => $asOfTimeBatch,
+ // New notifications since the reset should not be cleared
+ 'wl_notificationtimestamp < ' .
+ $dbw->addQuotes( $dbw->timestamp( $this->params['casTime'] ) )
+ ],
+ __METHOD__
+ );
+ $lbFactory->commitAndWaitForReplication( __METHOD__, $ticket );
+ }
+ }
+}
diff --git a/www/wiki/includes/jobqueue/jobs/DeleteLinksJob.php b/www/wiki/includes/jobqueue/jobs/DeleteLinksJob.php
new file mode 100644
index 00000000..d0969e46
--- /dev/null
+++ b/www/wiki/includes/jobqueue/jobs/DeleteLinksJob.php
@@ -0,0 +1,67 @@
+<?php
+/**
+ * Job to update link tables for pages
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
+use MediaWiki\MediaWikiServices;
+
+/**
+ * Job to prune link tables for pages that were deleted
+ *
+ * Only DataUpdate classes should construct these jobs
+ *
+ * @ingroup JobQueue
+ * @since 1.27
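+ *
+ * @par Example:
+ * A sketch of how a deletion update might queue this job (assuming $title
+ * and $pageId describe the page that was just deleted):
+ * @code
+ * JobQueueGroup::singleton()->push( new DeleteLinksJob(
+ *     $title,
+ *     [ 'pageId' => $pageId, 'timestamp' => wfTimestampNow() ]
+ * ) );
+ * @endcode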
+ */
+class DeleteLinksJob extends Job {
+ function __construct( Title $title, array $params ) {
+ parent::__construct( 'deleteLinks', $title, $params );
+ $this->removeDuplicates = true;
+ }
+
+ function run() {
+ if ( is_null( $this->title ) ) {
+ $this->setLastError( "deleteLinks: Invalid title" );
+ return false;
+ }
+
+ $pageId = $this->params['pageId'];
+
+ // Serialize links updates by page ID so they see each others' changes
+ $scopedLock = LinksUpdate::acquirePageLock( wfGetDB( DB_MASTER ), $pageId, 'job' );
+
+ if ( WikiPage::newFromID( $pageId, WikiPage::READ_LATEST ) ) {
+ // The page was restored somehow or something went wrong
+ $this->setLastError( "deleteLinks: Page #$pageId exists" );
+ return false;
+ }
+
+ $factory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+ $timestamp = isset( $this->params['timestamp'] ) ? $this->params['timestamp'] : null;
+ $page = WikiPage::factory( $this->title ); // title when deleted
+
+ $update = new LinksDeletionUpdate( $page, $pageId, $timestamp );
+ $update->setTransactionTicket( $factory->getEmptyTransactionTicket( __METHOD__ ) );
+ $update->doUpdate();
+
+ return true;
+ }
+}
diff --git a/www/wiki/includes/jobqueue/jobs/DoubleRedirectJob.php b/www/wiki/includes/jobqueue/jobs/DoubleRedirectJob.php
new file mode 100644
index 00000000..74c446fc
--- /dev/null
+++ b/www/wiki/includes/jobqueue/jobs/DoubleRedirectJob.php
@@ -0,0 +1,252 @@
+<?php
+/**
+ * Job to fix double redirects after moving a page.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
+/**
+ * Job to fix double redirects after moving a page
+ *
+ * @ingroup JobQueue
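+ *
+ * @par Example:
+ * A sketch of queueing fixes for all redirects that still point at a moved
+ * title (assuming $oldTitle is the title the page was moved away from):
+ * @code
+ * DoubleRedirectJob::fixRedirects( 'move', $oldTitle );
+ * @endcode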
+ */
+class DoubleRedirectJob extends Job {
+ /** @var string Reason for the change, 'maintenance' or 'move'. Suffix for
+ * message key 'double-redirect-fixed-'.
+ */
+ private $reason;
+
+ /** @var Title The title which has changed, redirects pointing to this
+ * title are fixed
+ */
+ private $redirTitle;
+
+ /** @var User */
+ private static $user;
+
+ /**
+ * @param Title $title
+ * @param array $params
+ */
+ function __construct( Title $title, array $params ) {
+ parent::__construct( 'fixDoubleRedirect', $title, $params );
+ $this->reason = $params['reason'];
+ $this->redirTitle = Title::newFromText( $params['redirTitle'] );
+ }
+
+ /**
+ * Insert jobs into the job queue to fix redirects to the given title
+ * @param string $reason The reason for the fix, see message
+ * "double-redirect-fixed-<reason>"
+ * @param Title $redirTitle The title which has changed, redirects
+ * pointing to this title are fixed
+ * @param bool $destTitle Not used
+ */
+ public static function fixRedirects( $reason, $redirTitle, $destTitle = false ) {
+ # Need to use the master to get the redirect table updated in the same transaction
+ $dbw = wfGetDB( DB_MASTER );
+ $res = $dbw->select(
+ [ 'redirect', 'page' ],
+ [ 'page_namespace', 'page_title' ],
+ [
+ 'page_id = rd_from',
+ 'rd_namespace' => $redirTitle->getNamespace(),
+ 'rd_title' => $redirTitle->getDBkey()
+ ], __METHOD__ );
+ if ( !$res->numRows() ) {
+ return;
+ }
+ $jobs = [];
+ foreach ( $res as $row ) {
+ $title = Title::makeTitle( $row->page_namespace, $row->page_title );
+ if ( !$title ) {
+ continue;
+ }
+
+ $jobs[] = new self( $title, [
+ 'reason' => $reason,
+ 'redirTitle' => $redirTitle->getPrefixedDBkey() ] );
+ # Avoid excessive memory usage
+ if ( count( $jobs ) > 10000 ) {
+ JobQueueGroup::singleton()->push( $jobs );
+ $jobs = [];
+ }
+ }
+ JobQueueGroup::singleton()->push( $jobs );
+ }
+
+ /**
+ * @return bool
+ */
+ function run() {
+ if ( !$this->redirTitle ) {
+ $this->setLastError( 'Invalid title' );
+
+ return false;
+ }
+
+ $targetRev = Revision::newFromTitle( $this->title, false, Revision::READ_LATEST );
+ if ( !$targetRev ) {
+ wfDebug( __METHOD__ . ": target redirect already deleted, ignoring\n" );
+
+ return true;
+ }
+ $content = $targetRev->getContent();
+ $currentDest = $content ? $content->getRedirectTarget() : null;
+ if ( !$currentDest || !$currentDest->equals( $this->redirTitle ) ) {
+ wfDebug( __METHOD__ . ": Redirect has changed since the job was queued\n" );
+
+ return true;
+ }
+
+ // Check for a suppression tag (used e.g. in periodically archived discussions)
+ $mw = MagicWord::get( 'staticredirect' );
+ if ( $content->matchMagicWord( $mw ) ) {
+ wfDebug( __METHOD__ . ": skipping: suppressed with __STATICREDIRECT__\n" );
+
+ return true;
+ }
+
+ // Find the current final destination
+ $newTitle = self::getFinalDestination( $this->redirTitle );
+ if ( !$newTitle ) {
+ wfDebug( __METHOD__ .
+ ": skipping: single redirect, circular redirect or invalid redirect destination\n" );
+
+ return true;
+ }
+ if ( $newTitle->equals( $this->redirTitle ) ) {
+ // The redirect is already right, no need to change it.
+ // This can happen if the page was moved back (say after vandalism)
+ wfDebug( __METHOD__ . ": skipping, already good\n" );
+
+ return true;
+ }
+
+ // Preserve fragment (T16904)
+ $newTitle = Title::makeTitle( $newTitle->getNamespace(), $newTitle->getDBkey(),
+ $currentDest->getFragment(), $newTitle->getInterwiki() );
+
+ // Fix the text
+ $newContent = $content->updateRedirect( $newTitle );
+
+ if ( $newContent->equals( $content ) ) {
+ $this->setLastError( 'Content unchanged???' );
+
+ return false;
+ }
+
+ $user = $this->getUser();
+ if ( !$user ) {
+ $this->setLastError( 'Invalid user' );
+
+ return false;
+ }
+
+ // Save it
+ global $wgUser;
+ $oldUser = $wgUser;
+ $wgUser = $user;
+ $article = WikiPage::factory( $this->title );
+
+ // Messages: double-redirect-fixed-move, double-redirect-fixed-maintenance
+ $reason = wfMessage( 'double-redirect-fixed-' . $this->reason,
+ $this->redirTitle->getPrefixedText(), $newTitle->getPrefixedText()
+ )->inContentLanguage()->text();
+ $flags = EDIT_UPDATE | EDIT_SUPPRESS_RC | EDIT_INTERNAL;
+ $article->doEditContent( $newContent, $reason, $flags, false, $user );
+ $wgUser = $oldUser;
+
+ return true;
+ }
+
+ /**
+ * Get the final destination of a redirect
+ *
+ * @param Title $title
+ *
+ * @return Title|bool The final Title after following all redirects, or false if
+ * the page is not a redirect or the redirect loops.
+ */
+ public static function getFinalDestination( $title ) {
+ $dbw = wfGetDB( DB_MASTER );
+
+ // Circular redirect check
+ $seenTitles = [];
+ $dest = false;
+
+ while ( true ) {
+ $titleText = $title->getPrefixedDBkey();
+ if ( isset( $seenTitles[$titleText] ) ) {
+ wfDebug( __METHOD__ . ": Circular redirect detected, aborting\n" );
+
+ return false;
+ }
+ $seenTitles[$titleText] = true;
+
+ if ( $title->isExternal() ) {
+ // If the target is interwiki, we have to break early (T42352).
+ // Otherwise it will look up a row in the local page table
+ // with the namespace/page of the interwiki target which can cause
+ // unexpected results (e.g. X -> foo:Bar -> Bar -> .. )
+ break;
+ }
+
+ $row = $dbw->selectRow(
+ [ 'redirect', 'page' ],
+ [ 'rd_namespace', 'rd_title', 'rd_interwiki' ],
+ [
+ 'rd_from=page_id',
+ 'page_namespace' => $title->getNamespace(),
+ 'page_title' => $title->getDBkey()
+ ], __METHOD__ );
+ if ( !$row ) {
+ # No redirect from here, chain terminates
+ break;
+ } else {
+ $dest = $title = Title::makeTitle(
+ $row->rd_namespace,
+ $row->rd_title,
+ '',
+ $row->rd_interwiki
+ );
+ }
+ }
+
+ return $dest;
+ }
+
+ /**
+ * Get a user object for doing edits, from a request-lifetime cache.
+ * False will be returned if the user name specified in the
+ * 'double-redirect-fixer' message is invalid.
+ *
+ * @return User|bool
+ */
+ function getUser() {
+ if ( !self::$user ) {
+ $username = wfMessage( 'double-redirect-fixer' )->inContentLanguage()->text();
+ self::$user = User::newFromName( $username );
+ # User::newFromName() can return false on a badly configured wiki.
+ if ( self::$user && !self::$user->isLoggedIn() ) {
+ self::$user->addToDatabase();
+ }
+ }
+
+ return self::$user;
+ }
+}
diff --git a/www/wiki/includes/jobqueue/jobs/DuplicateJob.php b/www/wiki/includes/jobqueue/jobs/DuplicateJob.php
new file mode 100644
index 00000000..c005a29a
--- /dev/null
+++ b/www/wiki/includes/jobqueue/jobs/DuplicateJob.php
@@ -0,0 +1,59 @@
+<?php
+/**
+ * No-op job that does nothing.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
+/**
+ * No-op job that does nothing. Used to represent duplicates.
+ *
+ * @ingroup JobQueue
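+ *
+ * @par Example:
+ * A sketch of how a queue can replace a de-duplicated job with a no-op
+ * placeholder while keeping its type and metadata:
+ * @code
+ * $noopJob = DuplicateJob::newFromJob( $job );
+ * @endcode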
+ */
+final class DuplicateJob extends Job {
+ /**
+ * Callers should use DuplicateJob::newFromJob() instead
+ *
+ * @param Title $title
+ * @param array $params Job parameters
+ */
+ function __construct( Title $title, array $params ) {
+ parent::__construct( 'duplicate', $title, $params );
+ }
+
+ /**
+ * Get a duplicate no-op version of a job
+ *
+ * @param Job $job
+ * @return Job
+ */
+ public static function newFromJob( Job $job ) {
+ $djob = new self( $job->getTitle(), $job->getParams() );
+ $djob->command = $job->getType();
+ $djob->params = is_array( $djob->params ) ? $djob->params : [];
+ $djob->params = [ 'isDuplicate' => true ] + $djob->params;
+ $djob->metadata = $job->metadata;
+
+ return $djob;
+ }
+
+ public function run() {
+ return true;
+ }
+}
diff --git a/www/wiki/includes/jobqueue/jobs/EmaillingJob.php b/www/wiki/includes/jobqueue/jobs/EmaillingJob.php
new file mode 100644
index 00000000..960e8828
--- /dev/null
+++ b/www/wiki/includes/jobqueue/jobs/EmaillingJob.php
@@ -0,0 +1,46 @@
+<?php
+/**
+ * Old job for notification emails.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
+/**
+ * Old job used for sending single notification emails;
+ * kept for backwards compatibility
+ *
+ * @ingroup JobQueue
+ */
+class EmaillingJob extends Job {
+ function __construct( Title $title = null, array $params ) {
+ parent::__construct( 'sendMail', Title::newMainPage(), $params );
+ }
+
+ function run() {
+ $status = UserMailer::send(
+ $this->params['to'],
+ $this->params['from'],
+ $this->params['subj'],
+ $this->params['body'],
+ [ 'replyTo' => $this->params['replyto'] ]
+ );
+
+ return $status->isOK();
+ }
+}
diff --git a/www/wiki/includes/jobqueue/jobs/EnotifNotifyJob.php b/www/wiki/includes/jobqueue/jobs/EnotifNotifyJob.php
new file mode 100644
index 00000000..9a5c3c72
--- /dev/null
+++ b/www/wiki/includes/jobqueue/jobs/EnotifNotifyJob.php
@@ -0,0 +1,57 @@
+<?php
+/**
+ * Job for notification emails.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
+/**
+ * Job for email notification mails
+ *
+ * @ingroup JobQueue
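+ *
+ * @par Example:
+ * A sketch of the parameters EmailNotification normally supplies (all values
+ * below are illustrative):
+ * @code
+ * new EnotifNotifyJob( $title, [
+ *     'editor' => 'ExampleUser',
+ *     'editorID' => 42,
+ *     'timestamp' => wfTimestampNow(),
+ *     'summary' => 'example edit summary',
+ *     'minorEdit' => false,
+ *     'oldid' => 12345,
+ *     'watchers' => [],
+ *     'pageStatus' => 'changed'
+ * ] );
+ * @endcode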
+ */
+class EnotifNotifyJob extends Job {
+ function __construct( Title $title, array $params ) {
+ parent::__construct( 'enotifNotify', $title, $params );
+ }
+
+ function run() {
+ $enotif = new EmailNotification();
+ // Get the user from ID (rename safe). Anons are 0, so defer to name.
+ if ( isset( $this->params['editorID'] ) && $this->params['editorID'] ) {
+ $editor = User::newFromId( $this->params['editorID'] );
+ // B/C, only the name might be given.
+ } else {
+ # @todo FIXME: newFromName could return false on a badly configured wiki.
+ $editor = User::newFromName( $this->params['editor'], false );
+ }
+ $enotif->actuallyNotifyOnPageChange(
+ $editor,
+ $this->title,
+ $this->params['timestamp'],
+ $this->params['summary'],
+ $this->params['minorEdit'],
+ $this->params['oldid'],
+ $this->params['watchers'],
+ $this->params['pageStatus']
+ );
+
+ return true;
+ }
+}
diff --git a/www/wiki/includes/jobqueue/jobs/EnqueueJob.php b/www/wiki/includes/jobqueue/jobs/EnqueueJob.php
new file mode 100644
index 00000000..ea7a8d78
--- /dev/null
+++ b/www/wiki/includes/jobqueue/jobs/EnqueueJob.php
@@ -0,0 +1,98 @@
+<?php
+/**
+ * Router job that takes jobs and enqueues them.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
+/**
+ * Router job that takes jobs and enqueues them to their proper queues
+ *
+ * This can be used to insert sets of multiple jobs, or jobs intended for multiple
+ * queues, more robustly. This is a single job that, upon running, enqueues the
+ * wrapped jobs. If some of those fail to enqueue then the EnqueueJob will be retried. Due to
+ * the possibility of duplicate enqueues, the wrapped jobs should be idempotent.
+ *
+ * @ingroup JobQueue
+ * @since 1.25
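+ *
+ * @par Example:
+ * A sketch of wrapping a job specification so the actual push happens when
+ * this router job runs (the inner job type and parameters are hypothetical):
+ * @code
+ * $spec = new JobSpecification( 'null', [ 'lives' => 1 ], [], Title::newMainPage() );
+ * JobQueueGroup::singleton()->push( EnqueueJob::newFromLocalJobs( $spec ) );
+ * @endcode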
+ */
+final class EnqueueJob extends Job {
+ /**
+ * Callers should use the factory methods instead
+ *
+ * @param Title $title
+ * @param array $params Job parameters
+ */
+ function __construct( Title $title, array $params ) {
+ parent::__construct( 'enqueue', $title, $params );
+ }
+
+ /**
+ * @param JobSpecification|JobSpecification[] $jobs
+ * @return EnqueueJob
+ */
+ public static function newFromLocalJobs( $jobs ) {
+ $jobs = is_array( $jobs ) ? $jobs : [ $jobs ];
+
+ return self::newFromJobsByWiki( [ wfWikiID() => $jobs ] );
+ }
+
+ /**
+ * @param array $jobsByWiki Map of (wiki => JobSpecification list)
+ * @return EnqueueJob
+ */
+ public static function newFromJobsByWiki( array $jobsByWiki ) {
+ $deduplicate = true;
+
+ $jobMapsByWiki = [];
+ foreach ( $jobsByWiki as $wiki => $jobs ) {
+ $jobMapsByWiki[$wiki] = [];
+ foreach ( $jobs as $job ) {
+ if ( $job instanceof JobSpecification ) {
+ $jobMapsByWiki[$wiki][] = $job->toSerializableArray();
+ } else {
+ throw new InvalidArgumentException( "Jobs must be of type JobSpecification." );
+ }
+ $deduplicate = $deduplicate && $job->ignoreDuplicates();
+ }
+ }
+
+ $eJob = new self(
+ Title::makeTitle( NS_SPECIAL, 'Badtitle/' . __CLASS__ ),
+ [ 'jobsByWiki' => $jobMapsByWiki ]
+ );
+ // If *all* jobs to be pushed are to be de-duplicated (a common case), then
+ // de-duplicate this whole job itself to avoid build up in high traffic cases
+ $eJob->removeDuplicates = $deduplicate;
+
+ return $eJob;
+ }
+
+ public function run() {
+ foreach ( $this->params['jobsByWiki'] as $wiki => $jobMaps ) {
+ $jobSpecs = [];
+ foreach ( $jobMaps as $jobMap ) {
+ $jobSpecs[] = JobSpecification::newFromArray( $jobMap );
+ }
+ JobQueueGroup::singleton( $wiki )->push( $jobSpecs );
+ }
+
+ return true;
+ }
+}
diff --git a/www/wiki/includes/jobqueue/jobs/HTMLCacheUpdateJob.php b/www/wiki/includes/jobqueue/jobs/HTMLCacheUpdateJob.php
new file mode 100644
index 00000000..34028df1
--- /dev/null
+++ b/www/wiki/includes/jobqueue/jobs/HTMLCacheUpdateJob.php
@@ -0,0 +1,202 @@
+<?php
+/**
+ * HTML cache invalidation of all pages linking to a given title.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ * @ingroup Cache
+ */
+
+use MediaWiki\MediaWikiServices;
+
+/**
+ * Job to purge the cache for all pages that link to or use another page or file
+ *
+ * This job comes in a few variants:
+ * - a) Recursive jobs to purge caches for backlink pages for a given title.
+ * These jobs have (recursive:true,table:<table>) set.
+ * - b) Jobs to purge caches for a set of titles (the job title is ignored).
+ * These jobs have (pages:(<page ID>:(<namespace>,<title>),...)) set.
+ *
+ * @ingroup JobQueue
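+ *
+ * @par Example:
+ * A sketch of queueing recursive purges for every page that transcludes a
+ * template (assuming $templateTitle is the template's Title):
+ * @code
+ * JobQueueGroup::singleton()->push(
+ *     HTMLCacheUpdateJob::newForBacklinks( $templateTitle, 'templatelinks' )
+ * );
+ * @endcode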
+ */
+class HTMLCacheUpdateJob extends Job {
+ function __construct( Title $title, array $params ) {
+ parent::__construct( 'htmlCacheUpdate', $title, $params );
+ // Avoid the overhead of de-duplication when it would be pointless.
+ // Note that these jobs always set page_touched to the current time,
+ // so letting the older existing job "win" is still correct.
+ $this->removeDuplicates = (
+ // Ranges rarely will line up
+ !isset( $params['range'] ) &&
+ // Multiple pages per job make matches unlikely
+ !( isset( $params['pages'] ) && count( $params['pages'] ) != 1 )
+ );
+ $this->params += [ 'causeAction' => 'unknown', 'causeAgent' => 'unknown' ];
+ }
+
+ /**
+ * @param Title $title Title to purge backlink pages from
+ * @param string $table Backlink table name
+ * @param array $params Additional job parameters
+ * @return HTMLCacheUpdateJob
+ */
+ public static function newForBacklinks( Title $title, $table, $params = [] ) {
+ return new self(
+ $title,
+ [
+ 'table' => $table,
+ 'recursive' => true
+ ] + Job::newRootJobParams( // "overall" refresh links job info
+ "htmlCacheUpdate:{$table}:{$title->getPrefixedText()}"
+ ) + $params
+ );
+ }
+
+ function run() {
+ global $wgUpdateRowsPerJob, $wgUpdateRowsPerQuery;
+
+ if ( isset( $this->params['table'] ) && !isset( $this->params['pages'] ) ) {
+ $this->params['recursive'] = true; // b/c; base job
+ }
+
+ // Job to purge all (or a range of) backlink pages for a page
+ if ( !empty( $this->params['recursive'] ) ) {
+ // Carry over information for de-duplication
+ $extraParams = $this->getRootJobParams();
+ // Carry over cause information for logging
+ $extraParams['causeAction'] = $this->params['causeAction'];
+ $extraParams['causeAgent'] = $this->params['causeAgent'];
+ // Convert this into no more than $wgUpdateRowsPerJob HTMLCacheUpdateJob per-title
+ // jobs and possibly a recursive HTMLCacheUpdateJob job for the rest of the backlinks
+ $jobs = BacklinkJobUtils::partitionBacklinkJob(
+ $this,
+ $wgUpdateRowsPerJob,
+ $wgUpdateRowsPerQuery, // jobs-per-title
+ // Carry over information for de-duplication
+ [ 'params' => $extraParams ]
+ );
+ JobQueueGroup::singleton()->push( $jobs );
+ // Job to purge pages for a set of titles
+ } elseif ( isset( $this->params['pages'] ) ) {
+ $this->invalidateTitles( $this->params['pages'] );
+ // Job to update a single title
+ } else {
+ $t = $this->title;
+ $this->invalidateTitles( [
+ $t->getArticleID() => [ $t->getNamespace(), $t->getDBkey() ]
+ ] );
+ }
+
+ return true;
+ }
+
+ /**
+ * @param array $pages Map of (page ID => (namespace, DB key)) entries
+ */
+ protected function invalidateTitles( array $pages ) {
+ global $wgUpdateRowsPerQuery, $wgUseFileCache, $wgPageLanguageUseDB;
+
+ // Get all page IDs in this query into an array
+ $pageIds = array_keys( $pages );
+ if ( !$pageIds ) {
+ return;
+ }
+
+ // Bump page_touched to the current timestamp. This used to use the root job timestamp
+ // (e.g. template/file edit time), which was a bit more efficient when template edits are
+ // rare and don't affect the same pages much. However, this way allows for better
+ // de-duplication, which is much more useful for wikis with high edit rates. Note that
+ // RefreshLinksJob, which is enqueued alongside HTMLCacheUpdateJob, saves the parser output
+ // since it has to parse anyway. We assume that the vast majority of the cache jobs finish
+ // before the link jobs, so using the current timestamp instead of the root timestamp is
+ // not expected to invalidate these cache entries too often.
+ $touchTimestamp = wfTimestampNow();
+ // If page_touched is higher than this, then something else already bumped it after enqueue
+ $condTimestamp = isset( $this->params['rootJobTimestamp'] )
+ ? $this->params['rootJobTimestamp']
+ : $touchTimestamp;
+
+ $dbw = wfGetDB( DB_MASTER );
+ $factory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+ $ticket = $factory->getEmptyTransactionTicket( __METHOD__ );
+ // Update page_touched (skipping pages already touched since the root job).
+ // Check $wgUpdateRowsPerQuery for sanity; batch jobs are sized by that already.
+ foreach ( array_chunk( $pageIds, $wgUpdateRowsPerQuery ) as $batch ) {
+ $factory->commitAndWaitForReplication( __METHOD__, $ticket );
+
+ $dbw->update( 'page',
+ [ 'page_touched' => $dbw->timestamp( $touchTimestamp ) ],
+ [ 'page_id' => $batch,
+ // don't invalidate pages that were already invalidated
+ "page_touched < " . $dbw->addQuotes( $dbw->timestamp( $condTimestamp ) )
+ ],
+ __METHOD__
+ );
+ }
+ // Get the list of affected pages (races only mean something else did the purge)
+ $titleArray = TitleArray::newFromResult( $dbw->select(
+ 'page',
+ array_merge(
+ [ 'page_namespace', 'page_title' ],
+ $wgPageLanguageUseDB ? [ 'page_lang' ] : []
+ ),
+ [ 'page_id' => $pageIds, 'page_touched' => $dbw->timestamp( $touchTimestamp ) ],
+ __METHOD__
+ ) );
+
+ // Update CDN; call purge() directly so as to not bother with secondary purges
+ $urls = [];
+ foreach ( $titleArray as $title ) {
+ /** @var Title $title */
+ $urls = array_merge( $urls, $title->getCdnUrls() );
+ }
+ CdnCacheUpdate::purge( $urls );
+
+ // Update file cache
+ if ( $wgUseFileCache ) {
+ foreach ( $titleArray as $title ) {
+ HTMLFileCache::clearFileCache( $title );
+ }
+ }
+ }
+
+ public function getDeduplicationInfo() {
+ $info = parent::getDeduplicationInfo();
+ if ( is_array( $info['params'] ) ) {
+ // For per-page jobs, the job title is that of the template that changed
+ // (or similar), so remove that since it ruins duplicate detection
+ if ( isset( $info['params']['pages'] ) ) {
+ unset( $info['namespace'] );
+ unset( $info['title'] );
+ }
+ }
+
+ return $info;
+ }
+
+ public function workItemCount() {
+ if ( !empty( $this->params['recursive'] ) ) {
+ return 0; // nothing actually purged
+ } elseif ( isset( $this->params['pages'] ) ) {
+ return count( $this->params['pages'] );
+ }
+
+ return 1; // one title
+ }
+}
diff --git a/www/wiki/includes/jobqueue/jobs/NullJob.php b/www/wiki/includes/jobqueue/jobs/NullJob.php
new file mode 100644
index 00000000..80826fe1
--- /dev/null
+++ b/www/wiki/includes/jobqueue/jobs/NullJob.php
@@ -0,0 +1,76 @@
+<?php
+/**
+ * Degenerate job that does nothing.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
+/**
+ * Degenerate job that does nothing, but can optionally replace itself
+ * in the queue and/or sleep for a brief time period. These can be used
+ * to represent "no-op" jobs or test lock contention and performance.
+ *
+ * @par Example:
+ * Inserting a null job in the configured job queue:
+ * @code
+ * $ php maintenance/eval.php
+ * > $queue = JobQueueGroup::singleton();
+ * > $job = new NullJob( Title::newMainPage(), [ 'lives' => 10 ] );
+ * > $queue->push( $job );
+ * @endcode
+ * You can then confirm the job has been enqueued by using the showJobs.php
+ * maintenance utility:
+ * @code
+ * $ php maintenance/showJobs.php --group
+ * null: 1 queue; 0 claimed (0 active, 0 abandoned)
+ * $
+ * @endcode
+ *
+ * @ingroup JobQueue
+ */
+class NullJob extends Job {
+ /**
+ * @param Title $title
+ * @param array $params Job parameters (lives, usleep)
+ */
+ function __construct( Title $title, array $params ) {
+ parent::__construct( 'null', $title, $params );
+ if ( !isset( $this->params['lives'] ) ) {
+ $this->params['lives'] = 1;
+ }
+ if ( !isset( $this->params['usleep'] ) ) {
+ $this->params['usleep'] = 0;
+ }
+ $this->removeDuplicates = !empty( $this->params['removeDuplicates'] );
+ }
+
+ public function run() {
+ if ( $this->params['usleep'] > 0 ) {
+ usleep( $this->params['usleep'] );
+ }
+ if ( $this->params['lives'] > 1 ) {
+ $params = $this->params;
+ $params['lives']--;
+ $job = new self( $this->title, $params );
+ JobQueueGroup::singleton()->push( $job );
+ }
+
+ return true;
+ }
+}
diff --git a/www/wiki/includes/jobqueue/jobs/PublishStashedFileJob.php b/www/wiki/includes/jobqueue/jobs/PublishStashedFileJob.php
new file mode 100644
index 00000000..e89812be
--- /dev/null
+++ b/www/wiki/includes/jobqueue/jobs/PublishStashedFileJob.php
@@ -0,0 +1,152 @@
+<?php
+/**
+ * Upload a file from the upload stash into the local file repo.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup Upload
+ * @ingroup JobQueue
+ */
+use Wikimedia\ScopedCallback;
+
+/**
+ * Upload a file from the upload stash into the local file repo.
+ *
+ * @ingroup Upload
+ * @ingroup JobQueue
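+ *
+ * @par Example:
+ * A sketch of the parameters the upload API normally supplies (all values
+ * below are placeholders):
+ * @code
+ * new PublishStashedFileJob( $title, [
+ *     'filekey' => '1a2b3c4d.jpg',
+ *     'filename' => 'Example.jpg',
+ *     'comment' => 'upload comment',
+ *     'text' => 'file description page text',
+ *     'watch' => false,
+ *     'session' => RequestContext::getMain()->exportSession()
+ * ] );
+ * @endcode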
+ */
+class PublishStashedFileJob extends Job {
+ public function __construct( Title $title, array $params ) {
+ parent::__construct( 'PublishStashedFile', $title, $params );
+ $this->removeDuplicates = true;
+ }
+
+ public function run() {
+ $scope = RequestContext::importScopedSession( $this->params['session'] );
+ $this->addTeardownCallback( function () use ( &$scope ) {
+ ScopedCallback::consume( $scope ); // T126450
+ } );
+
+ $context = RequestContext::getMain();
+ $user = $context->getUser();
+ try {
+ if ( !$user->isLoggedIn() ) {
+ $this->setLastError( "Could not load the author user from session." );
+
+ return false;
+ }
+
+ UploadBase::setSessionStatus(
+ $user,
+ $this->params['filekey'],
+ [ 'result' => 'Poll', 'stage' => 'publish', 'status' => Status::newGood() ]
+ );
+
+ $upload = new UploadFromStash( $user );
+ // @todo initialize() causes a GET, ideally we could frontload the antivirus
+ // checks and anything else to the stash stage (which includes concatenation and
+ // the local file is thus already there). That way, instead of GET+PUT, there could
+ // just be a COPY operation from the stash to the public zone.
+ $upload->initialize( $this->params['filekey'], $this->params['filename'] );
+
+ // Check if the local file checks out (this is generally a no-op)
+ $verification = $upload->verifyUpload();
+ if ( $verification['status'] !== UploadBase::OK ) {
+ $status = Status::newFatal( 'verification-error' );
+ $status->value = [ 'verification' => $verification ];
+ UploadBase::setSessionStatus(
+ $user,
+ $this->params['filekey'],
+ [ 'result' => 'Failure', 'stage' => 'publish', 'status' => $status ]
+ );
+ $this->setLastError( "Could not verify upload." );
+
+ return false;
+ }
+
+ // Upload the stashed file to a permanent location
+ $status = $upload->performUpload(
+ $this->params['comment'],
+ $this->params['text'],
+ $this->params['watch'],
+ $user,
+ isset( $this->params['tags'] ) ? $this->params['tags'] : []
+ );
+ if ( !$status->isGood() ) {
+ UploadBase::setSessionStatus(
+ $user,
+ $this->params['filekey'],
+ [ 'result' => 'Failure', 'stage' => 'publish', 'status' => $status ]
+ );
+ $this->setLastError( $status->getWikiText( false, false, 'en' ) );
+
+ return false;
+ }
+
+ // Build the image info array while we have the local reference handy
+ $apiMain = new ApiMain(); // dummy object (XXX)
+ $imageInfo = $upload->getImageInfo( $apiMain->getResult() );
+
+ // Cleanup any temporary local file
+ $upload->cleanupTempFile();
+
+ // Cache the info so the user doesn't have to wait forever to get the final info
+ UploadBase::setSessionStatus(
+ $user,
+ $this->params['filekey'],
+ [
+ 'result' => 'Success',
+ 'stage' => 'publish',
+ 'filename' => $upload->getLocalFile()->getName(),
+ 'imageinfo' => $imageInfo,
+ 'status' => Status::newGood()
+ ]
+ );
+ } catch ( Exception $e ) {
+ UploadBase::setSessionStatus(
+ $user,
+ $this->params['filekey'],
+ [
+ 'result' => 'Failure',
+ 'stage' => 'publish',
+ 'status' => Status::newFatal( 'api-error-publishfailed' )
+ ]
+ );
+ $this->setLastError( get_class( $e ) . ": " . $e->getMessage() );
+ // To prevent potential database referential integrity issues.
+ // See T34551.
+ MWExceptionHandler::rollbackMasterChangesAndLog( $e );
+
+ return false;
+ }
+
+ return true;
+ }
+
+ public function getDeduplicationInfo() {
+ $info = parent::getDeduplicationInfo();
+ if ( is_array( $info['params'] ) ) {
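+			// Two jobs count as duplicates if they target the same stash filekey;
+			// the other params (session, comment, text) need not match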
+ $info['params'] = [ 'filekey' => $info['params']['filekey'] ];
+ }
+
+ return $info;
+ }
+
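+	/**
+	 * @return bool Always false, so a failed or aborted publish attempt is not
+	 *  automatically retried
+	 */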
+ public function allowRetries() {
+ return false;
+ }
+}
diff --git a/www/wiki/includes/jobqueue/jobs/RecentChangesUpdateJob.php b/www/wiki/includes/jobqueue/jobs/RecentChangesUpdateJob.php
new file mode 100644
index 00000000..8f508283
--- /dev/null
+++ b/www/wiki/includes/jobqueue/jobs/RecentChangesUpdateJob.php
@@ -0,0 +1,246 @@
+<?php
+/**
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+use MediaWiki\MediaWikiServices;
+use Wikimedia\Rdbms\DBReplicationWaitError;
+
+/**
+ * Job for pruning recent changes
+ *
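+ * @par Example (a minimal sketch using the factory methods below):
+ * @code
+ * // Prune old rows from the recentchanges table
+ * JobQueueGroup::singleton()->push( RecentChangesUpdateJob::newPurgeJob() );
+ * // Refresh the cached list of active users
+ * JobQueueGroup::singleton()->push( RecentChangesUpdateJob::newCacheUpdateJob() );
+ * @endcode
+ *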
+ * @ingroup JobQueue
+ * @since 1.25
+ */
+class RecentChangesUpdateJob extends Job {
+	public function __construct( Title $title, array $params ) {
+ parent::__construct( 'recentChangesUpdate', $title, $params );
+
+ if ( !isset( $params['type'] ) ) {
+ throw new Exception( "Missing 'type' parameter." );
+ }
+
+ $this->executionFlags |= self::JOB_NO_EXPLICIT_TRX_ROUND;
+ $this->removeDuplicates = true;
+ }
+
+ /**
+ * @return RecentChangesUpdateJob
+ */
+ final public static function newPurgeJob() {
+ return new self(
+ SpecialPage::getTitleFor( 'Recentchanges' ), [ 'type' => 'purge' ]
+ );
+ }
+
+ /**
+ * @return RecentChangesUpdateJob
+ * @since 1.26
+ */
+ final public static function newCacheUpdateJob() {
+ return new self(
+ SpecialPage::getTitleFor( 'Recentchanges' ), [ 'type' => 'cacheUpdate' ]
+ );
+ }
+
+ public function run() {
+ if ( $this->params['type'] === 'purge' ) {
+ $this->purgeExpiredRows();
+ } elseif ( $this->params['type'] === 'cacheUpdate' ) {
+ $this->updateActiveUsers();
+ } else {
+ throw new InvalidArgumentException(
+ "Invalid 'type' parameter '{$this->params['type']}'." );
+ }
+
+ return true;
+ }
+
+ protected function purgeExpiredRows() {
+ global $wgRCMaxAge, $wgUpdateRowsPerQuery;
+
+ $lockKey = wfWikiID() . ':recentchanges-prune';
+
+ $dbw = wfGetDB( DB_MASTER );
+ if ( !$dbw->lock( $lockKey, __METHOD__, 0 ) ) {
+ // already in progress
+ return;
+ }
+
+ $factory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+ $ticket = $factory->getEmptyTransactionTicket( __METHOD__ );
+ $cutoff = $dbw->timestamp( time() - $wgRCMaxAge );
+ $rcQuery = RecentChange::getQueryInfo();
+ do {
+ $rcIds = [];
+ $rows = [];
+ $res = $dbw->select(
+ $rcQuery['tables'],
+ $rcQuery['fields'],
+ [ 'rc_timestamp < ' . $dbw->addQuotes( $cutoff ) ],
+ __METHOD__,
+ [ 'LIMIT' => $wgUpdateRowsPerQuery ],
+ $rcQuery['joins']
+ );
+ foreach ( $res as $row ) {
+ $rcIds[] = $row->rc_id;
+ $rows[] = $row;
+ }
+ if ( $rcIds ) {
+ $dbw->delete( 'recentchanges', [ 'rc_id' => $rcIds ], __METHOD__ );
+ Hooks::run( 'RecentChangesPurgeRows', [ $rows ] );
+ // There might be more, so try waiting for replica DBs
+ try {
+ $factory->commitAndWaitForReplication(
+ __METHOD__, $ticket, [ 'timeout' => 3 ]
+ );
+ } catch ( DBReplicationWaitError $e ) {
+ // Another job will continue anyway
+ break;
+ }
+ }
+ } while ( $rcIds );
+
+ $dbw->unlock( $lockKey, __METHOD__ );
+ }
+
+ protected function updateActiveUsers() {
+ global $wgActiveUserDays;
+
+		// Users who made an edit within this many days are considered "active"
+ $days = $wgActiveUserDays;
+ // Pull in the full window of active users in this update
+ $window = $wgActiveUserDays * 86400;
+
+ $dbw = wfGetDB( DB_MASTER );
+ $factory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+ $ticket = $factory->getEmptyTransactionTicket( __METHOD__ );
+
+ $lockKey = wfWikiID() . '-activeusers';
+ if ( !$dbw->lock( $lockKey, __METHOD__, 0 ) ) {
+			// Exclusive update (avoids duplicate entries); it's usually fine to just
+			// bail out here if the job is already running.
+ return;
+ }
+
+ // Long-running queries expected
+ $dbw->setSessionOptions( [ 'connTimeout' => 900 ] );
+
+ $nowUnix = time();
+ // Get the last-updated timestamp for the cache
+ $cTime = $dbw->selectField( 'querycache_info',
+ 'qci_timestamp',
+ [ 'qci_type' => 'activeusers' ]
+ );
+ $cTimeUnix = $cTime ? wfTimestamp( TS_UNIX, $cTime ) : 1;
+
+		// Pick the date range to fetch from. This is normally from the last
+		// update up to the present time, but is capped to a limited window for
+		// sanity. If the window is capped, multiple runs are needed to fully
+		// populate it.
+ $sTimestamp = max( $cTimeUnix, $nowUnix - $days * 86400 );
+ $eTimestamp = min( $sTimestamp + $window, $nowUnix );
+
+ // Get all the users active since the last update
+ $actorQuery = ActorMigration::newMigration()->getJoin( 'rc_user' );
+ $res = $dbw->select(
+ [ 'recentchanges' ] + $actorQuery['tables'],
+ [
+ 'rc_user_text' => $actorQuery['fields']['rc_user_text'],
+ 'lastedittime' => 'MAX(rc_timestamp)'
+ ],
+ [
+ $actorQuery['fields']['rc_user'] . ' > 0', // actual accounts
+ 'rc_type != ' . $dbw->addQuotes( RC_EXTERNAL ), // no wikidata
+ 'rc_log_type IS NULL OR rc_log_type != ' . $dbw->addQuotes( 'newusers' ),
+ 'rc_timestamp >= ' . $dbw->addQuotes( $dbw->timestamp( $sTimestamp ) ),
+ 'rc_timestamp <= ' . $dbw->addQuotes( $dbw->timestamp( $eTimestamp ) )
+ ],
+ __METHOD__,
+ [
+ 'GROUP BY' => [ 'rc_user_text' ],
+ 'ORDER BY' => 'NULL' // avoid filesort
+ ],
+ $actorQuery['joins']
+ );
+ $names = [];
+ foreach ( $res as $row ) {
+ $names[$row->rc_user_text] = $row->lastedittime;
+ }
+
+ // Find which of the recently active users are already accounted for
+ if ( count( $names ) ) {
+ $res = $dbw->select( 'querycachetwo',
+ [ 'user_name' => 'qcc_title' ],
+ [
+ 'qcc_type' => 'activeusers',
+ 'qcc_namespace' => NS_USER,
+ 'qcc_title' => array_keys( $names ),
+ 'qcc_value >= ' . $dbw->addQuotes( $nowUnix - $days * 86400 ), // TS_UNIX
+ ],
+ __METHOD__
+ );
+ // Note: In order for this to be actually consistent, we would need
+ // to update these rows with the new lastedittime.
+ foreach ( $res as $row ) {
+ unset( $names[$row->user_name] );
+ }
+ }
+
+ // Insert the users that need to be added to the list
+ if ( count( $names ) ) {
+ $newRows = [];
+ foreach ( $names as $name => $lastEditTime ) {
+ $newRows[] = [
+ 'qcc_type' => 'activeusers',
+ 'qcc_namespace' => NS_USER,
+ 'qcc_title' => $name,
+ 'qcc_value' => wfTimestamp( TS_UNIX, $lastEditTime ),
+ 'qcc_namespacetwo' => 0, // unused
+ 'qcc_titletwo' => '' // unused
+ ];
+ }
+ foreach ( array_chunk( $newRows, 500 ) as $rowBatch ) {
+ $dbw->insert( 'querycachetwo', $rowBatch, __METHOD__ );
+ $factory->commitAndWaitForReplication( __METHOD__, $ticket );
+ }
+ }
+
+ // If a transaction was already started, it might have an old
+ // snapshot, so kludge the timestamp range back as needed.
+ $asOfTimestamp = min( $eTimestamp, (int)$dbw->trxTimestamp() );
+
+ // Touch the data freshness timestamp
+ $dbw->replace( 'querycache_info',
+ [ 'qci_type' ],
+ [ 'qci_type' => 'activeusers',
+ 'qci_timestamp' => $dbw->timestamp( $asOfTimestamp ) ], // not always $now
+ __METHOD__
+ );
+
+ $dbw->unlock( $lockKey, __METHOD__ );
+
+ // Rotate out users that have not edited in too long (according to old data set)
+ $dbw->delete( 'querycachetwo',
+ [
+ 'qcc_type' => 'activeusers',
+ 'qcc_value < ' . $dbw->addQuotes( $nowUnix - $days * 86400 ) // TS_UNIX
+ ],
+ __METHOD__
+ );
+ }
+}
diff --git a/www/wiki/includes/jobqueue/jobs/RefreshLinksJob.php b/www/wiki/includes/jobqueue/jobs/RefreshLinksJob.php
new file mode 100644
index 00000000..8854c656
--- /dev/null
+++ b/www/wiki/includes/jobqueue/jobs/RefreshLinksJob.php
@@ -0,0 +1,320 @@
+<?php
+/**
+ * Job to update link tables for pages
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+use MediaWiki\MediaWikiServices;
+use Wikimedia\Rdbms\DBReplicationWaitError;
+
+/**
+ * Job to update link tables for pages
+ *
+ * This job comes in a few variants:
+ * - a) Recursive jobs to update links for backlink pages for a given title.
+ * These jobs have (recursive:true,table:<table>) set.
+ * - b) Jobs to update links for a set of pages (the job title is ignored).
+ * These jobs have (pages:(<page ID>:(<namespace>,<title>),...)) set.
+ * - c) Jobs to update links for a single page (the job title)
+ * These jobs need no extra fields set.
+ *
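+ * @par Example (a sketch of the three variants; titles and IDs are illustrative):
+ * @code
+ * // (a) Recursively update all pages that link via the given backlink table
+ * new RefreshLinksJob( $templateTitle, [ 'recursive' => true, 'table' => 'templatelinks' ] );
+ * // (b) Update a specific set of pages (the job title is ignored)
+ * new RefreshLinksJob( $templateTitle, [ 'pages' => [ 123 => [ NS_MAIN, 'Some_page' ] ] ] );
+ * // (c) Update the single page named by the job title
+ * new RefreshLinksJob( $pageTitle, [] );
+ * @endcode
+ *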
+ * @ingroup JobQueue
+ */
+class RefreshLinksJob extends Job {
+ /** @var float Cache parser output when it takes this long to render */
+ const PARSE_THRESHOLD_SEC = 1.0;
+ /** @var int Lag safety margin when comparing root job times to last-refresh times */
+ const CLOCK_FUDGE = 10;
+ /** @var int How many seconds to wait for replica DBs to catch up */
+ const LAG_WAIT_TIMEOUT = 15;
+
+	public function __construct( Title $title, array $params ) {
+ parent::__construct( 'refreshLinks', $title, $params );
+ // Avoid the overhead of de-duplication when it would be pointless
+ $this->removeDuplicates = (
+ // Ranges rarely will line up
+ !isset( $params['range'] ) &&
+ // Multiple pages per job make matches unlikely
+ !( isset( $params['pages'] ) && count( $params['pages'] ) != 1 )
+ );
+ $this->params += [ 'causeAction' => 'unknown', 'causeAgent' => 'unknown' ];
+ }
+
+ /**
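+	 * Make a job routed to the 'refreshLinksPrioritized' command, so that job
+	 * runners can be configured to service these updates ahead of ordinary
+	 * refreshLinks jobs.
+	 *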
+ * @param Title $title
+ * @param array $params
+ * @return RefreshLinksJob
+ */
+ public static function newPrioritized( Title $title, array $params ) {
+ $job = new self( $title, $params );
+ $job->command = 'refreshLinksPrioritized';
+
+ return $job;
+ }
+
+ /**
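+	 * Make a job routed to the 'refreshLinksDynamic' command, typically used for
+	 * opportunistic updates of pages with dynamic content.
+	 *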
+ * @param Title $title
+ * @param array $params
+ * @return RefreshLinksJob
+ */
+ public static function newDynamic( Title $title, array $params ) {
+ $job = new self( $title, $params );
+ $job->command = 'refreshLinksDynamic';
+
+ return $job;
+ }
+
+	public function run() {
+ global $wgUpdateRowsPerJob;
+
+ // Job to update all (or a range of) backlink pages for a page
+ if ( !empty( $this->params['recursive'] ) ) {
+ // When the base job branches, wait for the replica DBs to catch up to the master.
+ // From then on, we know that any template changes at the time the base job was
+ // enqueued will be reflected in backlink page parses when the leaf jobs run.
+ if ( !isset( $this->params['range'] ) ) {
+ try {
+ $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
+ $lbFactory->waitForReplication( [
+ 'wiki' => wfWikiID(),
+ 'timeout' => self::LAG_WAIT_TIMEOUT
+ ] );
+ } catch ( DBReplicationWaitError $e ) { // only try so hard
+ $stats = MediaWikiServices::getInstance()->getStatsdDataFactory();
+ $stats->increment( 'refreshlinks.lag_wait_failed' );
+ }
+ }
+ // Carry over information for de-duplication
+ $extraParams = $this->getRootJobParams();
+ $extraParams['triggeredRecursive'] = true;
+ // Carry over cause information for logging
+ $extraParams['causeAction'] = $this->params['causeAction'];
+ $extraParams['causeAgent'] = $this->params['causeAgent'];
+ // Convert this into no more than $wgUpdateRowsPerJob RefreshLinks per-title
+ // jobs and possibly a recursive RefreshLinks job for the rest of the backlinks
+ $jobs = BacklinkJobUtils::partitionBacklinkJob(
+ $this,
+ $wgUpdateRowsPerJob,
+ 1, // job-per-title
+ [ 'params' => $extraParams ]
+ );
+ JobQueueGroup::singleton()->push( $jobs );
+ // Job to update link tables for a set of titles
+ } elseif ( isset( $this->params['pages'] ) ) {
+ foreach ( $this->params['pages'] as $nsAndKey ) {
+ list( $ns, $dbKey ) = $nsAndKey;
+ $this->runForTitle( Title::makeTitleSafe( $ns, $dbKey ) );
+ }
+ // Job to update link tables for a given title
+ } else {
+ $this->runForTitle( $this->title );
+ }
+
+ return true;
+ }
+
+ /**
+ * @param Title $title
+ * @return bool
+ */
+ protected function runForTitle( Title $title ) {
+ $services = MediaWikiServices::getInstance();
+ $stats = $services->getStatsdDataFactory();
+ $lbFactory = $services->getDBLoadBalancerFactory();
+ $ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
+
+ $page = WikiPage::factory( $title );
+ $page->loadPageData( WikiPage::READ_LATEST );
+
+		// Serialize link updates by page ID so they see each other's changes
+ $dbw = $lbFactory->getMainLB()->getConnection( DB_MASTER );
+ /** @noinspection PhpUnusedLocalVariableInspection */
+ $scopedLock = LinksUpdate::acquirePageLock( $dbw, $page->getId(), 'job' );
+ // Get the latest ID *after* acquirePageLock() flushed the transaction.
+ // This is used to detect edits/moves after loadPageData() but before the scope lock.
+		// This works around the chicken/egg problem of determining the scope lock key.
+ $latest = $title->getLatestRevID( Title::GAID_FOR_UPDATE );
+
+ if ( !empty( $this->params['triggeringRevisionId'] ) ) {
+ // Fetch the specified revision; lockAndGetLatest() below detects if the page
+ // was edited since and aborts in order to avoid corrupting the link tables
+ $revision = Revision::newFromId(
+ $this->params['triggeringRevisionId'],
+ Revision::READ_LATEST
+ );
+ } else {
+ // Fetch current revision; READ_LATEST reduces lockAndGetLatest() check failures
+ $revision = Revision::newFromTitle( $title, false, Revision::READ_LATEST );
+ }
+
+ if ( !$revision ) {
+ $stats->increment( 'refreshlinks.rev_not_found' );
+ $this->setLastError( "Revision not found for {$title->getPrefixedDBkey()}" );
+ return false; // just deleted?
+ } elseif ( $revision->getId() != $latest || $revision->getPage() !== $page->getId() ) {
+			// Do not clobber newer updates with older ones. If all jobs were FIFO and
+			// serialized, it would be OK to update links based on older revisions since it
+			// would eventually get to the latest. Since that is not the case (by design),
+			// only update the link tables to a state matching the current revision's output.
+ $stats->increment( 'refreshlinks.rev_not_current' );
+ $this->setLastError( "Revision {$revision->getId()} is not current" );
+ return false;
+ }
+
+ $content = $revision->getContent( Revision::RAW );
+ if ( !$content ) {
+ // If there is no content, pretend the content is empty
+ $content = $revision->getContentHandler()->makeEmptyContent();
+ }
+
+ $parserOutput = false;
+ $parserOptions = $page->makeParserOptions( 'canonical' );
+		// If page_touched changed after this root job was created, then views of the
+		// page have likely already triggered re-parses that are now in the parser
+		// cache. That cache can be reused to avoid expensive parsing in some cases.
+ if ( isset( $this->params['rootJobTimestamp'] ) ) {
+ $opportunistic = !empty( $this->params['isOpportunistic'] );
+
+ $skewedTimestamp = $this->params['rootJobTimestamp'];
+ if ( $opportunistic ) {
+ // Neither clock skew nor DB snapshot/replica DB lag matter much for such
+ // updates; focus on reusing the (often recently updated) cache
+ } else {
+ // For transclusion updates, the template changes must be reflected
+ $skewedTimestamp = wfTimestamp( TS_MW,
+ wfTimestamp( TS_UNIX, $skewedTimestamp ) + self::CLOCK_FUDGE
+ );
+ }
+
+ if ( $page->getLinksTimestamp() > $skewedTimestamp ) {
+ // Something already updated the backlinks since this job was made
+ $stats->increment( 'refreshlinks.update_skipped' );
+ return true;
+ }
+
+ if ( $page->getTouched() >= $this->params['rootJobTimestamp'] || $opportunistic ) {
+ // Cache is suspected to be up-to-date. As long as the cache rev ID matches
+ // and it reflects the job's triggering change, then it is usable.
+ $parserOutput = $services->getParserCache()->getDirty( $page, $parserOptions );
+ if ( !$parserOutput
+ || $parserOutput->getCacheRevisionId() != $revision->getId()
+ || $parserOutput->getCacheTime() < $skewedTimestamp
+ ) {
+ $parserOutput = false; // too stale
+ }
+ }
+ }
+
+ // Fetch the current revision and parse it if necessary...
+ if ( $parserOutput ) {
+ $stats->increment( 'refreshlinks.parser_cached' );
+ } else {
+ $start = microtime( true );
+ // Revision ID must be passed to the parser output to get revision variables correct
+ $parserOutput = $content->getParserOutput(
+ $title, $revision->getId(), $parserOptions, false );
+ $elapsed = microtime( true ) - $start;
+ // If it took a long time to render, then save this back to the cache to avoid
+ // wasted CPU by other apaches or job runners. We don't want to always save to
+ // cache as this can cause high cache I/O and LRU churn when a template changes.
+ if ( $elapsed >= self::PARSE_THRESHOLD_SEC
+ && $page->shouldCheckParserCache( $parserOptions, $revision->getId() )
+ && $parserOutput->isCacheable()
+ ) {
+ $ctime = wfTimestamp( TS_MW, (int)$start ); // cache time
+ $services->getParserCache()->save(
+ $parserOutput, $page, $parserOptions, $ctime, $revision->getId()
+ );
+ }
+ $stats->increment( 'refreshlinks.parser_uncached' );
+ }
+
+ $updates = $content->getSecondaryDataUpdates(
+ $title,
+ null,
+ !empty( $this->params['useRecursiveLinksUpdate'] ),
+ $parserOutput
+ );
+
+ // For legacy hook handlers doing updates via LinksUpdateConstructed, make sure
+ // any pending writes they made get flushed before the doUpdate() calls below.
+ // This avoids snapshot-clearing errors in LinksUpdate::acquirePageLock().
+ $lbFactory->commitAndWaitForReplication( __METHOD__, $ticket );
+
+ foreach ( $updates as $update ) {
+			// Carry over the cause so the update can do extra logging
+ $update->setCause( $this->params['causeAction'], $this->params['causeAgent'] );
+ // FIXME: This code probably shouldn't be here?
+ // Needed by things like Echo notifications which need
+ // to know which user caused the links update
+ if ( $update instanceof LinksUpdate ) {
+ $update->setRevision( $revision );
+ if ( !empty( $this->params['triggeringUser'] ) ) {
+ $userInfo = $this->params['triggeringUser'];
+ if ( $userInfo['userId'] ) {
+ $user = User::newFromId( $userInfo['userId'] );
+ } else {
+ // Anonymous, use the username
+ $user = User::newFromName( $userInfo['userName'], false );
+ }
+ $update->setTriggeringUser( $user );
+ }
+ }
+ }
+
+ foreach ( $updates as $update ) {
+ $update->setTransactionTicket( $ticket );
+ $update->doUpdate();
+ }
+
+ InfoAction::invalidateCache( $title );
+
+ // Commit any writes here in case this method is called in a loop.
+ // In that case, the scoped lock will fail to be acquired.
+ $lbFactory->commitAndWaitForReplication( __METHOD__, $ticket );
+
+ return true;
+ }
+
+ public function getDeduplicationInfo() {
+ $info = parent::getDeduplicationInfo();
+		if ( is_array( $info['params'] ) ) {
+			// The cause fields are informational; unset them from the job params
+			// (where the constructor put them) so they do not defeat de-duplication
+			unset( $info['params']['causeAction'] );
+			unset( $info['params']['causeAgent'] );
+ // For per-pages jobs, the job title is that of the template that changed
+ // (or similar), so remove that since it ruins duplicate detection
+ if ( isset( $info['params']['pages'] ) ) {
+ unset( $info['namespace'] );
+ unset( $info['title'] );
+ }
+ }
+
+ return $info;
+ }
+
+ public function workItemCount() {
+ if ( !empty( $this->params['recursive'] ) ) {
+ return 0; // nothing actually refreshed
+ } elseif ( isset( $this->params['pages'] ) ) {
+ return count( $this->params['pages'] );
+ }
+
+ return 1; // one title
+ }
+}
diff --git a/www/wiki/includes/jobqueue/jobs/ThumbnailRenderJob.php b/www/wiki/includes/jobqueue/jobs/ThumbnailRenderJob.php
new file mode 100644
index 00000000..cf3155d7
--- /dev/null
+++ b/www/wiki/includes/jobqueue/jobs/ThumbnailRenderJob.php
@@ -0,0 +1,111 @@
+<?php
+/**
+ * Job for asynchronous rendering of thumbnails.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
+/**
+ * Job for asynchronous rendering of thumbnails.
+ *
+ * @ingroup JobQueue
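+ *
+ * @par Example (a sketch; 'transformParams' takes the same keys as File::transform()):
+ * @code
+ * JobQueueGroup::singleton()->push( new ThumbnailRenderJob(
+ *     $file->getTitle(),
+ *     [ 'transformParams' => [ 'width' => 320 ] ]
+ * ) );
+ * @endcode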
+ */
+class ThumbnailRenderJob extends Job {
+ public function __construct( Title $title, array $params ) {
+ parent::__construct( 'ThumbnailRender', $title, $params );
+ }
+
+ public function run() {
+ global $wgUploadThumbnailRenderMethod;
+
+ $transformParams = $this->params['transformParams'];
+
+ $file = wfLocalFile( $this->title );
+ $file->load( File::READ_LATEST );
+
+ if ( $file && $file->exists() ) {
+ if ( $wgUploadThumbnailRenderMethod === 'jobqueue' ) {
+ $thumb = $file->transform( $transformParams, File::RENDER_NOW );
+
+ if ( $thumb && !$thumb->isError() ) {
+ return true;
+ } else {
+					$this->setLastError( __METHOD__ . ': thumbnail couldn\'t be generated' );
+ return false;
+ }
+ } elseif ( $wgUploadThumbnailRenderMethod === 'http' ) {
+ $thumbUrl = '';
+ $status = $this->hitThumbUrl( $file, $transformParams, $thumbUrl );
+
+ wfDebug( __METHOD__ . ": received status {$status}\n" );
+
+				// 400 happens when requesting a size greater than or equal to the original
+ if ( $status === 200 || $status === 301 || $status === 302 || $status === 400 ) {
+ return true;
+ } elseif ( $status ) {
+ $this->setLastError( __METHOD__ . ': incorrect HTTP status ' .
+ $status . ' when hitting ' . $thumbUrl );
+ return false;
+ } else {
+ $this->setLastError( __METHOD__ . ': HTTP request failure' );
+ return false;
+ }
+ } else {
+ $this->setLastError( __METHOD__ . ': unknown thumbnail render method ' .
+ $wgUploadThumbnailRenderMethod );
+ return false;
+ }
+ } else {
+ $this->setLastError( __METHOD__ . ': file doesn\'t exist' );
+ return false;
+ }
+ }
+
+ protected function hitThumbUrl( LocalFile $file, $transformParams, &$thumbUrl ) {
+ global $wgUploadThumbnailRenderHttpCustomHost, $wgUploadThumbnailRenderHttpCustomDomain;
+
+ $thumbName = $file->thumbName( $transformParams );
+ $thumbUrl = $file->getThumbUrl( $thumbName );
+
+ if ( $wgUploadThumbnailRenderHttpCustomDomain ) {
+ $parsedUrl = wfParseUrl( $thumbUrl );
+
+ if ( !$parsedUrl || !isset( $parsedUrl['path'] ) || !strlen( $parsedUrl['path'] ) ) {
+ return false;
+ }
+
+ $thumbUrl = '//' . $wgUploadThumbnailRenderHttpCustomDomain . $parsedUrl['path'];
+ }
+
+ wfDebug( __METHOD__ . ": hitting url {$thumbUrl}\n" );
+
+ $request = MWHttpRequest::factory( $thumbUrl,
+ [ 'method' => 'HEAD', 'followRedirects' => true ],
+ __METHOD__
+ );
+
+ if ( $wgUploadThumbnailRenderHttpCustomHost ) {
+ $request->setHeader( 'Host', $wgUploadThumbnailRenderHttpCustomHost );
+ }
+
+		// Execute the HEAD request; only the resulting HTTP status code matters here
+		$request->execute();
+
+		return $request->getStatus();
+ }
+}
diff --git a/www/wiki/includes/jobqueue/jobs/UserGroupExpiryJob.php b/www/wiki/includes/jobqueue/jobs/UserGroupExpiryJob.php
new file mode 100644
index 00000000..0945e58f
--- /dev/null
+++ b/www/wiki/includes/jobqueue/jobs/UserGroupExpiryJob.php
@@ -0,0 +1,39 @@
+<?php
+/**
+ * Job that purges expired user group memberships.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
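+/**
+ * Job that purges expired user group memberships via
+ * UserGroupMembership::purgeExpired().
+ *
+ * @par Example (sketch):
+ * @code
+ * JobQueueGroup::singleton()->push( new UserGroupExpiryJob() );
+ * @endcode
+ *
+ * @ingroup JobQueue
+ */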
+class UserGroupExpiryJob extends Job {
+ public function __construct( $params = false ) {
+ parent::__construct( 'userGroupExpiry', Title::newMainPage(), $params );
+ $this->removeDuplicates = true;
+ }
+
+ /**
+ * Run the job
+ * @return bool Success
+ */
+ public function run() {
+ UserGroupMembership::purgeExpired();
+
+ return true;
+ }
+}
diff --git a/www/wiki/includes/jobqueue/utils/BacklinkJobUtils.php b/www/wiki/includes/jobqueue/utils/BacklinkJobUtils.php
new file mode 100644
index 00000000..76f8d6d2
--- /dev/null
+++ b/www/wiki/includes/jobqueue/utils/BacklinkJobUtils.php
@@ -0,0 +1,149 @@
+<?php
+/**
+ * Job to update links for a given title.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup JobQueue
+ */
+
+/**
+ * Class with Backlink related Job helper methods
+ *
+ * When an asset changes, a base job can be inserted to update all assets that depend on it.
+ * The base job splits into per-title "leaf" jobs and a "remnant" job to handle the remaining
+ * range of backlinks. This recurs until the remnant job's backlink range is small enough that
+ * only leaf jobs are created from it.
+ *
+ * For example, if templates A and B are edited (at the same time) the queue will have:
+ * (A base, B base)
+ * When these jobs run, the queue will have per-title and remnant partition jobs:
+ * (titleX,titleY,titleZ,...,A remnant,titleM,titleN,titleO,...,B remnant)
+ *
+ * This works best when the queue is FIFO, for several reasons:
+ * - a) Since the remnant jobs are enqueued after the leaf jobs, the slower leaf jobs have to
+ * get popped prior to the fast remnant jobs. This avoids flooding the queue with leaf jobs
+ * for every single backlink of widely used assets (which can be millions).
+ * - b) Other jobs going in the queue still get a chance to run after a widely used asset changes.
+ * This is due to the large remnant job pushing to the end of the queue with each division.
+ *
+ * The size of the queues used in this manner depends on the number of asset changes and
+ * the number of workers. Also, with FIFO-per-partition queues, the queue size can be somewhat
+ * larger, depending on the number of queue partitions.
+ *
+ * @ingroup JobQueue
+ * @since 1.23
+ */
+class BacklinkJobUtils {
+ /**
+ * Break down $job into approximately ($bSize/$cSize) leaf jobs and a single partition
+ * job that covers the remaining backlink range (if needed). Jobs for the first $bSize
+ * titles are collated ($cSize per job) into leaf jobs to do actual work. All the
+ * resulting jobs are of the same class as $job. No partition job is returned if the
+ * range covered by $job was less than $bSize, as the leaf jobs have full coverage.
+ *
+ * The leaf jobs have the 'pages' param set to a (<page ID>:(<namespace>,<DB key>),...)
+ * map so that the run() function knows what pages to act on. The leaf jobs will keep
+ * the same job title as the parent job (e.g. $job).
+ *
+ * The partition jobs have the 'range' parameter set to a map of the format
+ * (start:<integer>, end:<integer>, batchSize:<integer>, subranges:((<start>,<end>),...)),
+ * the 'table' parameter set to that of $job, and the 'recursive' parameter set to true.
+ * This method can be called on the resulting job to repeat the process again.
+ *
+ * The job provided ($job) must have the 'recursive' parameter set to true and the 'table'
+ * parameter must be set to a backlink table. The job title will be used as the title to
+ * find backlinks for. Any 'range' parameter must follow the same format as mentioned above.
+ * This should be managed by recursive calls to this method.
+ *
+ * The first jobs returned are always the leaf jobs. This lets the caller use push() to
+ * put them directly into the queue, and works well if the queue is FIFO. In such a queue,
+ * the leaf jobs have to finish before anything can resolve the next partition
+ * job, which keeps the queue very small.
+ *
+ * $opts includes:
+ * - params : extra job parameters to include in each job
+ *
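+	 * @par Example (a sketch mirroring the call in RefreshLinksJob::run()):
+	 * @code
+	 * $jobs = BacklinkJobUtils::partitionBacklinkJob(
+	 *     $job, // a job with 'recursive' => true and 'table' set
+	 *     $wgUpdateRowsPerJob, // backlink batch size
+	 *     1, // one title per leaf job
+	 *     [ 'params' => $extraParams ]
+	 * );
+	 * JobQueueGroup::singleton()->push( $jobs );
+	 * @endcode
+	 *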
+ * @param Job $job
+ * @param int $bSize BacklinkCache partition size; usually $wgUpdateRowsPerJob
+	 * @param int $cSize Max titles per leaf job; usually 1 or a modest value
+ * @param array $opts Optional parameter map
+ * @return Job[] List of Job objects
+ */
+ public static function partitionBacklinkJob( Job $job, $bSize, $cSize, $opts = [] ) {
+ $class = get_class( $job );
+ $title = $job->getTitle();
+ $params = $job->getParams();
+
+ if ( isset( $params['pages'] ) || empty( $params['recursive'] ) ) {
+ $ranges = []; // sanity; this is a leaf node
+ $realBSize = 0;
+ wfWarn( __METHOD__ . " called on {$job->getType()} leaf job (explosive recursion)." );
+ } elseif ( isset( $params['range'] ) ) {
+ // This is a range job to trigger the insertion of partitioned/title jobs...
+ $ranges = $params['range']['subranges'];
+ $realBSize = $params['range']['batchSize'];
+ } else {
+ // This is a base job to trigger the insertion of partitioned jobs...
+ $ranges = $title->getBacklinkCache()->partition( $params['table'], $bSize );
+ $realBSize = $bSize;
+ }
+
+ $extraParams = isset( $opts['params'] ) ? $opts['params'] : [];
+
+ $jobs = [];
+ // Combine the first range (of size $bSize) backlinks into leaf jobs
+ if ( isset( $ranges[0] ) ) {
+ list( $start, $end ) = $ranges[0];
+ $iter = $title->getBacklinkCache()->getLinks( $params['table'], $start, $end );
+ $titles = iterator_to_array( $iter );
+ /** @var Title[] $titleBatch */
+ foreach ( array_chunk( $titles, $cSize ) as $titleBatch ) {
+ $pages = [];
+ foreach ( $titleBatch as $tl ) {
+ $pages[$tl->getArticleID()] = [ $tl->getNamespace(), $tl->getDBkey() ];
+ }
+ $jobs[] = new $class(
+ $title, // maintain parent job title
+ [ 'pages' => $pages ] + $extraParams
+ );
+ }
+ }
+ // Take all of the remaining ranges and build a partition job from it
+ if ( isset( $ranges[1] ) ) {
+ $jobs[] = new $class(
+ $title, // maintain parent job title
+ [
+ 'recursive' => true,
+ 'table' => $params['table'],
+ 'range' => [
+ 'start' => $ranges[1][0],
+ 'end' => $ranges[count( $ranges ) - 1][1],
+ 'batchSize' => $realBSize,
+ 'subranges' => array_slice( $ranges, 1 )
+ ],
+ // Track how many times the base job divided for debugging
+ 'division' => isset( $params['division'] )
+ ? ( $params['division'] + 1 )
+ : 1
+ ] + $extraParams
+ );
+ }
+
+ return $jobs;
+ }
+}
diff --git a/www/wiki/includes/jobqueue/utils/PurgeJobUtils.php b/www/wiki/includes/jobqueue/utils/PurgeJobUtils.php
new file mode 100644
index 00000000..ba80c8e4
--- /dev/null
+++ b/www/wiki/includes/jobqueue/utils/PurgeJobUtils.php
@@ -0,0 +1,81 @@
+<?php
+/**
+ * Base code for update jobs that put some secondary data extracted
+ * from article content into the database.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ */
+use Wikimedia\Rdbms\IDatabase;
+use MediaWiki\MediaWikiServices;
+
+class PurgeJobUtils {
+ /**
+ * Invalidate the cache of a list of pages from a single namespace.
+	 * This is intended for use by update jobs.
+ *
+ * @param IDatabase $dbw
+ * @param int $namespace Namespace number
+ * @param array $dbkeys
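+	 *
+	 * @par Example (a sketch; the namespace and titles are illustrative):
+	 * @code
+	 * PurgeJobUtils::invalidatePages(
+	 *     wfGetDB( DB_MASTER ), NS_TEMPLATE, [ 'Foo', 'Bar' ]
+	 * );
+	 * @endcode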
+ */
+ public static function invalidatePages( IDatabase $dbw, $namespace, array $dbkeys ) {
+ if ( $dbkeys === [] ) {
+ return;
+ }
+
+ $dbw->onTransactionIdle(
+ function () use ( $dbw, $namespace, $dbkeys ) {
+ $services = MediaWikiServices::getInstance();
+ $lbFactory = $services->getDBLoadBalancerFactory();
+ // Determine which pages need to be updated.
+ // This is necessary to prevent the job queue from smashing the DB with
+ // large numbers of concurrent invalidations of the same page.
+ $now = $dbw->timestamp();
+ $ids = $dbw->selectFieldValues(
+ 'page',
+ 'page_id',
+ [
+ 'page_namespace' => $namespace,
+ 'page_title' => $dbkeys,
+ 'page_touched < ' . $dbw->addQuotes( $now )
+ ],
+ __METHOD__
+ );
+
+ if ( !$ids ) {
+ return;
+ }
+
+ $batchSize = $services->getMainConfig()->get( 'UpdateRowsPerQuery' );
+ $ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );
+ foreach ( array_chunk( $ids, $batchSize ) as $idBatch ) {
+ $dbw->update(
+ 'page',
+ [ 'page_touched' => $now ],
+ [
+ 'page_id' => $idBatch,
+ 'page_touched < ' . $dbw->addQuotes( $now ) // handle races
+ ],
+ __METHOD__
+ );
+ $lbFactory->commitAndWaitForReplication( __METHOD__, $ticket );
+ }
+ },
+ __METHOD__
+ );
+ }
+}