diff options
author | Yaco <franco@reevo.org> | 2020-06-04 11:01:00 -0300 |
---|---|---|
committer | Yaco <franco@reevo.org> | 2020-06-04 11:01:00 -0300 |
commit | fc7369835258467bf97eb64f184b93691f9a9fd5 (patch) | |
tree | daabd60089d2dd76d9f5fb416b005fbe159c799d /www/wiki/includes/jobqueue |
first commit
Diffstat (limited to 'www/wiki/includes/jobqueue')
34 files changed, 8230 insertions, 0 deletions
diff --git a/www/wiki/includes/jobqueue/Job.php b/www/wiki/includes/jobqueue/Job.php new file mode 100644 index 00000000..f9c416f3 --- /dev/null +++ b/www/wiki/includes/jobqueue/Job.php @@ -0,0 +1,426 @@ +<?php +/** + * Job queue task base code. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @defgroup JobQueue JobQueue + */ + +/** + * Class to both describe a background job and handle jobs. + * The queue aspects of this class are now deprecated. + * Using the class to push jobs onto queues is deprecated (use JobSpecification). + * + * @ingroup JobQueue + */ +abstract class Job implements IJobSpecification { + /** @var string */ + public $command; + + /** @var array Array of job parameters */ + public $params; + + /** @var array Additional queue metadata */ + public $metadata = []; + + /** @var Title */ + protected $title; + + /** @var bool Expensive jobs may set this to true */ + protected $removeDuplicates; + + /** @var string Text for error that occurred last */ + protected $error; + + /** @var callable[] */ + protected $teardownCallbacks = []; + + /** @var int Bitfield of JOB_* class constants */ + protected $executionFlags = 0; + + /** @var int Job must not be wrapped in the usual explicit LBFactory transaction round */ + const JOB_NO_EXPLICIT_TRX_ROUND = 1; + + /** + * Run the job + * @return bool Success + */ + abstract public function run(); + + /** + * Create the appropriate object to handle a specific job + * + * @param string $command Job command + * @param Title $title Associated title + * @param array $params Job parameters + * @throws MWException + * @return Job + */ + public static function factory( $command, Title $title, $params = [] ) { + global $wgJobClasses; + + if ( isset( $wgJobClasses[$command] ) ) { + $handler = $wgJobClasses[$command]; + + if ( is_callable( $handler ) ) { + $job = call_user_func( $handler, $title, $params ); + } elseif ( class_exists( $handler ) ) { + $job = new $handler( $title, $params ); + } else { + $job = null; + } + + if ( $job instanceof Job ) { + $job->command = $command; + return $job; + } else { + throw new InvalidArgumentException( "Cannot instantiate job '$command': bad spec!" ); + } + } + + throw new InvalidArgumentException( "Invalid job command '{$command}'" ); + } + + /** + * @param string $command + * @param Title $title + * @param array|bool $params Can not be === true + */ + public function __construct( $command, $title, $params = false ) { + $this->command = $command; + $this->title = $title; + $this->params = is_array( $params ) ? $params : []; // sanity + + // expensive jobs may set this to true + $this->removeDuplicates = false; + + if ( !isset( $this->params['requestId'] ) ) { + $this->params['requestId'] = WebRequest::getRequestId(); + } + } + + /** + * @param int $flag JOB_* class constant + * @return bool + * @since 1.31 + */ + public function hasExecutionFlag( $flag ) { + return ( $this->executionFlags && $flag ) === $flag; + } + + /** + * Batch-insert a group of jobs into the queue. + * This will be wrapped in a transaction with a forced commit. + * + * This may add duplicate at insert time, but they will be + * removed later on, when the first one is popped. + * + * @param Job[] $jobs Array of Job objects + * @return bool + * @deprecated since 1.21 + */ + public static function batchInsert( $jobs ) { + wfDeprecated( __METHOD__, '1.21' ); + JobQueueGroup::singleton()->push( $jobs ); + return true; + } + + /** + * @return string + */ + public function getType() { + return $this->command; + } + + /** + * @return Title + */ + public function getTitle() { + return $this->title; + } + + /** + * @return array + */ + public function getParams() { + return $this->params; + } + + /** + * @return int|null UNIX timestamp to delay running this job until, otherwise null + * @since 1.22 + */ + public function getReleaseTimestamp() { + return isset( $this->params['jobReleaseTimestamp'] ) + ? wfTimestampOrNull( TS_UNIX, $this->params['jobReleaseTimestamp'] ) + : null; + } + + /** + * @return int|null UNIX timestamp of when the job was queued, or null + * @since 1.26 + */ + public function getQueuedTimestamp() { + return isset( $this->metadata['timestamp'] ) + ? wfTimestampOrNull( TS_UNIX, $this->metadata['timestamp'] ) + : null; + } + + /** + * @return string|null Id of the request that created this job. Follows + * jobs recursively, allowing to track the id of the request that started a + * job when jobs insert jobs which insert other jobs. + * @since 1.27 + */ + public function getRequestId() { + return isset( $this->params['requestId'] ) + ? $this->params['requestId'] + : null; + } + + /** + * @return int|null UNIX timestamp of when the job was runnable, or null + * @since 1.26 + */ + public function getReadyTimestamp() { + return $this->getReleaseTimestamp() ?: $this->getQueuedTimestamp(); + } + + /** + * Whether the queue should reject insertion of this job if a duplicate exists + * + * This can be used to avoid duplicated effort or combined with delayed jobs to + * coalesce updates into larger batches. Claimed jobs are never treated as + * duplicates of new jobs, and some queues may allow a few duplicates due to + * network partitions and fail-over. Thus, additional locking is needed to + * enforce mutual exclusion if this is really needed. + * + * @return bool + */ + public function ignoreDuplicates() { + return $this->removeDuplicates; + } + + /** + * @return bool Whether this job can be retried on failure by job runners + * @since 1.21 + */ + public function allowRetries() { + return true; + } + + /** + * @return int Number of actually "work items" handled in this job + * @see $wgJobBackoffThrottling + * @since 1.23 + */ + public function workItemCount() { + return 1; + } + + /** + * Subclasses may need to override this to make duplication detection work. + * The resulting map conveys everything that makes the job unique. This is + * only checked if ignoreDuplicates() returns true, meaning that duplicate + * jobs are supposed to be ignored. + * + * @return array Map of key/values + * @since 1.21 + */ + public function getDeduplicationInfo() { + $info = [ + 'type' => $this->getType(), + 'namespace' => $this->getTitle()->getNamespace(), + 'title' => $this->getTitle()->getDBkey(), + 'params' => $this->getParams() + ]; + if ( is_array( $info['params'] ) ) { + // Identical jobs with different "root" jobs should count as duplicates + unset( $info['params']['rootJobSignature'] ); + unset( $info['params']['rootJobTimestamp'] ); + // Likewise for jobs with different delay times + unset( $info['params']['jobReleaseTimestamp'] ); + // Identical jobs from different requests should count as duplicates + unset( $info['params']['requestId'] ); + // Queues pack and hash this array, so normalize the order + ksort( $info['params'] ); + } + + return $info; + } + + /** + * Get "root job" parameters for a task + * + * This is used to no-op redundant jobs, including child jobs of jobs, + * as long as the children inherit the root job parameters. When a job + * with root job parameters and "rootJobIsSelf" set is pushed, the + * deduplicateRootJob() method is automatically called on it. If the + * root job is only virtual and not actually pushed (e.g. the sub-jobs + * are inserted directly), then call deduplicateRootJob() directly. + * + * @see JobQueue::deduplicateRootJob() + * + * @param string $key A key that identifies the task + * @return array Map of: + * - rootJobIsSelf : true + * - rootJobSignature : hash (e.g. SHA1) that identifies the task + * - rootJobTimestamp : TS_MW timestamp of this instance of the task + * @since 1.21 + */ + public static function newRootJobParams( $key ) { + return [ + 'rootJobIsSelf' => true, + 'rootJobSignature' => sha1( $key ), + 'rootJobTimestamp' => wfTimestampNow() + ]; + } + + /** + * @see JobQueue::deduplicateRootJob() + * @return array + * @since 1.21 + */ + public function getRootJobParams() { + return [ + 'rootJobSignature' => isset( $this->params['rootJobSignature'] ) + ? $this->params['rootJobSignature'] + : null, + 'rootJobTimestamp' => isset( $this->params['rootJobTimestamp'] ) + ? $this->params['rootJobTimestamp'] + : null + ]; + } + + /** + * @see JobQueue::deduplicateRootJob() + * @return bool + * @since 1.22 + */ + public function hasRootJobParams() { + return isset( $this->params['rootJobSignature'] ) + && isset( $this->params['rootJobTimestamp'] ); + } + + /** + * @see JobQueue::deduplicateRootJob() + * @return bool Whether this is job is a root job + */ + public function isRootJob() { + return $this->hasRootJobParams() && !empty( $this->params['rootJobIsSelf'] ); + } + + /** + * @param callable $callback A function with one parameter, the success status, which will be + * false if the job failed or it succeeded but the DB changes could not be committed or + * any deferred updates threw an exception. (This parameter was added in 1.28.) + * @since 1.27 + */ + protected function addTeardownCallback( $callback ) { + $this->teardownCallbacks[] = $callback; + } + + /** + * Do any final cleanup after run(), deferred updates, and all DB commits happen + * @param bool $status Whether the job, its deferred updates, and DB commit all succeeded + * @since 1.27 + */ + public function teardown( $status ) { + foreach ( $this->teardownCallbacks as $callback ) { + call_user_func( $callback, $status ); + } + } + + /** + * Insert a single job into the queue. + * @return bool True on success + * @deprecated since 1.21 + */ + public function insert() { + wfDeprecated( __METHOD__, '1.21' ); + JobQueueGroup::singleton()->push( $this ); + return true; + } + + /** + * @return string + */ + public function toString() { + $paramString = ''; + if ( $this->params ) { + foreach ( $this->params as $key => $value ) { + if ( $paramString != '' ) { + $paramString .= ' '; + } + if ( is_array( $value ) ) { + $filteredValue = []; + foreach ( $value as $k => $v ) { + $json = FormatJson::encode( $v ); + if ( $json === false || mb_strlen( $json ) > 512 ) { + $filteredValue[$k] = gettype( $v ) . '(...)'; + } else { + $filteredValue[$k] = $v; + } + } + if ( count( $filteredValue ) <= 10 ) { + $value = FormatJson::encode( $filteredValue ); + } else { + $value = "array(" . count( $value ) . ")"; + } + } elseif ( is_object( $value ) && !method_exists( $value, '__toString' ) ) { + $value = "object(" . get_class( $value ) . ")"; + } + + $flatValue = (string)$value; + if ( mb_strlen( $value ) > 1024 ) { + $flatValue = "string(" . mb_strlen( $value ) . ")"; + } + + $paramString .= "$key={$flatValue}"; + } + } + + $metaString = ''; + foreach ( $this->metadata as $key => $value ) { + if ( is_scalar( $value ) && mb_strlen( $value ) < 1024 ) { + $metaString .= ( $metaString ? ",$key=$value" : "$key=$value" ); + } + } + + $s = $this->command; + if ( is_object( $this->title ) ) { + $s .= " {$this->title->getPrefixedDBkey()}"; + } + if ( $paramString != '' ) { + $s .= " $paramString"; + } + if ( $metaString != '' ) { + $s .= " ($metaString)"; + } + + return $s; + } + + protected function setLastError( $error ) { + $this->error = $error; + } + + public function getLastError() { + return $this->error; + } +} diff --git a/www/wiki/includes/jobqueue/JobQueue.php b/www/wiki/includes/jobqueue/JobQueue.php new file mode 100644 index 00000000..3a8bf1ab --- /dev/null +++ b/www/wiki/includes/jobqueue/JobQueue.php @@ -0,0 +1,731 @@ +<?php +/** + * Job queue base code. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @defgroup JobQueue JobQueue + */ +use MediaWiki\MediaWikiServices; + +/** + * Class to handle enqueueing and running of background jobs + * + * @ingroup JobQueue + * @since 1.21 + */ +abstract class JobQueue { + /** @var string Wiki ID */ + protected $wiki; + /** @var string Job type */ + protected $type; + /** @var string Job priority for pop() */ + protected $order; + /** @var int Time to live in seconds */ + protected $claimTTL; + /** @var int Maximum number of times to try a job */ + protected $maxTries; + /** @var string|bool Read only rationale (or false if r/w) */ + protected $readOnlyReason; + + /** @var BagOStuff */ + protected $dupCache; + /** @var JobQueueAggregator */ + protected $aggr; + + const QOS_ATOMIC = 1; // integer; "all-or-nothing" job insertions + + const ROOTJOB_TTL = 2419200; // integer; seconds to remember root jobs (28 days) + + /** + * @param array $params + * @throws MWException + */ + protected function __construct( array $params ) { + $this->wiki = $params['wiki']; + $this->type = $params['type']; + $this->claimTTL = isset( $params['claimTTL'] ) ? $params['claimTTL'] : 0; + $this->maxTries = isset( $params['maxTries'] ) ? $params['maxTries'] : 3; + if ( isset( $params['order'] ) && $params['order'] !== 'any' ) { + $this->order = $params['order']; + } else { + $this->order = $this->optimalOrder(); + } + if ( !in_array( $this->order, $this->supportedOrders() ) ) { + throw new MWException( __CLASS__ . " does not support '{$this->order}' order." ); + } + $this->dupCache = wfGetCache( CACHE_ANYTHING ); + $this->aggr = isset( $params['aggregator'] ) + ? $params['aggregator'] + : new JobQueueAggregatorNull( [] ); + $this->readOnlyReason = isset( $params['readOnlyReason'] ) + ? $params['readOnlyReason'] + : false; + } + + /** + * Get a job queue object of the specified type. + * $params includes: + * - class : What job class to use (determines job type) + * - wiki : wiki ID of the wiki the jobs are for (defaults to current wiki) + * - type : The name of the job types this queue handles + * - order : Order that pop() selects jobs, one of "fifo", "timestamp" or "random". + * If "fifo" is used, the queue will effectively be FIFO. Note that job + * completion will not appear to be exactly FIFO if there are multiple + * job runners since jobs can take different times to finish once popped. + * If "timestamp" is used, the queue will at least be loosely ordered + * by timestamp, allowing for some jobs to be popped off out of order. + * If "random" is used, pop() will pick jobs in random order. + * Note that it may only be weakly random (e.g. a lottery of the oldest X). + * If "any" is choosen, the queue will use whatever order is the fastest. + * This might be useful for improving concurrency for job acquisition. + * - claimTTL : If supported, the queue will recycle jobs that have been popped + * but not acknowledged as completed after this many seconds. Recycling + * of jobs simply means re-inserting them into the queue. Jobs can be + * attempted up to three times before being discarded. + * - readOnlyReason : Set this to a string to make the queue read-only. + * + * Queue classes should throw an exception if they do not support the options given. + * + * @param array $params + * @return JobQueue + * @throws MWException + */ + final public static function factory( array $params ) { + $class = $params['class']; + if ( !class_exists( $class ) ) { + throw new MWException( "Invalid job queue class '$class'." ); + } + $obj = new $class( $params ); + if ( !( $obj instanceof self ) ) { + throw new MWException( "Class '$class' is not a " . __CLASS__ . " class." ); + } + + return $obj; + } + + /** + * @return string Wiki ID + */ + final public function getWiki() { + return $this->wiki; + } + + /** + * @return string Job type that this queue handles + */ + final public function getType() { + return $this->type; + } + + /** + * @return string One of (random, timestamp, fifo, undefined) + */ + final public function getOrder() { + return $this->order; + } + + /** + * Get the allowed queue orders for configuration validation + * + * @return array Subset of (random, timestamp, fifo, undefined) + */ + abstract protected function supportedOrders(); + + /** + * Get the default queue order to use if configuration does not specify one + * + * @return string One of (random, timestamp, fifo, undefined) + */ + abstract protected function optimalOrder(); + + /** + * Find out if delayed jobs are supported for configuration validation + * + * @return bool Whether delayed jobs are supported + */ + protected function supportsDelayedJobs() { + return false; // not implemented + } + + /** + * @return bool Whether delayed jobs are enabled + * @since 1.22 + */ + final public function delayedJobsEnabled() { + return $this->supportsDelayedJobs(); + } + + /** + * @return string|bool Read-only rational or false if r/w + * @since 1.27 + */ + public function getReadOnlyReason() { + return $this->readOnlyReason; + } + + /** + * Quickly check if the queue has no available (unacquired, non-delayed) jobs. + * Queue classes should use caching if they are any slower without memcached. + * + * If caching is used, this might return false when there are actually no jobs. + * If pop() is called and returns false then it should correct the cache. Also, + * calling flushCaches() first prevents this. However, this affect is typically + * not distinguishable from the race condition between isEmpty() and pop(). + * + * @return bool + * @throws JobQueueError + */ + final public function isEmpty() { + $res = $this->doIsEmpty(); + + return $res; + } + + /** + * @see JobQueue::isEmpty() + * @return bool + */ + abstract protected function doIsEmpty(); + + /** + * Get the number of available (unacquired, non-delayed) jobs in the queue. + * Queue classes should use caching if they are any slower without memcached. + * + * If caching is used, this number might be out of date for a minute. + * + * @return int + * @throws JobQueueError + */ + final public function getSize() { + $res = $this->doGetSize(); + + return $res; + } + + /** + * @see JobQueue::getSize() + * @return int + */ + abstract protected function doGetSize(); + + /** + * Get the number of acquired jobs (these are temporarily out of the queue). + * Queue classes should use caching if they are any slower without memcached. + * + * If caching is used, this number might be out of date for a minute. + * + * @return int + * @throws JobQueueError + */ + final public function getAcquiredCount() { + $res = $this->doGetAcquiredCount(); + + return $res; + } + + /** + * @see JobQueue::getAcquiredCount() + * @return int + */ + abstract protected function doGetAcquiredCount(); + + /** + * Get the number of delayed jobs (these are temporarily out of the queue). + * Queue classes should use caching if they are any slower without memcached. + * + * If caching is used, this number might be out of date for a minute. + * + * @return int + * @throws JobQueueError + * @since 1.22 + */ + final public function getDelayedCount() { + $res = $this->doGetDelayedCount(); + + return $res; + } + + /** + * @see JobQueue::getDelayedCount() + * @return int + */ + protected function doGetDelayedCount() { + return 0; // not implemented + } + + /** + * Get the number of acquired jobs that can no longer be attempted. + * Queue classes should use caching if they are any slower without memcached. + * + * If caching is used, this number might be out of date for a minute. + * + * @return int + * @throws JobQueueError + */ + final public function getAbandonedCount() { + $res = $this->doGetAbandonedCount(); + + return $res; + } + + /** + * @see JobQueue::getAbandonedCount() + * @return int + */ + protected function doGetAbandonedCount() { + return 0; // not implemented + } + + /** + * Push one or more jobs into the queue. + * This does not require $wgJobClasses to be set for the given job type. + * Outside callers should use JobQueueGroup::push() instead of this function. + * + * @param IJobSpecification|IJobSpecification[] $jobs + * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC) + * @return void + * @throws JobQueueError + */ + final public function push( $jobs, $flags = 0 ) { + $jobs = is_array( $jobs ) ? $jobs : [ $jobs ]; + $this->batchPush( $jobs, $flags ); + } + + /** + * Push a batch of jobs into the queue. + * This does not require $wgJobClasses to be set for the given job type. + * Outside callers should use JobQueueGroup::push() instead of this function. + * + * @param IJobSpecification[] $jobs + * @param int $flags Bitfield (supports JobQueue::QOS_ATOMIC) + * @return void + * @throws MWException + */ + final public function batchPush( array $jobs, $flags = 0 ) { + $this->assertNotReadOnly(); + + if ( !count( $jobs ) ) { + return; // nothing to do + } + + foreach ( $jobs as $job ) { + if ( $job->getType() !== $this->type ) { + throw new MWException( + "Got '{$job->getType()}' job; expected a '{$this->type}' job." ); + } elseif ( $job->getReleaseTimestamp() && !$this->supportsDelayedJobs() ) { + throw new MWException( + "Got delayed '{$job->getType()}' job; delays are not supported." ); + } + } + + $this->doBatchPush( $jobs, $flags ); + $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type ); + + foreach ( $jobs as $job ) { + if ( $job->isRootJob() ) { + $this->deduplicateRootJob( $job ); + } + } + } + + /** + * @see JobQueue::batchPush() + * @param IJobSpecification[] $jobs + * @param int $flags + */ + abstract protected function doBatchPush( array $jobs, $flags ); + + /** + * Pop a job off of the queue. + * This requires $wgJobClasses to be set for the given job type. + * Outside callers should use JobQueueGroup::pop() instead of this function. + * + * @throws MWException + * @return Job|bool Returns false if there are no jobs + */ + final public function pop() { + global $wgJobClasses; + + $this->assertNotReadOnly(); + if ( !WikiMap::isCurrentWikiDbDomain( $this->wiki ) ) { + throw new MWException( "Cannot pop '{$this->type}' job off foreign wiki queue." ); + } elseif ( !isset( $wgJobClasses[$this->type] ) ) { + // Do not pop jobs if there is no class for the queue type + throw new MWException( "Unrecognized job type '{$this->type}'." ); + } + + $job = $this->doPop(); + + if ( !$job ) { + $this->aggr->notifyQueueEmpty( $this->wiki, $this->type ); + } + + // Flag this job as an old duplicate based on its "root" job... + try { + if ( $job && $this->isRootJobOldDuplicate( $job ) ) { + self::incrStats( 'dupe_pops', $this->type ); + $job = DuplicateJob::newFromJob( $job ); // convert to a no-op + } + } catch ( Exception $e ) { + // don't lose jobs over this + } + + return $job; + } + + /** + * @see JobQueue::pop() + * @return Job|bool + */ + abstract protected function doPop(); + + /** + * Acknowledge that a job was completed. + * + * This does nothing for certain queue classes or if "claimTTL" is not set. + * Outside callers should use JobQueueGroup::ack() instead of this function. + * + * @param Job $job + * @return void + * @throws MWException + */ + final public function ack( Job $job ) { + $this->assertNotReadOnly(); + if ( $job->getType() !== $this->type ) { + throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); + } + + $this->doAck( $job ); + } + + /** + * @see JobQueue::ack() + * @param Job $job + */ + abstract protected function doAck( Job $job ); + + /** + * Register the "root job" of a given job into the queue for de-duplication. + * This should only be called right *after* all the new jobs have been inserted. + * This is used to turn older, duplicate, job entries into no-ops. The root job + * information will remain in the registry until it simply falls out of cache. + * + * This requires that $job has two special fields in the "params" array: + * - rootJobSignature : hash (e.g. SHA1) that identifies the task + * - rootJobTimestamp : TS_MW timestamp of this instance of the task + * + * A "root job" is a conceptual job that consist of potentially many smaller jobs + * that are actually inserted into the queue. For example, "refreshLinks" jobs are + * spawned when a template is edited. One can think of the task as "update links + * of pages that use template X" and an instance of that task as a "root job". + * However, what actually goes into the queue are range and leaf job subtypes. + * Since these jobs include things like page ID ranges and DB master positions, + * and can morph into smaller jobs recursively, simple duplicate detection + * for individual jobs being identical (like that of job_sha1) is not useful. + * + * In the case of "refreshLinks", if these jobs are still in the queue when the template + * is edited again, we want all of these old refreshLinks jobs for that template to become + * no-ops. This can greatly reduce server load, since refreshLinks jobs involves parsing. + * Essentially, the new batch of jobs belong to a new "root job" and the older ones to a + * previous "root job" for the same task of "update links of pages that use template X". + * + * This does nothing for certain queue classes. + * + * @param IJobSpecification $job + * @throws MWException + * @return bool + */ + final public function deduplicateRootJob( IJobSpecification $job ) { + $this->assertNotReadOnly(); + if ( $job->getType() !== $this->type ) { + throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); + } + + return $this->doDeduplicateRootJob( $job ); + } + + /** + * @see JobQueue::deduplicateRootJob() + * @param IJobSpecification $job + * @throws MWException + * @return bool + */ + protected function doDeduplicateRootJob( IJobSpecification $job ) { + if ( !$job->hasRootJobParams() ) { + throw new MWException( "Cannot register root job; missing parameters." ); + } + $params = $job->getRootJobParams(); + + $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); + // Callers should call batchInsert() and then this function so that if the insert + // fails, the de-duplication registration will be aborted. Since the insert is + // deferred till "transaction idle", do the same here, so that the ordering is + // maintained. Having only the de-duplication registration succeed would cause + // jobs to become no-ops without any actual jobs that made them redundant. + $timestamp = $this->dupCache->get( $key ); // current last timestamp of this job + if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { + return true; // a newer version of this root job was enqueued + } + + // Update the timestamp of the last root job started at the location... + return $this->dupCache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL ); + } + + /** + * Check if the "root" job of a given job has been superseded by a newer one + * + * @param Job $job + * @throws MWException + * @return bool + */ + final protected function isRootJobOldDuplicate( Job $job ) { + if ( $job->getType() !== $this->type ) { + throw new MWException( "Got '{$job->getType()}' job; expected '{$this->type}'." ); + } + $isDuplicate = $this->doIsRootJobOldDuplicate( $job ); + + return $isDuplicate; + } + + /** + * @see JobQueue::isRootJobOldDuplicate() + * @param Job $job + * @return bool + */ + protected function doIsRootJobOldDuplicate( Job $job ) { + if ( !$job->hasRootJobParams() ) { + return false; // job has no de-deplication info + } + $params = $job->getRootJobParams(); + + $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); + // Get the last time this root job was enqueued + $timestamp = $this->dupCache->get( $key ); + + // Check if a new root job was started at the location after this one's... + return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); + } + + /** + * @param string $signature Hash identifier of the root job + * @return string + */ + protected function getRootJobCacheKey( $signature ) { + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $this->type, 'rootjob', $signature ); + } + + /** + * Deleted all unclaimed and delayed jobs from the queue + * + * @throws JobQueueError + * @since 1.22 + * @return void + */ + final public function delete() { + $this->assertNotReadOnly(); + + $this->doDelete(); + } + + /** + * @see JobQueue::delete() + * @throws MWException + */ + protected function doDelete() { + throw new MWException( "This method is not implemented." ); + } + + /** + * Wait for any replica DBs or backup servers to catch up. + * + * This does nothing for certain queue classes. + * + * @return void + * @throws JobQueueError + */ + final public function waitForBackups() { + $this->doWaitForBackups(); + } + + /** + * @see JobQueue::waitForBackups() + * @return void + */ + protected function doWaitForBackups() { + } + + /** + * Clear any process and persistent caches + * + * @return void + */ + final public function flushCaches() { + $this->doFlushCaches(); + } + + /** + * @see JobQueue::flushCaches() + * @return void + */ + protected function doFlushCaches() { + } + + /** + * Get an iterator to traverse over all available jobs in this queue. + * This does not include jobs that are currently acquired or delayed. + * Note: results may be stale if the queue is concurrently modified. + * + * @return Iterator + * @throws JobQueueError + */ + abstract public function getAllQueuedJobs(); + + /** + * Get an iterator to traverse over all delayed jobs in this queue. + * Note: results may be stale if the queue is concurrently modified. + * + * @return Iterator + * @throws JobQueueError + * @since 1.22 + */ + public function getAllDelayedJobs() { + return new ArrayIterator( [] ); // not implemented + } + + /** + * Get an iterator to traverse over all claimed jobs in this queue + * + * Callers should be quick to iterator over it or few results + * will be returned due to jobs being acknowledged and deleted + * + * @return Iterator + * @throws JobQueueError + * @since 1.26 + */ + public function getAllAcquiredJobs() { + return new ArrayIterator( [] ); // not implemented + } + + /** + * Get an iterator to traverse over all abandoned jobs in this queue + * + * @return Iterator + * @throws JobQueueError + * @since 1.25 + */ + public function getAllAbandonedJobs() { + return new ArrayIterator( [] ); // not implemented + } + + /** + * Do not use this function outside of JobQueue/JobQueueGroup + * + * @return string + * @since 1.22 + */ + public function getCoalesceLocationInternal() { + return null; + } + + /** + * Check whether each of the given queues are empty. + * This is used for batching checks for queues stored at the same place. + * + * @param array $types List of queues types + * @return array|null (list of non-empty queue types) or null if unsupported + * @throws MWException + * @since 1.22 + */ + final public function getSiblingQueuesWithJobs( array $types ) { + return $this->doGetSiblingQueuesWithJobs( $types ); + } + + /** + * @see JobQueue::getSiblingQueuesWithJobs() + * @param array $types List of queues types + * @return array|null (list of queue types) or null if unsupported + */ + protected function doGetSiblingQueuesWithJobs( array $types ) { + return null; // not supported + } + + /** + * Check the size of each of the given queues. + * For queues not served by the same store as this one, 0 is returned. + * This is used for batching checks for queues stored at the same place. + * + * @param array $types List of queues types + * @return array|null (job type => whether queue is empty) or null if unsupported + * @throws MWException + * @since 1.22 + */ + final public function getSiblingQueueSizes( array $types ) { + return $this->doGetSiblingQueueSizes( $types ); + } + + /** + * @see JobQueue::getSiblingQueuesSize() + * @param array $types List of queues types + * @return array|null (list of queue types) or null if unsupported + */ + protected function doGetSiblingQueueSizes( array $types ) { + return null; // not supported + } + + /** + * @throws JobQueueReadOnlyError + */ + protected function assertNotReadOnly() { + if ( $this->readOnlyReason !== false ) { + throw new JobQueueReadOnlyError( "Job queue is read-only: {$this->readOnlyReason}" ); + } + } + + /** + * Call wfIncrStats() for the queue overall and for the queue type + * + * @param string $key Event type + * @param string $type Job type + * @param int $delta + * @since 1.22 + */ + public static function incrStats( $key, $type, $delta = 1 ) { + static $stats; + if ( !$stats ) { + $stats = MediaWikiServices::getInstance()->getStatsdDataFactory(); + } + $stats->updateCount( "jobqueue.{$key}.all", $delta ); + $stats->updateCount( "jobqueue.{$key}.{$type}", $delta ); + } +} + +/** + * @ingroup JobQueue + * @since 1.22 + */ +class JobQueueError extends MWException { +} + +class JobQueueConnectionError extends JobQueueError { +} + +class JobQueueReadOnlyError extends JobQueueError { + +} diff --git a/www/wiki/includes/jobqueue/JobQueueDB.php b/www/wiki/includes/jobqueue/JobQueueDB.php new file mode 100644 index 00000000..f01ba63f --- /dev/null +++ b/www/wiki/includes/jobqueue/JobQueueDB.php @@ -0,0 +1,851 @@ +<?php +/** + * Database-backed job queue code. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + */ +use Wikimedia\Rdbms\IDatabase; +use Wikimedia\Rdbms\DBConnRef; +use Wikimedia\Rdbms\DBConnectionError; +use Wikimedia\Rdbms\DBError; +use MediaWiki\MediaWikiServices; +use Wikimedia\ScopedCallback; + +/** + * Class to handle job queues stored in the DB + * + * @ingroup JobQueue + * @since 1.21 + */ +class JobQueueDB extends JobQueue { + const CACHE_TTL_SHORT = 30; // integer; seconds to cache info without re-validating + const MAX_AGE_PRUNE = 604800; // integer; seconds a job can live once claimed + const MAX_JOB_RANDOM = 2147483647; // integer; 2^31 - 1, used for job_random + const MAX_OFFSET = 255; // integer; maximum number of rows to skip + + /** @var WANObjectCache */ + protected $cache; + + /** @var bool|string Name of an external DB cluster. False if not set */ + protected $cluster = false; + + /** + * Additional parameters include: + * - cluster : The name of an external cluster registered via LBFactory. + * If not specified, the primary DB cluster for the wiki will be used. + * This can be overridden with a custom cluster so that DB handles will + * be retrieved via LBFactory::getExternalLB() and getConnection(). + * @param array $params + */ + protected function __construct( array $params ) { + parent::__construct( $params ); + + $this->cluster = isset( $params['cluster'] ) ? $params['cluster'] : false; + $this->cache = ObjectCache::getMainWANInstance(); + } + + protected function supportedOrders() { + return [ 'random', 'timestamp', 'fifo' ]; + } + + protected function optimalOrder() { + return 'random'; + } + + /** + * @see JobQueue::doIsEmpty() + * @return bool + */ + protected function doIsEmpty() { + $dbr = $this->getReplicaDB(); + try { + $found = $dbr->selectField( // unclaimed job + 'job', '1', [ 'job_cmd' => $this->type, 'job_token' => '' ], __METHOD__ + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + + return !$found; + } + + /** + * @see JobQueue::doGetSize() + * @return int + */ + protected function doGetSize() { + $key = $this->getCacheKey( 'size' ); + + $size = $this->cache->get( $key ); + if ( is_int( $size ) ) { + return $size; + } + + try { + $dbr = $this->getReplicaDB(); + $size = (int)$dbr->selectField( 'job', 'COUNT(*)', + [ 'job_cmd' => $this->type, 'job_token' => '' ], + __METHOD__ + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + $this->cache->set( $key, $size, self::CACHE_TTL_SHORT ); + + return $size; + } + + /** + * @see JobQueue::doGetAcquiredCount() + * @return int + */ + protected function doGetAcquiredCount() { + if ( $this->claimTTL <= 0 ) { + return 0; // no acknowledgements + } + + $key = $this->getCacheKey( 'acquiredcount' ); + + $count = $this->cache->get( $key ); + if ( is_int( $count ) ) { + return $count; + } + + $dbr = $this->getReplicaDB(); + try { + $count = (int)$dbr->selectField( 'job', 'COUNT(*)', + [ 'job_cmd' => $this->type, "job_token != {$dbr->addQuotes( '' )}" ], + __METHOD__ + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + $this->cache->set( $key, $count, self::CACHE_TTL_SHORT ); + + return $count; + } + + /** + * @see JobQueue::doGetAbandonedCount() + * @return int + * @throws MWException + */ + protected function doGetAbandonedCount() { + if ( $this->claimTTL <= 0 ) { + return 0; // no acknowledgements + } + + $key = $this->getCacheKey( 'abandonedcount' ); + + $count = $this->cache->get( $key ); + if ( is_int( $count ) ) { + return $count; + } + + $dbr = $this->getReplicaDB(); + try { + $count = (int)$dbr->selectField( 'job', 'COUNT(*)', + [ + 'job_cmd' => $this->type, + "job_token != {$dbr->addQuotes( '' )}", + "job_attempts >= " . $dbr->addQuotes( $this->maxTries ) + ], + __METHOD__ + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + + $this->cache->set( $key, $count, self::CACHE_TTL_SHORT ); + + return $count; + } + + /** + * @see JobQueue::doBatchPush() + * @param IJobSpecification[] $jobs + * @param int $flags + * @throws DBError|Exception + * @return void + */ + protected function doBatchPush( array $jobs, $flags ) { + $dbw = $this->getMasterDB(); + // In general, there will be two cases here: + // a) sqlite; DB connection is probably a regular round-aware handle. + // If the connection is busy with a transaction, then defer the job writes + // until right before the main round commit step. Any errors that bubble + // up will rollback the main commit round. + // b) mysql/postgres; DB connection is generally a separate CONN_TRX_AUTOCOMMIT handle. + // No transaction is active nor will be started by writes, so enqueue the jobs + // now so that any errors will show up immediately as the interface expects. Any + // errors that bubble up will rollback the main commit round. + $fname = __METHOD__; + $dbw->onTransactionPreCommitOrIdle( + function () use ( $dbw, $jobs, $flags, $fname ) { + $this->doBatchPushInternal( $dbw, $jobs, $flags, $fname ); + }, + $fname + ); + } + + /** + * This function should *not* be called outside of JobQueueDB + * + * @param IDatabase $dbw + * @param IJobSpecification[] $jobs + * @param int $flags + * @param string $method + * @throws DBError + * @return void + */ + public function doBatchPushInternal( IDatabase $dbw, array $jobs, $flags, $method ) { + if ( !count( $jobs ) ) { + return; + } + + $rowSet = []; // (sha1 => job) map for jobs that are de-duplicated + $rowList = []; // list of jobs for jobs that are not de-duplicated + foreach ( $jobs as $job ) { + $row = $this->insertFields( $job ); + if ( $job->ignoreDuplicates() ) { + $rowSet[$row['job_sha1']] = $row; + } else { + $rowList[] = $row; + } + } + + if ( $flags & self::QOS_ATOMIC ) { + $dbw->startAtomic( $method ); // wrap all the job additions in one transaction + } + try { + // Strip out any duplicate jobs that are already in the queue... + if ( count( $rowSet ) ) { + $res = $dbw->select( 'job', 'job_sha1', + [ + // No job_type condition since it's part of the job_sha1 hash + 'job_sha1' => array_keys( $rowSet ), + 'job_token' => '' // unclaimed + ], + $method + ); + foreach ( $res as $row ) { + wfDebug( "Job with hash '{$row->job_sha1}' is a duplicate.\n" ); + unset( $rowSet[$row->job_sha1] ); // already enqueued + } + } + // Build the full list of job rows to insert + $rows = array_merge( $rowList, array_values( $rowSet ) ); + // Insert the job rows in chunks to avoid replica DB lag... + foreach ( array_chunk( $rows, 50 ) as $rowBatch ) { + $dbw->insert( 'job', $rowBatch, $method ); + } + JobQueue::incrStats( 'inserts', $this->type, count( $rows ) ); + JobQueue::incrStats( 'dupe_inserts', $this->type, + count( $rowSet ) + count( $rowList ) - count( $rows ) + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + if ( $flags & self::QOS_ATOMIC ) { + $dbw->endAtomic( $method ); + } + + return; + } + + /** + * @see JobQueue::doPop() + * @return Job|bool + */ + protected function doPop() { + $dbw = $this->getMasterDB(); + try { + $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting + $dbw->clearFlag( DBO_TRX ); // make each query its own transaction + $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) { + $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting + } ); + + $uuid = wfRandomString( 32 ); // pop attempt + $job = false; // job popped off + do { // retry when our row is invalid or deleted as a duplicate + // Try to reserve a row in the DB... + if ( in_array( $this->order, [ 'fifo', 'timestamp' ] ) ) { + $row = $this->claimOldest( $uuid ); + } else { // random first + $rand = mt_rand( 0, self::MAX_JOB_RANDOM ); // encourage concurrent UPDATEs + $gte = (bool)mt_rand( 0, 1 ); // find rows with rand before/after $rand + $row = $this->claimRandom( $uuid, $rand, $gte ); + } + // Check if we found a row to reserve... + if ( !$row ) { + break; // nothing to do + } + JobQueue::incrStats( 'pops', $this->type ); + // Get the job object from the row... + $title = Title::makeTitle( $row->job_namespace, $row->job_title ); + $job = Job::factory( $row->job_cmd, $title, + self::extractBlob( $row->job_params ), $row->job_id ); + $job->metadata['id'] = $row->job_id; + $job->metadata['timestamp'] = $row->job_timestamp; + break; // done + } while ( true ); + + if ( !$job || mt_rand( 0, 9 ) == 0 ) { + // Handled jobs that need to be recycled/deleted; + // any recycled jobs will be picked up next attempt + $this->recycleAndDeleteStaleJobs(); + } + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + + return $job; + } + + /** + * Reserve a row with a single UPDATE without holding row locks over RTTs... + * + * @param string $uuid 32 char hex string + * @param int $rand Random unsigned integer (31 bits) + * @param bool $gte Search for job_random >= $random (otherwise job_random <= $random) + * @return stdClass|bool Row|false + */ + protected function claimRandom( $uuid, $rand, $gte ) { + $dbw = $this->getMasterDB(); + // Check cache to see if the queue has <= OFFSET items + $tinyQueue = $this->cache->get( $this->getCacheKey( 'small' ) ); + + $row = false; // the row acquired + $invertedDirection = false; // whether one job_random direction was already scanned + // This uses a replication safe method for acquiring jobs. One could use UPDATE+LIMIT + // instead, but that either uses ORDER BY (in which case it deadlocks in MySQL) or is + // not replication safe. Due to https://bugs.mysql.com/bug.php?id=6980, subqueries cannot + // be used here with MySQL. + do { + if ( $tinyQueue ) { // queue has <= MAX_OFFSET rows + // For small queues, using OFFSET will overshoot and return no rows more often. + // Instead, this uses job_random to pick a row (possibly checking both directions). + $ineq = $gte ? '>=' : '<='; + $dir = $gte ? 'ASC' : 'DESC'; + $row = $dbw->selectRow( 'job', self::selectFields(), // find a random job + [ + 'job_cmd' => $this->type, + 'job_token' => '', // unclaimed + "job_random {$ineq} {$dbw->addQuotes( $rand )}" ], + __METHOD__, + [ 'ORDER BY' => "job_random {$dir}" ] + ); + if ( !$row && !$invertedDirection ) { + $gte = !$gte; + $invertedDirection = true; + continue; // try the other direction + } + } else { // table *may* have >= MAX_OFFSET rows + // T44614: "ORDER BY job_random" with a job_random inequality causes high CPU + // in MySQL if there are many rows for some reason. This uses a small OFFSET + // instead of job_random for reducing excess claim retries. + $row = $dbw->selectRow( 'job', self::selectFields(), // find a random job + [ + 'job_cmd' => $this->type, + 'job_token' => '', // unclaimed + ], + __METHOD__, + [ 'OFFSET' => mt_rand( 0, self::MAX_OFFSET ) ] + ); + if ( !$row ) { + $tinyQueue = true; // we know the queue must have <= MAX_OFFSET rows + $this->cache->set( $this->getCacheKey( 'small' ), 1, 30 ); + continue; // use job_random + } + } + + if ( $row ) { // claim the job + $dbw->update( 'job', // update by PK + [ + 'job_token' => $uuid, + 'job_token_timestamp' => $dbw->timestamp(), + 'job_attempts = job_attempts+1' ], + [ 'job_cmd' => $this->type, 'job_id' => $row->job_id, 'job_token' => '' ], + __METHOD__ + ); + // This might get raced out by another runner when claiming the previously + // selected row. The use of job_random should minimize this problem, however. + if ( !$dbw->affectedRows() ) { + $row = false; // raced out + } + } else { + break; // nothing to do + } + } while ( !$row ); + + return $row; + } + + /** + * Reserve a row with a single UPDATE without holding row locks over RTTs... + * + * @param string $uuid 32 char hex string + * @return stdClass|bool Row|false + */ + protected function claimOldest( $uuid ) { + $dbw = $this->getMasterDB(); + + $row = false; // the row acquired + do { + if ( $dbw->getType() === 'mysql' ) { + // Per https://bugs.mysql.com/bug.php?id=6980, we can't use subqueries on the + // same table being changed in an UPDATE query in MySQL (gives Error: 1093). + // Oracle and Postgre have no such limitation. However, MySQL offers an + // alternative here by supporting ORDER BY + LIMIT for UPDATE queries. + $dbw->query( "UPDATE {$dbw->tableName( 'job' )} " . + "SET " . + "job_token = {$dbw->addQuotes( $uuid ) }, " . + "job_token_timestamp = {$dbw->addQuotes( $dbw->timestamp() )}, " . + "job_attempts = job_attempts+1 " . + "WHERE ( " . + "job_cmd = {$dbw->addQuotes( $this->type )} " . + "AND job_token = {$dbw->addQuotes( '' )} " . + ") ORDER BY job_id ASC LIMIT 1", + __METHOD__ + ); + } else { + // Use a subquery to find the job, within an UPDATE to claim it. + // This uses as much of the DB wrapper functions as possible. + $dbw->update( 'job', + [ + 'job_token' => $uuid, + 'job_token_timestamp' => $dbw->timestamp(), + 'job_attempts = job_attempts+1' ], + [ 'job_id = (' . + $dbw->selectSQLText( 'job', 'job_id', + [ 'job_cmd' => $this->type, 'job_token' => '' ], + __METHOD__, + [ 'ORDER BY' => 'job_id ASC', 'LIMIT' => 1 ] ) . + ')' + ], + __METHOD__ + ); + } + // Fetch any row that we just reserved... + if ( $dbw->affectedRows() ) { + $row = $dbw->selectRow( 'job', self::selectFields(), + [ 'job_cmd' => $this->type, 'job_token' => $uuid ], __METHOD__ + ); + if ( !$row ) { // raced out by duplicate job removal + wfDebug( "Row deleted as duplicate by another process.\n" ); + } + } else { + break; // nothing to do + } + } while ( !$row ); + + return $row; + } + + /** + * @see JobQueue::doAck() + * @param Job $job + * @throws MWException + */ + protected function doAck( Job $job ) { + if ( !isset( $job->metadata['id'] ) ) { + throw new MWException( "Job of type '{$job->getType()}' has no ID." ); + } + + $dbw = $this->getMasterDB(); + try { + $autoTrx = $dbw->getFlag( DBO_TRX ); // get current setting + $dbw->clearFlag( DBO_TRX ); // make each query its own transaction + $scopedReset = new ScopedCallback( function () use ( $dbw, $autoTrx ) { + $dbw->setFlag( $autoTrx ? DBO_TRX : 0 ); // restore old setting + } ); + + // Delete a row with a single DELETE without holding row locks over RTTs... + $dbw->delete( 'job', + [ 'job_cmd' => $this->type, 'job_id' => $job->metadata['id'] ], __METHOD__ ); + + JobQueue::incrStats( 'acks', $this->type ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + } + + /** + * @see JobQueue::doDeduplicateRootJob() + * @param IJobSpecification $job + * @throws MWException + * @return bool + */ + protected function doDeduplicateRootJob( IJobSpecification $job ) { + $params = $job->getParams(); + if ( !isset( $params['rootJobSignature'] ) ) { + throw new MWException( "Cannot register root job; missing 'rootJobSignature'." ); + } elseif ( !isset( $params['rootJobTimestamp'] ) ) { + throw new MWException( "Cannot register root job; missing 'rootJobTimestamp'." ); + } + $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); + // Callers should call batchInsert() and then this function so that if the insert + // fails, the de-duplication registration will be aborted. Since the insert is + // deferred till "transaction idle", do the same here, so that the ordering is + // maintained. Having only the de-duplication registration succeed would cause + // jobs to become no-ops without any actual jobs that made them redundant. + $dbw = $this->getMasterDB(); + $cache = $this->dupCache; + $dbw->onTransactionIdle( + function () use ( $cache, $params, $key, $dbw ) { + $timestamp = $cache->get( $key ); // current last timestamp of this job + if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { + return true; // a newer version of this root job was enqueued + } + + // Update the timestamp of the last root job started at the location... + return $cache->set( $key, $params['rootJobTimestamp'], JobQueueDB::ROOTJOB_TTL ); + }, + __METHOD__ + ); + + return true; + } + + /** + * @see JobQueue::doDelete() + * @return bool + */ + protected function doDelete() { + $dbw = $this->getMasterDB(); + try { + $dbw->delete( 'job', [ 'job_cmd' => $this->type ] ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + + return true; + } + + /** + * @see JobQueue::doWaitForBackups() + * @return void + */ + protected function doWaitForBackups() { + $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); + $lbFactory->waitForReplication( [ 'wiki' => $this->wiki, 'cluster' => $this->cluster ] ); + } + + /** + * @return void + */ + protected function doFlushCaches() { + foreach ( [ 'size', 'acquiredcount' ] as $type ) { + $this->cache->delete( $this->getCacheKey( $type ) ); + } + } + + /** + * @see JobQueue::getAllQueuedJobs() + * @return Iterator + */ + public function getAllQueuedJobs() { + return $this->getJobIterator( [ 'job_cmd' => $this->getType(), 'job_token' => '' ] ); + } + + /** + * @see JobQueue::getAllAcquiredJobs() + * @return Iterator + */ + public function getAllAcquiredJobs() { + return $this->getJobIterator( [ 'job_cmd' => $this->getType(), "job_token > ''" ] ); + } + + /** + * @param array $conds Query conditions + * @return Iterator + */ + protected function getJobIterator( array $conds ) { + $dbr = $this->getReplicaDB(); + try { + return new MappedIterator( + $dbr->select( 'job', self::selectFields(), $conds ), + function ( $row ) { + $job = Job::factory( + $row->job_cmd, + Title::makeTitle( $row->job_namespace, $row->job_title ), + strlen( $row->job_params ) ? unserialize( $row->job_params ) : [] + ); + $job->metadata['id'] = $row->job_id; + $job->metadata['timestamp'] = $row->job_timestamp; + + return $job; + } + ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + } + + public function getCoalesceLocationInternal() { + return $this->cluster + ? "DBCluster:{$this->cluster}:{$this->wiki}" + : "LBFactory:{$this->wiki}"; + } + + protected function doGetSiblingQueuesWithJobs( array $types ) { + $dbr = $this->getReplicaDB(); + // @note: this does not check whether the jobs are claimed or not. + // This is useful so JobQueueGroup::pop() also sees queues that only + // have stale jobs. This lets recycleAndDeleteStaleJobs() re-enqueue + // failed jobs so that they can be popped again for that edge case. + $res = $dbr->select( 'job', 'DISTINCT job_cmd', + [ 'job_cmd' => $types ], __METHOD__ ); + + $types = []; + foreach ( $res as $row ) { + $types[] = $row->job_cmd; + } + + return $types; + } + + protected function doGetSiblingQueueSizes( array $types ) { + $dbr = $this->getReplicaDB(); + $res = $dbr->select( 'job', [ 'job_cmd', 'COUNT(*) AS count' ], + [ 'job_cmd' => $types ], __METHOD__, [ 'GROUP BY' => 'job_cmd' ] ); + + $sizes = []; + foreach ( $res as $row ) { + $sizes[$row->job_cmd] = (int)$row->count; + } + + return $sizes; + } + + /** + * Recycle or destroy any jobs that have been claimed for too long + * + * @return int Number of jobs recycled/deleted + */ + public function recycleAndDeleteStaleJobs() { + $now = time(); + $count = 0; // affected rows + $dbw = $this->getMasterDB(); + + try { + if ( !$dbw->lock( "jobqueue-recycle-{$this->type}", __METHOD__, 1 ) ) { + return $count; // already in progress + } + + // Remove claims on jobs acquired for too long if enabled... + if ( $this->claimTTL > 0 ) { + $claimCutoff = $dbw->timestamp( $now - $this->claimTTL ); + // Get the IDs of jobs that have be claimed but not finished after too long. + // These jobs can be recycled into the queue by expiring the claim. Selecting + // the IDs first means that the UPDATE can be done by primary key (less deadlocks). + $res = $dbw->select( 'job', 'job_id', + [ + 'job_cmd' => $this->type, + "job_token != {$dbw->addQuotes( '' )}", // was acquired + "job_token_timestamp < {$dbw->addQuotes( $claimCutoff )}", // stale + "job_attempts < {$dbw->addQuotes( $this->maxTries )}" ], // retries left + __METHOD__ + ); + $ids = array_map( + function ( $o ) { + return $o->job_id; + }, iterator_to_array( $res ) + ); + if ( count( $ids ) ) { + // Reset job_token for these jobs so that other runners will pick them up. + // Set the timestamp to the current time, as it is useful to now that the job + // was already tried before (the timestamp becomes the "released" time). + $dbw->update( 'job', + [ + 'job_token' => '', + 'job_token_timestamp' => $dbw->timestamp( $now ) ], // time of release + [ + 'job_id' => $ids ], + __METHOD__ + ); + $affected = $dbw->affectedRows(); + $count += $affected; + JobQueue::incrStats( 'recycles', $this->type, $affected ); + $this->aggr->notifyQueueNonEmpty( $this->wiki, $this->type ); + } + } + + // Just destroy any stale jobs... + $pruneCutoff = $dbw->timestamp( $now - self::MAX_AGE_PRUNE ); + $conds = [ + 'job_cmd' => $this->type, + "job_token != {$dbw->addQuotes( '' )}", // was acquired + "job_token_timestamp < {$dbw->addQuotes( $pruneCutoff )}" // stale + ]; + if ( $this->claimTTL > 0 ) { // only prune jobs attempted too many times... + $conds[] = "job_attempts >= {$dbw->addQuotes( $this->maxTries )}"; + } + // Get the IDs of jobs that are considered stale and should be removed. Selecting + // the IDs first means that the UPDATE can be done by primary key (less deadlocks). + $res = $dbw->select( 'job', 'job_id', $conds, __METHOD__ ); + $ids = array_map( + function ( $o ) { + return $o->job_id; + }, iterator_to_array( $res ) + ); + if ( count( $ids ) ) { + $dbw->delete( 'job', [ 'job_id' => $ids ], __METHOD__ ); + $affected = $dbw->affectedRows(); + $count += $affected; + JobQueue::incrStats( 'abandons', $this->type, $affected ); + } + + $dbw->unlock( "jobqueue-recycle-{$this->type}", __METHOD__ ); + } catch ( DBError $e ) { + $this->throwDBException( $e ); + } + + return $count; + } + + /** + * @param IJobSpecification $job + * @return array + */ + protected function insertFields( IJobSpecification $job ) { + $dbw = $this->getMasterDB(); + + return [ + // Fields that describe the nature of the job + 'job_cmd' => $job->getType(), + 'job_namespace' => $job->getTitle()->getNamespace(), + 'job_title' => $job->getTitle()->getDBkey(), + 'job_params' => self::makeBlob( $job->getParams() ), + // Additional job metadata + 'job_timestamp' => $dbw->timestamp(), + 'job_sha1' => Wikimedia\base_convert( + sha1( serialize( $job->getDeduplicationInfo() ) ), + 16, 36, 31 + ), + 'job_random' => mt_rand( 0, self::MAX_JOB_RANDOM ) + ]; + } + + /** + * @throws JobQueueConnectionError + * @return DBConnRef + */ + protected function getReplicaDB() { + try { + return $this->getDB( DB_REPLICA ); + } catch ( DBConnectionError $e ) { + throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() ); + } + } + + /** + * @throws JobQueueConnectionError + * @return DBConnRef + */ + protected function getMasterDB() { + try { + return $this->getDB( DB_MASTER ); + } catch ( DBConnectionError $e ) { + throw new JobQueueConnectionError( "DBConnectionError:" . $e->getMessage() ); + } + } + + /** + * @param int $index (DB_REPLICA/DB_MASTER) + * @return DBConnRef + */ + protected function getDB( $index ) { + $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); + $lb = ( $this->cluster !== false ) + ? $lbFactory->getExternalLB( $this->cluster ) + : $lbFactory->getMainLB( $this->wiki ); + + return ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' ) + // Keep a separate connection to avoid contention and deadlocks; + // However, SQLite has the opposite behavior due to DB-level locking. + ? $lb->getConnectionRef( $index, [], $this->wiki, $lb::CONN_TRX_AUTOCOMMIT ) + // Jobs insertion will be defered until the PRESEND stage to reduce contention. + : $lb->getConnectionRef( $index, [], $this->wiki ); + } + + /** + * @param string $property + * @return string + */ + private function getCacheKey( $property ) { + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + $cluster = is_string( $this->cluster ) ? $this->cluster : 'main'; + + return wfForeignMemcKey( $db, $prefix, 'jobqueue', $cluster, $this->type, $property ); + } + + /** + * @param array|bool $params + * @return string + */ + protected static function makeBlob( $params ) { + if ( $params !== false ) { + return serialize( $params ); + } else { + return ''; + } + } + + /** + * @param string $blob + * @return bool|mixed + */ + protected static function extractBlob( $blob ) { + if ( (string)$blob !== '' ) { + return unserialize( $blob ); + } else { + return false; + } + } + + /** + * @param DBError $e + * @throws JobQueueError + */ + protected function throwDBException( DBError $e ) { + throw new JobQueueError( get_class( $e ) . ": " . $e->getMessage() ); + } + + /** + * Return the list of job fields that should be selected. + * @since 1.23 + * @return array + */ + public static function selectFields() { + return [ + 'job_id', + 'job_cmd', + 'job_namespace', + 'job_title', + 'job_timestamp', + 'job_params', + 'job_random', + 'job_attempts', + 'job_token', + 'job_token_timestamp', + 'job_sha1', + ]; + } +} diff --git a/www/wiki/includes/jobqueue/JobQueueFederated.php b/www/wiki/includes/jobqueue/JobQueueFederated.php new file mode 100644 index 00000000..7f3b2b1d --- /dev/null +++ b/www/wiki/includes/jobqueue/JobQueueFederated.php @@ -0,0 +1,496 @@ +<?php +/** + * Job queue code for federated queues. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + */ + +/** + * Class to handle enqueueing and running of background jobs for federated queues + * + * This class allows for queues to be partitioned into smaller queues. + * A partition is defined by the configuration for a JobQueue instance. + * For example, one can set $wgJobTypeConf['refreshLinks'] to point to a + * JobQueueFederated instance, which itself would consist of three JobQueueRedis + * instances, each using their own redis server. This would allow for the jobs + * to be split (evenly or based on weights) across multiple servers if a single + * server becomes impractical or expensive. Different JobQueue classes can be mixed. + * + * The basic queue configuration (e.g. "order", "claimTTL") of a federated queue + * is inherited by the partition queues. Additional configuration defines what + * section each wiki is in, what partition queues each section uses (and their weight), + * and the JobQueue configuration for each partition. Some sections might only need a + * single queue partition, like the sections for groups of small wikis. + * + * If used for performance, then $wgMainCacheType should be set to memcached/redis. + * Note that "fifo" cannot be used for the ordering, since the data is distributed. + * One can still use "timestamp" instead, as in "roughly timestamp ordered". Also, + * queue classes used by this should ignore down servers (with TTL) to avoid slowness. + * + * @ingroup JobQueue + * @since 1.22 + */ +class JobQueueFederated extends JobQueue { + /** @var HashRing */ + protected $partitionRing; + /** @var JobQueue[] (partition name => JobQueue) reverse sorted by weight */ + protected $partitionQueues = []; + + /** @var int Maximum number of partitions to try */ + protected $maxPartitionsTry; + + /** + * @param array $params Possible keys: + * - sectionsByWiki : A map of wiki IDs to section names. + * Wikis will default to using the section "default". + * - partitionsBySection : Map of section names to maps of (partition name => weight). + * A section called 'default' must be defined if not all wikis + * have explicitly defined sections. + * - configByPartition : Map of queue partition names to configuration arrays. + * These configuration arrays are passed to JobQueue::factory(). + * The options set here are overridden by those passed to this + * the federated queue itself (e.g. 'order' and 'claimTTL'). + * - maxPartitionsTry : Maximum number of times to attempt job insertion using + * different partition queues. This improves availability + * during failure, at the cost of added latency and somewhat + * less reliable job de-duplication mechanisms. + * @throws MWException + */ + protected function __construct( array $params ) { + parent::__construct( $params ); + $section = isset( $params['sectionsByWiki'][$this->wiki] ) + ? $params['sectionsByWiki'][$this->wiki] + : 'default'; + if ( !isset( $params['partitionsBySection'][$section] ) ) { + throw new MWException( "No configuration for section '$section'." ); + } + $this->maxPartitionsTry = isset( $params['maxPartitionsTry'] ) + ? $params['maxPartitionsTry'] + : 2; + // Get the full partition map + $partitionMap = $params['partitionsBySection'][$section]; + arsort( $partitionMap, SORT_NUMERIC ); + // Get the config to pass to merge into each partition queue config + $baseConfig = $params; + foreach ( [ 'class', 'sectionsByWiki', 'maxPartitionsTry', + 'partitionsBySection', 'configByPartition', ] as $o + ) { + unset( $baseConfig[$o] ); // partition queue doesn't care about this + } + // The class handles all aggregator calls already + unset( $baseConfig['aggregator'] ); + // Get the partition queue objects + foreach ( $partitionMap as $partition => $w ) { + if ( !isset( $params['configByPartition'][$partition] ) ) { + throw new MWException( "No configuration for partition '$partition'." ); + } + $this->partitionQueues[$partition] = JobQueue::factory( + $baseConfig + $params['configByPartition'][$partition] ); + } + // Ring of all partitions + $this->partitionRing = new HashRing( $partitionMap ); + } + + protected function supportedOrders() { + // No FIFO due to partitioning, though "rough timestamp order" is supported + return [ 'undefined', 'random', 'timestamp' ]; + } + + protected function optimalOrder() { + return 'undefined'; // defer to the partitions + } + + protected function supportsDelayedJobs() { + foreach ( $this->partitionQueues as $queue ) { + if ( !$queue->supportsDelayedJobs() ) { + return false; + } + } + + return true; + } + + protected function doIsEmpty() { + $empty = true; + $failed = 0; + foreach ( $this->partitionQueues as $queue ) { + try { + $empty = $empty && $queue->doIsEmpty(); + } catch ( JobQueueError $e ) { + ++$failed; + $this->logException( $e ); + } + } + $this->throwErrorIfAllPartitionsDown( $failed ); + + return $empty; + } + + protected function doGetSize() { + return $this->getCrossPartitionSum( 'size', 'doGetSize' ); + } + + protected function doGetAcquiredCount() { + return $this->getCrossPartitionSum( 'acquiredcount', 'doGetAcquiredCount' ); + } + + protected function doGetDelayedCount() { + return $this->getCrossPartitionSum( 'delayedcount', 'doGetDelayedCount' ); + } + + protected function doGetAbandonedCount() { + return $this->getCrossPartitionSum( 'abandonedcount', 'doGetAbandonedCount' ); + } + + /** + * @param string $type + * @param string $method + * @return int + */ + protected function getCrossPartitionSum( $type, $method ) { + $count = 0; + $failed = 0; + foreach ( $this->partitionQueues as $queue ) { + try { + $count += $queue->$method(); + } catch ( JobQueueError $e ) { + ++$failed; + $this->logException( $e ); + } + } + $this->throwErrorIfAllPartitionsDown( $failed ); + + return $count; + } + + protected function doBatchPush( array $jobs, $flags ) { + // Local ring variable that may be changed to point to a new ring on failure + $partitionRing = $this->partitionRing; + // Try to insert the jobs and update $partitionsTry on any failures. + // Retry to insert any remaning jobs again, ignoring the bad partitions. + $jobsLeft = $jobs; + // phpcs:ignore Generic.CodeAnalysis.ForLoopWithTestFunctionCall + for ( $i = $this->maxPartitionsTry; $i > 0 && count( $jobsLeft ); --$i ) { + try { + $partitionRing->getLiveLocationWeights(); + } catch ( UnexpectedValueException $e ) { + break; // all servers down; nothing to insert to + } + $jobsLeft = $this->tryJobInsertions( $jobsLeft, $partitionRing, $flags ); + } + if ( count( $jobsLeft ) ) { + throw new JobQueueError( + "Could not insert job(s), {$this->maxPartitionsTry} partitions tried." ); + } + } + + /** + * @param array $jobs + * @param HashRing &$partitionRing + * @param int $flags + * @throws JobQueueError + * @return array List of Job object that could not be inserted + */ + protected function tryJobInsertions( array $jobs, HashRing &$partitionRing, $flags ) { + $jobsLeft = []; + + // Because jobs are spread across partitions, per-job de-duplication needs + // to use a consistent hash to avoid allowing duplicate jobs per partition. + // When inserting a batch of de-duplicated jobs, QOS_ATOMIC is disregarded. + $uJobsByPartition = []; // (partition name => job list) + /** @var Job $job */ + foreach ( $jobs as $key => $job ) { + if ( $job->ignoreDuplicates() ) { + $sha1 = sha1( serialize( $job->getDeduplicationInfo() ) ); + $uJobsByPartition[$partitionRing->getLiveLocation( $sha1 )][] = $job; + unset( $jobs[$key] ); + } + } + // Get the batches of jobs that are not de-duplicated + if ( $flags & self::QOS_ATOMIC ) { + $nuJobBatches = [ $jobs ]; // all or nothing + } else { + // Split the jobs into batches and spread them out over servers if there + // are many jobs. This helps keep the partitions even. Otherwise, send all + // the jobs to a single partition queue to avoids the extra connections. + $nuJobBatches = array_chunk( $jobs, 300 ); + } + + // Insert the de-duplicated jobs into the queues... + foreach ( $uJobsByPartition as $partition => $jobBatch ) { + /** @var JobQueue $queue */ + $queue = $this->partitionQueues[$partition]; + try { + $ok = true; + $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC ); + } catch ( JobQueueError $e ) { + $ok = false; + $this->logException( $e ); + } + if ( !$ok ) { + if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist + throw new JobQueueError( "Could not insert job(s), no partitions available." ); + } + $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted + } + } + + // Insert the jobs that are not de-duplicated into the queues... + foreach ( $nuJobBatches as $jobBatch ) { + $partition = ArrayUtils::pickRandom( $partitionRing->getLiveLocationWeights() ); + $queue = $this->partitionQueues[$partition]; + try { + $ok = true; + $queue->doBatchPush( $jobBatch, $flags | self::QOS_ATOMIC ); + } catch ( JobQueueError $e ) { + $ok = false; + $this->logException( $e ); + } + if ( !$ok ) { + if ( !$partitionRing->ejectFromLiveRing( $partition, 5 ) ) { // blacklist + throw new JobQueueError( "Could not insert job(s), no partitions available." ); + } + $jobsLeft = array_merge( $jobsLeft, $jobBatch ); // not inserted + } + } + + return $jobsLeft; + } + + protected function doPop() { + $partitionsTry = $this->partitionRing->getLiveLocationWeights(); // (partition => weight) + + $failed = 0; + while ( count( $partitionsTry ) ) { + $partition = ArrayUtils::pickRandom( $partitionsTry ); + if ( $partition === false ) { + break; // all partitions at 0 weight + } + + /** @var JobQueue $queue */ + $queue = $this->partitionQueues[$partition]; + try { + $job = $queue->pop(); + } catch ( JobQueueError $e ) { + ++$failed; + $this->logException( $e ); + $job = false; + } + if ( $job ) { + $job->metadata['QueuePartition'] = $partition; + + return $job; + } else { + unset( $partitionsTry[$partition] ); // blacklist partition + } + } + $this->throwErrorIfAllPartitionsDown( $failed ); + + return false; + } + + protected function doAck( Job $job ) { + if ( !isset( $job->metadata['QueuePartition'] ) ) { + throw new MWException( "The given job has no defined partition name." ); + } + + $this->partitionQueues[$job->metadata['QueuePartition']]->ack( $job ); + } + + protected function doIsRootJobOldDuplicate( Job $job ) { + $signature = $job->getRootJobParams()['rootJobSignature']; + $partition = $this->partitionRing->getLiveLocation( $signature ); + try { + return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job ); + } catch ( JobQueueError $e ) { + if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) { + $partition = $this->partitionRing->getLiveLocation( $signature ); + return $this->partitionQueues[$partition]->doIsRootJobOldDuplicate( $job ); + } + } + + return false; + } + + protected function doDeduplicateRootJob( IJobSpecification $job ) { + $signature = $job->getRootJobParams()['rootJobSignature']; + $partition = $this->partitionRing->getLiveLocation( $signature ); + try { + return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job ); + } catch ( JobQueueError $e ) { + if ( $this->partitionRing->ejectFromLiveRing( $partition, 5 ) ) { + $partition = $this->partitionRing->getLiveLocation( $signature ); + return $this->partitionQueues[$partition]->doDeduplicateRootJob( $job ); + } + } + + return false; + } + + protected function doDelete() { + $failed = 0; + /** @var JobQueue $queue */ + foreach ( $this->partitionQueues as $queue ) { + try { + $queue->doDelete(); + } catch ( JobQueueError $e ) { + ++$failed; + $this->logException( $e ); + } + } + $this->throwErrorIfAllPartitionsDown( $failed ); + return true; + } + + protected function doWaitForBackups() { + $failed = 0; + /** @var JobQueue $queue */ + foreach ( $this->partitionQueues as $queue ) { + try { + $queue->waitForBackups(); + } catch ( JobQueueError $e ) { + ++$failed; + $this->logException( $e ); + } + } + $this->throwErrorIfAllPartitionsDown( $failed ); + } + + protected function doFlushCaches() { + /** @var JobQueue $queue */ + foreach ( $this->partitionQueues as $queue ) { + $queue->doFlushCaches(); + } + } + + public function getAllQueuedJobs() { + $iterator = new AppendIterator(); + + /** @var JobQueue $queue */ + foreach ( $this->partitionQueues as $queue ) { + $iterator->append( $queue->getAllQueuedJobs() ); + } + + return $iterator; + } + + public function getAllDelayedJobs() { + $iterator = new AppendIterator(); + + /** @var JobQueue $queue */ + foreach ( $this->partitionQueues as $queue ) { + $iterator->append( $queue->getAllDelayedJobs() ); + } + + return $iterator; + } + + public function getAllAcquiredJobs() { + $iterator = new AppendIterator(); + + /** @var JobQueue $queue */ + foreach ( $this->partitionQueues as $queue ) { + $iterator->append( $queue->getAllAcquiredJobs() ); + } + + return $iterator; + } + + public function getAllAbandonedJobs() { + $iterator = new AppendIterator(); + + /** @var JobQueue $queue */ + foreach ( $this->partitionQueues as $queue ) { + $iterator->append( $queue->getAllAbandonedJobs() ); + } + + return $iterator; + } + + public function getCoalesceLocationInternal() { + return "JobQueueFederated:wiki:{$this->wiki}" . + sha1( serialize( array_keys( $this->partitionQueues ) ) ); + } + + protected function doGetSiblingQueuesWithJobs( array $types ) { + $result = []; + + $failed = 0; + /** @var JobQueue $queue */ + foreach ( $this->partitionQueues as $queue ) { + try { + $nonEmpty = $queue->doGetSiblingQueuesWithJobs( $types ); + if ( is_array( $nonEmpty ) ) { + $result = array_unique( array_merge( $result, $nonEmpty ) ); + } else { + return null; // not supported on all partitions; bail + } + if ( count( $result ) == count( $types ) ) { + break; // short-circuit + } + } catch ( JobQueueError $e ) { + ++$failed; + $this->logException( $e ); + } + } + $this->throwErrorIfAllPartitionsDown( $failed ); + + return array_values( $result ); + } + + protected function doGetSiblingQueueSizes( array $types ) { + $result = []; + $failed = 0; + /** @var JobQueue $queue */ + foreach ( $this->partitionQueues as $queue ) { + try { + $sizes = $queue->doGetSiblingQueueSizes( $types ); + if ( is_array( $sizes ) ) { + foreach ( $sizes as $type => $size ) { + $result[$type] = isset( $result[$type] ) ? $result[$type] + $size : $size; + } + } else { + return null; // not supported on all partitions; bail + } + } catch ( JobQueueError $e ) { + ++$failed; + $this->logException( $e ); + } + } + $this->throwErrorIfAllPartitionsDown( $failed ); + + return $result; + } + + protected function logException( Exception $e ) { + wfDebugLog( 'JobQueueFederated', $e->getMessage() . "\n" . $e->getTraceAsString() ); + } + + /** + * Throw an error if no partitions available + * + * @param int $down The number of up partitions down + * @return void + * @throws JobQueueError + */ + protected function throwErrorIfAllPartitionsDown( $down ) { + if ( $down >= count( $this->partitionQueues ) ) { + throw new JobQueueError( 'No queue partitions available.' ); + } + } +} diff --git a/www/wiki/includes/jobqueue/JobQueueGroup.php b/www/wiki/includes/jobqueue/JobQueueGroup.php new file mode 100644 index 00000000..df55fc57 --- /dev/null +++ b/www/wiki/includes/jobqueue/JobQueueGroup.php @@ -0,0 +1,480 @@ +<?php +/** + * Job queue base code. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + */ + +/** + * Class to handle enqueueing of background jobs + * + * @ingroup JobQueue + * @since 1.21 + */ +class JobQueueGroup { + /** @var JobQueueGroup[] */ + protected static $instances = []; + + /** @var ProcessCacheLRU */ + protected $cache; + + /** @var string Wiki DB domain ID */ + protected $domain; + /** @var string|bool Read only rationale (or false if r/w) */ + protected $readOnlyReason; + /** @var bool Whether the wiki is not recognized in configuration */ + protected $invalidWiki = false; + + /** @var array Map of (bucket => (queue => JobQueue, types => list of types) */ + protected $coalescedQueues; + + /** @var Job[] */ + protected $bufferedJobs = []; + + const TYPE_DEFAULT = 1; // integer; jobs popped by default + const TYPE_ANY = 2; // integer; any job + + const USE_CACHE = 1; // integer; use process or persistent cache + + const PROC_CACHE_TTL = 15; // integer; seconds + + const CACHE_VERSION = 1; // integer; cache version + + /** + * @param string $domain Wiki DB domain ID + * @param string|bool $readOnlyReason Read-only reason or false + */ + protected function __construct( $domain, $readOnlyReason ) { + $this->domain = $domain; + $this->readOnlyReason = $readOnlyReason; + $this->cache = new ProcessCacheLRU( 10 ); + } + + /** + * @param bool|string $domain Wiki domain ID + * @return JobQueueGroup + */ + public static function singleton( $domain = false ) { + global $wgLocalDatabases; + + if ( $domain === false ) { + $domain = WikiMap::getCurrentWikiDbDomain()->getId(); + } + + if ( !isset( self::$instances[$domain] ) ) { + self::$instances[$domain] = new self( $domain, wfConfiguredReadOnlyReason() ); + // Make sure jobs are not getting pushed to bogus wikis. This can confuse + // the job runner system into spawning endless RPC requests that fail (T171371). + $wikiId = WikiMap::getWikiIdFromDomain( $domain ); + if ( + !WikiMap::isCurrentWikiDbDomain( $domain ) && + !in_array( $wikiId, $wgLocalDatabases ) + ) { + self::$instances[$domain]->invalidWiki = true; + } + } + + return self::$instances[$domain]; + } + + /** + * Destroy the singleton instances + * + * @return void + */ + public static function destroySingletons() { + self::$instances = []; + } + + /** + * Get the job queue object for a given queue type + * + * @param string $type + * @return JobQueue + */ + public function get( $type ) { + global $wgJobTypeConf; + + $conf = [ 'wiki' => $this->domain, 'type' => $type ]; + if ( isset( $wgJobTypeConf[$type] ) ) { + $conf = $conf + $wgJobTypeConf[$type]; + } else { + $conf = $conf + $wgJobTypeConf['default']; + } + $conf['aggregator'] = JobQueueAggregator::singleton(); + if ( $this->readOnlyReason !== false ) { + $conf['readOnlyReason'] = $this->readOnlyReason; + } + + return JobQueue::factory( $conf ); + } + + /** + * Insert jobs into the respective queues of which they belong + * + * This inserts the jobs into the queue specified by $wgJobTypeConf + * and updates the aggregate job queue information cache as needed. + * + * @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs + * @throws InvalidArgumentException + * @return void + */ + public function push( $jobs ) { + global $wgJobTypesExcludedFromDefaultQueue; + + if ( $this->invalidWiki ) { + // Do not enqueue job that cannot be run (T171371) + $e = new LogicException( "Domain '{$this->domain}' is not recognized." ); + MWExceptionHandler::logException( $e ); + return; + } + + $jobs = is_array( $jobs ) ? $jobs : [ $jobs ]; + if ( !count( $jobs ) ) { + return; + } + + $this->assertValidJobs( $jobs ); + + $jobsByType = []; // (job type => list of jobs) + foreach ( $jobs as $job ) { + $jobsByType[$job->getType()][] = $job; + } + + foreach ( $jobsByType as $type => $jobs ) { + $this->get( $type )->push( $jobs ); + } + + if ( $this->cache->has( 'queues-ready', 'list' ) ) { + $list = $this->cache->get( 'queues-ready', 'list' ); + if ( count( array_diff( array_keys( $jobsByType ), $list ) ) ) { + $this->cache->clear( 'queues-ready' ); + } + } + + $cache = ObjectCache::getLocalClusterInstance(); + $cache->set( + $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_ANY ), + 'true', + 15 + ); + if ( array_diff( array_keys( $jobsByType ), $wgJobTypesExcludedFromDefaultQueue ) ) { + $cache->set( + $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', self::TYPE_DEFAULT ), + 'true', + 15 + ); + } + } + + /** + * Buffer jobs for insertion via push() or call it now if in CLI mode + * + * Note that pushLazyJobs() is registered as a deferred update just before + * DeferredUpdates::doUpdates() in MediaWiki and JobRunner classes in order + * to be executed as the very last deferred update (T100085, T154425). + * + * @param IJobSpecification|IJobSpecification[] $jobs A single Job or a list of Jobs + * @return void + * @since 1.26 + */ + public function lazyPush( $jobs ) { + if ( $this->invalidWiki ) { + // Do not enqueue job that cannot be run (T171371) + throw new LogicException( "Domain '{$this->domain}' is not recognized." ); + } + + if ( PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg' ) { + $this->push( $jobs ); + return; + } + + $jobs = is_array( $jobs ) ? $jobs : [ $jobs ]; + + // Throw errors now instead of on push(), when other jobs may be buffered + $this->assertValidJobs( $jobs ); + + $this->bufferedJobs = array_merge( $this->bufferedJobs, $jobs ); + } + + /** + * Push all jobs buffered via lazyPush() into their respective queues + * + * @return void + * @since 1.26 + */ + public static function pushLazyJobs() { + foreach ( self::$instances as $group ) { + try { + $group->push( $group->bufferedJobs ); + $group->bufferedJobs = []; + } catch ( Exception $e ) { + // Get in as many jobs as possible and let other post-send updates happen + MWExceptionHandler::logException( $e ); + } + } + } + + /** + * Pop a job off one of the job queues + * + * This pops a job off a queue as specified by $wgJobTypeConf and + * updates the aggregate job queue information cache as needed. + * + * @param int|string $qtype JobQueueGroup::TYPE_* constant or job type string + * @param int $flags Bitfield of JobQueueGroup::USE_* constants + * @param array $blacklist List of job types to ignore + * @return Job|bool Returns false on failure + */ + public function pop( $qtype = self::TYPE_DEFAULT, $flags = 0, array $blacklist = [] ) { + $job = false; + + if ( is_string( $qtype ) ) { // specific job type + if ( !in_array( $qtype, $blacklist ) ) { + $job = $this->get( $qtype )->pop(); + } + } else { // any job in the "default" jobs types + if ( $flags & self::USE_CACHE ) { + if ( !$this->cache->has( 'queues-ready', 'list', self::PROC_CACHE_TTL ) ) { + $this->cache->set( 'queues-ready', 'list', $this->getQueuesWithJobs() ); + } + $types = $this->cache->get( 'queues-ready', 'list' ); + } else { + $types = $this->getQueuesWithJobs(); + } + + if ( $qtype == self::TYPE_DEFAULT ) { + $types = array_intersect( $types, $this->getDefaultQueueTypes() ); + } + + $types = array_diff( $types, $blacklist ); // avoid selected types + shuffle( $types ); // avoid starvation + + foreach ( $types as $type ) { // for each queue... + $job = $this->get( $type )->pop(); + if ( $job ) { // found + break; + } else { // not found + $this->cache->clear( 'queues-ready' ); + } + } + } + + return $job; + } + + /** + * Acknowledge that a job was completed + * + * @param Job $job + * @return void + */ + public function ack( Job $job ) { + $this->get( $job->getType() )->ack( $job ); + } + + /** + * Register the "root job" of a given job into the queue for de-duplication. + * This should only be called right *after* all the new jobs have been inserted. + * + * @param Job $job + * @return bool + */ + public function deduplicateRootJob( Job $job ) { + return $this->get( $job->getType() )->deduplicateRootJob( $job ); + } + + /** + * Wait for any replica DBs or backup queue servers to catch up. + * + * This does nothing for certain queue classes. + * + * @return void + */ + public function waitForBackups() { + global $wgJobTypeConf; + + // Try to avoid doing this more than once per queue storage medium + foreach ( $wgJobTypeConf as $type => $conf ) { + $this->get( $type )->waitForBackups(); + } + } + + /** + * Get the list of queue types + * + * @return array List of strings + */ + public function getQueueTypes() { + return array_keys( $this->getCachedConfigVar( 'wgJobClasses' ) ); + } + + /** + * Get the list of default queue types + * + * @return array List of strings + */ + public function getDefaultQueueTypes() { + global $wgJobTypesExcludedFromDefaultQueue; + + return array_diff( $this->getQueueTypes(), $wgJobTypesExcludedFromDefaultQueue ); + } + + /** + * Check if there are any queues with jobs (this is cached) + * + * @param int $type JobQueueGroup::TYPE_* constant + * @return bool + * @since 1.23 + */ + public function queuesHaveJobs( $type = self::TYPE_ANY ) { + $cache = ObjectCache::getLocalClusterInstance(); + $key = $cache->makeGlobalKey( 'jobqueue', $this->domain, 'hasjobs', $type ); + + $value = $cache->get( $key ); + if ( $value === false ) { + $queues = $this->getQueuesWithJobs(); + if ( $type == self::TYPE_DEFAULT ) { + $queues = array_intersect( $queues, $this->getDefaultQueueTypes() ); + } + $value = count( $queues ) ? 'true' : 'false'; + $cache->add( $key, $value, 15 ); + } + + return ( $value === 'true' ); + } + + /** + * Get the list of job types that have non-empty queues + * + * @return array List of job types that have non-empty queues + */ + public function getQueuesWithJobs() { + $types = []; + foreach ( $this->getCoalescedQueues() as $info ) { + $nonEmpty = $info['queue']->getSiblingQueuesWithJobs( $this->getQueueTypes() ); + if ( is_array( $nonEmpty ) ) { // batching features supported + $types = array_merge( $types, $nonEmpty ); + } else { // we have to go through the queues in the bucket one-by-one + foreach ( $info['types'] as $type ) { + if ( !$this->get( $type )->isEmpty() ) { + $types[] = $type; + } + } + } + } + + return $types; + } + + /** + * Get the size of the queus for a list of job types + * + * @return array Map of (job type => size) + */ + public function getQueueSizes() { + $sizeMap = []; + foreach ( $this->getCoalescedQueues() as $info ) { + $sizes = $info['queue']->getSiblingQueueSizes( $this->getQueueTypes() ); + if ( is_array( $sizes ) ) { // batching features supported + $sizeMap = $sizeMap + $sizes; + } else { // we have to go through the queues in the bucket one-by-one + foreach ( $info['types'] as $type ) { + $sizeMap[$type] = $this->get( $type )->getSize(); + } + } + } + + return $sizeMap; + } + + /** + * @return array + */ + protected function getCoalescedQueues() { + global $wgJobTypeConf; + + if ( $this->coalescedQueues === null ) { + $this->coalescedQueues = []; + foreach ( $wgJobTypeConf as $type => $conf ) { + $queue = JobQueue::factory( + [ 'wiki' => $this->domain, 'type' => 'null' ] + $conf ); + $loc = $queue->getCoalesceLocationInternal(); + if ( !isset( $this->coalescedQueues[$loc] ) ) { + $this->coalescedQueues[$loc]['queue'] = $queue; + $this->coalescedQueues[$loc]['types'] = []; + } + if ( $type === 'default' ) { + $this->coalescedQueues[$loc]['types'] = array_merge( + $this->coalescedQueues[$loc]['types'], + array_diff( $this->getQueueTypes(), array_keys( $wgJobTypeConf ) ) + ); + } else { + $this->coalescedQueues[$loc]['types'][] = $type; + } + } + } + + return $this->coalescedQueues; + } + + /** + * @param string $name + * @return mixed + */ + private function getCachedConfigVar( $name ) { + // @TODO: cleanup this whole method with a proper config system + if ( WikiMap::isCurrentWikiDbDomain( $this->domain ) ) { + return $GLOBALS[$name]; // common case + } else { + $wiki = WikiMap::getWikiIdFromDomain( $this->domain ); + $cache = ObjectCache::getMainWANInstance(); + $value = $cache->getWithSetCallback( + $cache->makeGlobalKey( 'jobqueue', 'configvalue', $this->domain, $name ), + $cache::TTL_DAY + mt_rand( 0, $cache::TTL_DAY ), + function () use ( $wiki, $name ) { + global $wgConf; + // @TODO: use the full domain ID here + return [ 'v' => $wgConf->getConfig( $wiki, $name ) ]; + }, + [ 'pcTTL' => WANObjectCache::TTL_PROC_LONG ] + ); + return $value['v']; + } + } + + /** + * @param array $jobs + * @throws InvalidArgumentException + */ + private function assertValidJobs( array $jobs ) { + foreach ( $jobs as $job ) { // sanity checks + if ( !( $job instanceof IJobSpecification ) ) { + throw new InvalidArgumentException( "Expected IJobSpecification objects" ); + } + } + } + + function __destruct() { + $n = count( $this->bufferedJobs ); + if ( $n > 0 ) { + $type = implode( ', ', array_unique( array_map( 'get_class', $this->bufferedJobs ) ) ); + trigger_error( __METHOD__ . ": $n buffered job(s) of type(s) $type never inserted." ); + } + } +} diff --git a/www/wiki/includes/jobqueue/JobQueueMemory.php b/www/wiki/includes/jobqueue/JobQueueMemory.php new file mode 100644 index 00000000..f9e2c3dc --- /dev/null +++ b/www/wiki/includes/jobqueue/JobQueueMemory.php @@ -0,0 +1,230 @@ +<?php +/** + * PHP memory-backed job queue code. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + */ + +/** + * Class to handle job queues stored in PHP memory for testing + * + * JobQueueGroup does not remember every queue instance, so statically track it here + * + * @ingroup JobQueue + * @since 1.27 + */ +class JobQueueMemory extends JobQueue { + /** @var array[] */ + protected static $data = []; + + /** + * @see JobQueue::doBatchPush + * + * @param IJobSpecification[] $jobs + * @param int $flags + */ + protected function doBatchPush( array $jobs, $flags ) { + $unclaimed =& $this->getQueueData( 'unclaimed', [] ); + + foreach ( $jobs as $job ) { + if ( $job->ignoreDuplicates() ) { + $sha1 = Wikimedia\base_convert( + sha1( serialize( $job->getDeduplicationInfo() ) ), + 16, 36, 31 + ); + if ( !isset( $unclaimed[$sha1] ) ) { + $unclaimed[$sha1] = $job; + } + } else { + $unclaimed[] = $job; + } + } + } + + /** + * @see JobQueue::supportedOrders + * + * @return string[] + */ + protected function supportedOrders() { + return [ 'random', 'timestamp', 'fifo' ]; + } + + /** + * @see JobQueue::optimalOrder + * + * @return string + */ + protected function optimalOrder() { + return 'fifo'; + } + + /** + * @see JobQueue::doIsEmpty + * + * @return bool + */ + protected function doIsEmpty() { + return ( $this->doGetSize() == 0 ); + } + + /** + * @see JobQueue::doGetSize + * + * @return int + */ + protected function doGetSize() { + $unclaimed = $this->getQueueData( 'unclaimed' ); + + return $unclaimed ? count( $unclaimed ) : 0; + } + + /** + * @see JobQueue::doGetAcquiredCount + * + * @return int + */ + protected function doGetAcquiredCount() { + $claimed = $this->getQueueData( 'claimed' ); + + return $claimed ? count( $claimed ) : 0; + } + + /** + * @see JobQueue::doPop + * + * @return Job|bool + */ + protected function doPop() { + if ( $this->doGetSize() == 0 ) { + return false; + } + + $unclaimed =& $this->getQueueData( 'unclaimed' ); + $claimed =& $this->getQueueData( 'claimed', [] ); + + if ( $this->order === 'random' ) { + $key = array_rand( $unclaimed ); + } else { + reset( $unclaimed ); + $key = key( $unclaimed ); + } + + $spec = $unclaimed[$key]; + unset( $unclaimed[$key] ); + $claimed[] = $spec; + + $job = $this->jobFromSpecInternal( $spec ); + + end( $claimed ); + $job->metadata['claimId'] = key( $claimed ); + + return $job; + } + + /** + * @see JobQueue::doAck + * + * @param Job $job + */ + protected function doAck( Job $job ) { + if ( $this->getAcquiredCount() == 0 ) { + return; + } + + $claimed =& $this->getQueueData( 'claimed' ); + unset( $claimed[$job->metadata['claimId']] ); + } + + /** + * @see JobQueue::doDelete + */ + protected function doDelete() { + if ( isset( self::$data[$this->type][$this->wiki] ) ) { + unset( self::$data[$this->type][$this->wiki] ); + if ( !self::$data[$this->type] ) { + unset( self::$data[$this->type] ); + } + } + } + + /** + * @see JobQueue::getAllQueuedJobs + * + * @return Iterator of Job objects. + */ + public function getAllQueuedJobs() { + $unclaimed = $this->getQueueData( 'unclaimed' ); + if ( !$unclaimed ) { + return new ArrayIterator( [] ); + } + + return new MappedIterator( + $unclaimed, + function ( $value ) { + return $this->jobFromSpecInternal( $value ); + } + ); + } + + /** + * @see JobQueue::getAllAcquiredJobs + * + * @return Iterator of Job objects. + */ + public function getAllAcquiredJobs() { + $claimed = $this->getQueueData( 'claimed' ); + if ( !$claimed ) { + return new ArrayIterator( [] ); + } + + return new MappedIterator( + $claimed, + function ( $value ) { + return $this->jobFromSpecInternal( $value ); + } + ); + } + + /** + * @param IJobSpecification $spec + * + * @return Job + */ + public function jobFromSpecInternal( IJobSpecification $spec ) { + return Job::factory( $spec->getType(), $spec->getTitle(), $spec->getParams() ); + } + + /** + * @param string $field + * @param mixed $init + * + * @return mixed + */ + private function &getQueueData( $field, $init = null ) { + if ( !isset( self::$data[$this->type][$this->wiki][$field] ) ) { + if ( $init !== null ) { + self::$data[$this->type][$this->wiki][$field] = $init; + } else { + return $init; + } + } + + return self::$data[$this->type][$this->wiki][$field]; + } +} diff --git a/www/wiki/includes/jobqueue/JobQueueRedis.php b/www/wiki/includes/jobqueue/JobQueueRedis.php new file mode 100644 index 00000000..7dad014e --- /dev/null +++ b/www/wiki/includes/jobqueue/JobQueueRedis.php @@ -0,0 +1,820 @@ +<?php +/** + * Redis-backed job queue code. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + */ +use Psr\Log\LoggerInterface; + +/** + * Class to handle job queues stored in Redis + * + * This is a faster and less resource-intensive job queue than JobQueueDB. + * All data for a queue using this class is placed into one redis server. + * The mediawiki/services/jobrunner background service must be set up and running. + * + * There are eight main redis keys (per queue) used to track jobs: + * - l-unclaimed : A list of job IDs used for ready unclaimed jobs + * - z-claimed : A sorted set of (job ID, UNIX timestamp as score) used for job retries + * - z-abandoned : A sorted set of (job ID, UNIX timestamp as score) used for broken jobs + * - z-delayed : A sorted set of (job ID, UNIX timestamp as score) used for delayed jobs + * - h-idBySha1 : A hash of (SHA1 => job ID) for unclaimed jobs used for de-duplication + * - h-sha1ById : A hash of (job ID => SHA1) for unclaimed jobs used for de-duplication + * - h-attempts : A hash of (job ID => attempt count) used for job claiming/retries + * - h-data : A hash of (job ID => serialized blobs) for job storage + * A job ID can be in only one of z-delayed, l-unclaimed, z-claimed, and z-abandoned. + * If an ID appears in any of those lists, it should have a h-data entry for its ID. + * If a job has a SHA1 de-duplication value and its ID is in l-unclaimed or z-delayed, then + * there should be no other such jobs with that SHA1. Every h-idBySha1 entry has an h-sha1ById + * entry and every h-sha1ById must refer to an ID that is l-unclaimed. If a job has its + * ID in z-claimed or z-abandoned, then it must also have an h-attempts entry for its ID. + * + * The following keys are used to track queue states: + * - s-queuesWithJobs : A set of all queues with non-abandoned jobs + * + * The background service takes care of undelaying, recycling, and pruning jobs as well as + * removing s-queuesWithJobs entries as queues empty. + * + * Additionally, "rootjob:* keys track "root jobs" used for additional de-duplication. + * Aside from root job keys, all keys have no expiry, and are only removed when jobs are run. + * All the keys are prefixed with the relevant wiki ID information. + * + * This class requires Redis 2.6 as it makes use Lua scripts for fast atomic operations. + * Additionally, it should be noted that redis has different persistence modes, such + * as rdb snapshots, journaling, and no persistence. Appropriate configuration should be + * made on the servers based on what queues are using it and what tolerance they have. + * + * @ingroup JobQueue + * @ingroup Redis + * @since 1.22 + */ +class JobQueueRedis extends JobQueue { + /** @var RedisConnectionPool */ + protected $redisPool; + /** @var LoggerInterface */ + protected $logger; + + /** @var string Server address */ + protected $server; + /** @var string Compression method to use */ + protected $compression; + + const MAX_PUSH_SIZE = 25; // avoid tying up the server + + /** + * @param array $params Possible keys: + * - redisConfig : An array of parameters to RedisConnectionPool::__construct(). + * Note that the serializer option is ignored as "none" is always used. + * - redisServer : A hostname/port combination or the absolute path of a UNIX socket. + * If a hostname is specified but no port, the standard port number + * 6379 will be used. Required. + * - compression : The type of compression to use; one of (none,gzip). + * - daemonized : Set to true if the redisJobRunnerService runs in the background. + * This will disable job recycling/undelaying from the MediaWiki side + * to avoid redundance and out-of-sync configuration. + * @throws InvalidArgumentException + */ + public function __construct( array $params ) { + parent::__construct( $params ); + $params['redisConfig']['serializer'] = 'none'; // make it easy to use Lua + $this->server = $params['redisServer']; + $this->compression = isset( $params['compression'] ) ? $params['compression'] : 'none'; + $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] ); + if ( empty( $params['daemonized'] ) ) { + throw new InvalidArgumentException( + "Non-daemonized mode is no longer supported. Please install the " . + "mediawiki/services/jobrunner service and update \$wgJobTypeConf as needed." ); + } + $this->logger = \MediaWiki\Logger\LoggerFactory::getInstance( 'redis' ); + } + + protected function supportedOrders() { + return [ 'timestamp', 'fifo' ]; + } + + protected function optimalOrder() { + return 'fifo'; + } + + protected function supportsDelayedJobs() { + return true; + } + + /** + * @see JobQueue::doIsEmpty() + * @return bool + * @throws JobQueueError + */ + protected function doIsEmpty() { + return $this->doGetSize() == 0; + } + + /** + * @see JobQueue::doGetSize() + * @return int + * @throws JobQueueError + */ + protected function doGetSize() { + $conn = $this->getConnection(); + try { + return $conn->lSize( $this->getQueueKey( 'l-unclaimed' ) ); + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + } + + /** + * @see JobQueue::doGetAcquiredCount() + * @return int + * @throws JobQueueError + */ + protected function doGetAcquiredCount() { + $conn = $this->getConnection(); + try { + $conn->multi( Redis::PIPELINE ); + $conn->zSize( $this->getQueueKey( 'z-claimed' ) ); + $conn->zSize( $this->getQueueKey( 'z-abandoned' ) ); + + return array_sum( $conn->exec() ); + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + } + + /** + * @see JobQueue::doGetDelayedCount() + * @return int + * @throws JobQueueError + */ + protected function doGetDelayedCount() { + $conn = $this->getConnection(); + try { + return $conn->zSize( $this->getQueueKey( 'z-delayed' ) ); + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + } + + /** + * @see JobQueue::doGetAbandonedCount() + * @return int + * @throws JobQueueError + */ + protected function doGetAbandonedCount() { + $conn = $this->getConnection(); + try { + return $conn->zSize( $this->getQueueKey( 'z-abandoned' ) ); + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + } + + /** + * @see JobQueue::doBatchPush() + * @param IJobSpecification[] $jobs + * @param int $flags + * @return void + * @throws JobQueueError + */ + protected function doBatchPush( array $jobs, $flags ) { + // Convert the jobs into field maps (de-duplicated against each other) + $items = []; // (job ID => job fields map) + foreach ( $jobs as $job ) { + $item = $this->getNewJobFields( $job ); + if ( strlen( $item['sha1'] ) ) { // hash identifier => de-duplicate + $items[$item['sha1']] = $item; + } else { + $items[$item['uuid']] = $item; + } + } + + if ( !count( $items ) ) { + return; // nothing to do + } + + $conn = $this->getConnection(); + try { + // Actually push the non-duplicate jobs into the queue... + if ( $flags & self::QOS_ATOMIC ) { + $batches = [ $items ]; // all or nothing + } else { + $batches = array_chunk( $items, self::MAX_PUSH_SIZE ); + } + $failed = 0; + $pushed = 0; + foreach ( $batches as $itemBatch ) { + $added = $this->pushBlobs( $conn, $itemBatch ); + if ( is_int( $added ) ) { + $pushed += $added; + } else { + $failed += count( $itemBatch ); + } + } + JobQueue::incrStats( 'inserts', $this->type, count( $items ) ); + JobQueue::incrStats( 'inserts_actual', $this->type, $pushed ); + JobQueue::incrStats( 'dupe_inserts', $this->type, + count( $items ) - $failed - $pushed ); + if ( $failed > 0 ) { + $err = "Could not insert {$failed} {$this->type} job(s)."; + wfDebugLog( 'JobQueueRedis', $err ); + throw new RedisException( $err ); + } + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + } + + /** + * @param RedisConnRef $conn + * @param array $items List of results from JobQueueRedis::getNewJobFields() + * @return int Number of jobs inserted (duplicates are ignored) + * @throws RedisException + */ + protected function pushBlobs( RedisConnRef $conn, array $items ) { + $args = [ $this->encodeQueueName() ]; + // Next args come in 4s ([id, sha1, rtime, blob [, id, sha1, rtime, blob ... ] ] ) + foreach ( $items as $item ) { + $args[] = (string)$item['uuid']; + $args[] = (string)$item['sha1']; + $args[] = (string)$item['rtimestamp']; + $args[] = (string)$this->serialize( $item ); + } + static $script = + /** @lang Lua */ +<<<LUA + local kUnclaimed, kSha1ById, kIdBySha1, kDelayed, kData, kQwJobs = unpack(KEYS) + -- First argument is the queue ID + local queueId = ARGV[1] + -- Next arguments all come in 4s (one per job) + local variadicArgCount = #ARGV - 1 + if variadicArgCount % 4 ~= 0 then + return redis.error_reply('Unmatched arguments') + end + -- Insert each job into this queue as needed + local pushed = 0 + for i = 2,#ARGV,4 do + local id,sha1,rtimestamp,blob = ARGV[i],ARGV[i+1],ARGV[i+2],ARGV[i+3] + if sha1 == '' or redis.call('hExists',kIdBySha1,sha1) == 0 then + if 1*rtimestamp > 0 then + -- Insert into delayed queue (release time as score) + redis.call('zAdd',kDelayed,rtimestamp,id) + else + -- Insert into unclaimed queue + redis.call('lPush',kUnclaimed,id) + end + if sha1 ~= '' then + redis.call('hSet',kSha1ById,id,sha1) + redis.call('hSet',kIdBySha1,sha1,id) + end + redis.call('hSet',kData,id,blob) + pushed = pushed + 1 + end + end + -- Mark this queue as having jobs + redis.call('sAdd',kQwJobs,queueId) + return pushed +LUA; + return $conn->luaEval( $script, + array_merge( + [ + $this->getQueueKey( 'l-unclaimed' ), # KEYS[1] + $this->getQueueKey( 'h-sha1ById' ), # KEYS[2] + $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] + $this->getQueueKey( 'z-delayed' ), # KEYS[4] + $this->getQueueKey( 'h-data' ), # KEYS[5] + $this->getGlobalKey( 's-queuesWithJobs' ), # KEYS[6] + ], + $args + ), + 6 # number of first argument(s) that are keys + ); + } + + /** + * @see JobQueue::doPop() + * @return Job|bool + * @throws JobQueueError + */ + protected function doPop() { + $job = false; + + $conn = $this->getConnection(); + try { + do { + $blob = $this->popAndAcquireBlob( $conn ); + if ( !is_string( $blob ) ) { + break; // no jobs; nothing to do + } + + JobQueue::incrStats( 'pops', $this->type ); + $item = $this->unserialize( $blob ); + if ( $item === false ) { + wfDebugLog( 'JobQueueRedis', "Could not unserialize {$this->type} job." ); + continue; + } + + // If $item is invalid, the runner loop recyling will cleanup as needed + $job = $this->getJobFromFields( $item ); // may be false + } while ( !$job ); // job may be false if invalid + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + + return $job; + } + + /** + * @param RedisConnRef $conn + * @return array Serialized string or false + * @throws RedisException + */ + protected function popAndAcquireBlob( RedisConnRef $conn ) { + static $script = + /** @lang Lua */ +<<<LUA + local kUnclaimed, kSha1ById, kIdBySha1, kClaimed, kAttempts, kData = unpack(KEYS) + local rTime = unpack(ARGV) + -- Pop an item off the queue + local id = redis.call('rPop',kUnclaimed) + if not id then + return false + end + -- Allow new duplicates of this job + local sha1 = redis.call('hGet',kSha1ById,id) + if sha1 then redis.call('hDel',kIdBySha1,sha1) end + redis.call('hDel',kSha1ById,id) + -- Mark the jobs as claimed and return it + redis.call('zAdd',kClaimed,rTime,id) + redis.call('hIncrBy',kAttempts,id,1) + return redis.call('hGet',kData,id) +LUA; + return $conn->luaEval( $script, + [ + $this->getQueueKey( 'l-unclaimed' ), # KEYS[1] + $this->getQueueKey( 'h-sha1ById' ), # KEYS[2] + $this->getQueueKey( 'h-idBySha1' ), # KEYS[3] + $this->getQueueKey( 'z-claimed' ), # KEYS[4] + $this->getQueueKey( 'h-attempts' ), # KEYS[5] + $this->getQueueKey( 'h-data' ), # KEYS[6] + time(), # ARGV[1] (injected to be replication-safe) + ], + 6 # number of first argument(s) that are keys + ); + } + + /** + * @see JobQueue::doAck() + * @param Job $job + * @return Job|bool + * @throws UnexpectedValueException + * @throws JobQueueError + */ + protected function doAck( Job $job ) { + if ( !isset( $job->metadata['uuid'] ) ) { + throw new UnexpectedValueException( "Job of type '{$job->getType()}' has no UUID." ); + } + + $uuid = $job->metadata['uuid']; + $conn = $this->getConnection(); + try { + static $script = + /** @lang Lua */ +<<<LUA + local kClaimed, kAttempts, kData = unpack(KEYS) + local id = unpack(ARGV) + -- Unmark the job as claimed + local removed = redis.call('zRem',kClaimed,id) + -- Check if the job was recycled + if removed == 0 then + return 0 + end + -- Delete the retry data + redis.call('hDel',kAttempts,id) + -- Delete the job data itself + return redis.call('hDel',kData,id) +LUA; + $res = $conn->luaEval( $script, + [ + $this->getQueueKey( 'z-claimed' ), # KEYS[1] + $this->getQueueKey( 'h-attempts' ), # KEYS[2] + $this->getQueueKey( 'h-data' ), # KEYS[3] + $uuid # ARGV[1] + ], + 3 # number of first argument(s) that are keys + ); + + if ( !$res ) { + wfDebugLog( 'JobQueueRedis', "Could not acknowledge {$this->type} job $uuid." ); + + return false; + } + + JobQueue::incrStats( 'acks', $this->type ); + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + + return true; + } + + /** + * @see JobQueue::doDeduplicateRootJob() + * @param IJobSpecification $job + * @return bool + * @throws JobQueueError + * @throws LogicException + */ + protected function doDeduplicateRootJob( IJobSpecification $job ) { + if ( !$job->hasRootJobParams() ) { + throw new LogicException( "Cannot register root job; missing parameters." ); + } + $params = $job->getRootJobParams(); + + $key = $this->getRootJobCacheKey( $params['rootJobSignature'] ); + + $conn = $this->getConnection(); + try { + $timestamp = $conn->get( $key ); // current last timestamp of this job + if ( $timestamp && $timestamp >= $params['rootJobTimestamp'] ) { + return true; // a newer version of this root job was enqueued + } + + // Update the timestamp of the last root job started at the location... + return $conn->set( $key, $params['rootJobTimestamp'], self::ROOTJOB_TTL ); // 2 weeks + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + } + + /** + * @see JobQueue::doIsRootJobOldDuplicate() + * @param Job $job + * @return bool + * @throws JobQueueError + */ + protected function doIsRootJobOldDuplicate( Job $job ) { + if ( !$job->hasRootJobParams() ) { + return false; // job has no de-deplication info + } + $params = $job->getRootJobParams(); + + $conn = $this->getConnection(); + try { + // Get the last time this root job was enqueued + $timestamp = $conn->get( $this->getRootJobCacheKey( $params['rootJobSignature'] ) ); + } catch ( RedisException $e ) { + $timestamp = false; + $this->throwRedisException( $conn, $e ); + } + + // Check if a new root job was started at the location after this one's... + return ( $timestamp && $timestamp > $params['rootJobTimestamp'] ); + } + + /** + * @see JobQueue::doDelete() + * @return bool + * @throws JobQueueError + */ + protected function doDelete() { + static $props = [ 'l-unclaimed', 'z-claimed', 'z-abandoned', + 'z-delayed', 'h-idBySha1', 'h-sha1ById', 'h-attempts', 'h-data' ]; + + $conn = $this->getConnection(); + try { + $keys = []; + foreach ( $props as $prop ) { + $keys[] = $this->getQueueKey( $prop ); + } + + $ok = ( $conn->delete( $keys ) !== false ); + $conn->sRem( $this->getGlobalKey( 's-queuesWithJobs' ), $this->encodeQueueName() ); + + return $ok; + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + } + + /** + * @see JobQueue::getAllQueuedJobs() + * @return Iterator + * @throws JobQueueError + */ + public function getAllQueuedJobs() { + $conn = $this->getConnection(); + try { + $uids = $conn->lRange( $this->getQueueKey( 'l-unclaimed' ), 0, -1 ); + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + + return $this->getJobIterator( $conn, $uids ); + } + + /** + * @see JobQueue::getAllDelayedJobs() + * @return Iterator + * @throws JobQueueError + */ + public function getAllDelayedJobs() { + $conn = $this->getConnection(); + try { + $uids = $conn->zRange( $this->getQueueKey( 'z-delayed' ), 0, -1 ); + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + + return $this->getJobIterator( $conn, $uids ); + } + + /** + * @see JobQueue::getAllAcquiredJobs() + * @return Iterator + * @throws JobQueueError + */ + public function getAllAcquiredJobs() { + $conn = $this->getConnection(); + try { + $uids = $conn->zRange( $this->getQueueKey( 'z-claimed' ), 0, -1 ); + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + + return $this->getJobIterator( $conn, $uids ); + } + + /** + * @see JobQueue::getAllAbandonedJobs() + * @return Iterator + * @throws JobQueueError + */ + public function getAllAbandonedJobs() { + $conn = $this->getConnection(); + try { + $uids = $conn->zRange( $this->getQueueKey( 'z-abandoned' ), 0, -1 ); + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + + return $this->getJobIterator( $conn, $uids ); + } + + /** + * @param RedisConnRef $conn + * @param array $uids List of job UUIDs + * @return MappedIterator + */ + protected function getJobIterator( RedisConnRef $conn, array $uids ) { + return new MappedIterator( + $uids, + function ( $uid ) use ( $conn ) { + return $this->getJobFromUidInternal( $uid, $conn ); + }, + [ 'accept' => function ( $job ) { + return is_object( $job ); + } ] + ); + } + + public function getCoalesceLocationInternal() { + return "RedisServer:" . $this->server; + } + + protected function doGetSiblingQueuesWithJobs( array $types ) { + return array_keys( array_filter( $this->doGetSiblingQueueSizes( $types ) ) ); + } + + protected function doGetSiblingQueueSizes( array $types ) { + $sizes = []; // (type => size) + $types = array_values( $types ); // reindex + $conn = $this->getConnection(); + try { + $conn->multi( Redis::PIPELINE ); + foreach ( $types as $type ) { + $conn->lSize( $this->getQueueKey( 'l-unclaimed', $type ) ); + } + $res = $conn->exec(); + if ( is_array( $res ) ) { + foreach ( $res as $i => $size ) { + $sizes[$types[$i]] = $size; + } + } + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + + return $sizes; + } + + /** + * This function should not be called outside JobQueueRedis + * + * @param string $uid + * @param RedisConnRef $conn + * @return Job|bool Returns false if the job does not exist + * @throws JobQueueError + * @throws UnexpectedValueException + */ + public function getJobFromUidInternal( $uid, RedisConnRef $conn ) { + try { + $data = $conn->hGet( $this->getQueueKey( 'h-data' ), $uid ); + if ( $data === false ) { + return false; // not found + } + $item = $this->unserialize( $data ); + if ( !is_array( $item ) ) { // this shouldn't happen + throw new UnexpectedValueException( "Could not find job with ID '$uid'." ); + } + $title = Title::makeTitle( $item['namespace'], $item['title'] ); + $job = Job::factory( $item['type'], $title, $item['params'] ); + $job->metadata['uuid'] = $item['uuid']; + $job->metadata['timestamp'] = $item['timestamp']; + // Add in attempt count for debugging at showJobs.php + $job->metadata['attempts'] = $conn->hGet( $this->getQueueKey( 'h-attempts' ), $uid ); + + return $job; + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + } + + /** + * @return array List of (wiki,type) tuples for queues with non-abandoned jobs + * @throws JobQueueConnectionError + * @throws JobQueueError + */ + public function getServerQueuesWithJobs() { + $queues = []; + + $conn = $this->getConnection(); + try { + $set = $conn->sMembers( $this->getGlobalKey( 's-queuesWithJobs' ) ); + foreach ( $set as $queue ) { + $queues[] = $this->decodeQueueName( $queue ); + } + } catch ( RedisException $e ) { + $this->throwRedisException( $conn, $e ); + } + + return $queues; + } + + /** + * @param IJobSpecification $job + * @return array + */ + protected function getNewJobFields( IJobSpecification $job ) { + return [ + // Fields that describe the nature of the job + 'type' => $job->getType(), + 'namespace' => $job->getTitle()->getNamespace(), + 'title' => $job->getTitle()->getDBkey(), + 'params' => $job->getParams(), + // Some jobs cannot run until a "release timestamp" + 'rtimestamp' => $job->getReleaseTimestamp() ?: 0, + // Additional job metadata + 'uuid' => UIDGenerator::newRawUUIDv4( UIDGenerator::QUICK_RAND ), + 'sha1' => $job->ignoreDuplicates() + ? Wikimedia\base_convert( sha1( serialize( $job->getDeduplicationInfo() ) ), 16, 36, 31 ) + : '', + 'timestamp' => time() // UNIX timestamp + ]; + } + + /** + * @param array $fields + * @return Job|bool + */ + protected function getJobFromFields( array $fields ) { + $title = Title::makeTitle( $fields['namespace'], $fields['title'] ); + $job = Job::factory( $fields['type'], $title, $fields['params'] ); + $job->metadata['uuid'] = $fields['uuid']; + $job->metadata['timestamp'] = $fields['timestamp']; + + return $job; + } + + /** + * @param array $fields + * @return string Serialized and possibly compressed version of $fields + */ + protected function serialize( array $fields ) { + $blob = serialize( $fields ); + if ( $this->compression === 'gzip' + && strlen( $blob ) >= 1024 + && function_exists( 'gzdeflate' ) + ) { + $object = (object)[ 'blob' => gzdeflate( $blob ), 'enc' => 'gzip' ]; + $blobz = serialize( $object ); + + return ( strlen( $blobz ) < strlen( $blob ) ) ? $blobz : $blob; + } else { + return $blob; + } + } + + /** + * @param string $blob + * @return array|bool Unserialized version of $blob or false + */ + protected function unserialize( $blob ) { + $fields = unserialize( $blob ); + if ( is_object( $fields ) ) { + if ( $fields->enc === 'gzip' && function_exists( 'gzinflate' ) ) { + $fields = unserialize( gzinflate( $fields->blob ) ); + } else { + $fields = false; + } + } + + return is_array( $fields ) ? $fields : false; + } + + /** + * Get a connection to the server that handles all sub-queues for this queue + * + * @return RedisConnRef + * @throws JobQueueConnectionError + */ + protected function getConnection() { + $conn = $this->redisPool->getConnection( $this->server, $this->logger ); + if ( !$conn ) { + throw new JobQueueConnectionError( + "Unable to connect to redis server {$this->server}." ); + } + + return $conn; + } + + /** + * @param RedisConnRef $conn + * @param RedisException $e + * @throws JobQueueError + */ + protected function throwRedisException( RedisConnRef $conn, $e ) { + $this->redisPool->handleError( $conn, $e ); + throw new JobQueueError( "Redis server error: {$e->getMessage()}\n" ); + } + + /** + * @return string JSON + */ + private function encodeQueueName() { + return json_encode( [ $this->type, $this->wiki ] ); + } + + /** + * @param string $name JSON + * @return array (type, wiki) + */ + private function decodeQueueName( $name ) { + return json_decode( $name ); + } + + /** + * @param string $name + * @return string + */ + private function getGlobalKey( $name ) { + $parts = [ 'global', 'jobqueue', $name ]; + foreach ( $parts as $part ) { + if ( !preg_match( '/[a-zA-Z0-9_-]+/', $part ) ) { + throw new InvalidArgumentException( "Key part characters are out of range." ); + } + } + + return implode( ':', $parts ); + } + + /** + * @param string $prop + * @param string|null $type Override this for sibling queues + * @return string + */ + private function getQueueKey( $prop, $type = null ) { + $type = is_string( $type ) ? $type : $this->type; + list( $db, $prefix ) = wfSplitWikiID( $this->wiki ); + $keyspace = $prefix ? "$db-$prefix" : $db; + + $parts = [ $keyspace, 'jobqueue', $type, $prop ]; + + // Parts are typically ASCII, but encode for sanity to escape ":" + return implode( ':', array_map( 'rawurlencode', $parts ) ); + } +} diff --git a/www/wiki/includes/jobqueue/JobQueueSecondTestQueue.php b/www/wiki/includes/jobqueue/JobQueueSecondTestQueue.php new file mode 100644 index 00000000..01f467f2 --- /dev/null +++ b/www/wiki/includes/jobqueue/JobQueueSecondTestQueue.php @@ -0,0 +1,290 @@ +<?php + +/** + * A wrapper for the JobQueue that delegates all the method calls to a single, + * main queue, and also pushes all the jobs to a second job queue that's being + * debugged. + * + * This class was temporary added to test transitioning to the JobQueueEventBus + * and will removed after the transition is completed. This code is only needed + * while we are testing the new infrastructure to be able to compare the results + * between the queue implementations and make sure that they process the same jobs, + * deduplicate correctly, compare the delays, backlogs and make sure no jobs are lost. + * When the new infrastructure is well tested this will not be needed any more. + * + * @deprecated since 1.30 + * @since 1.30 + */ +class JobQueueSecondTestQueue extends JobQueue { + + /** + * @var JobQueue + */ + private $mainQueue; + + /** + * @var JobQueue + */ + private $debugQueue; + + /** + * @var bool + */ + private $onlyWriteToDebugQueue; + + protected function __construct( array $params ) { + if ( !isset( $params['mainqueue'] ) ) { + throw new MWException( "mainqueue parameter must be provided to the debug queue" ); + } + + if ( !isset( $params['debugqueue'] ) ) { + throw new MWException( "debugqueue parameter must be provided to the debug queue" ); + } + + $conf = [ 'wiki' => $params['wiki'], 'type' => $params['type'] ]; + $this->mainQueue = JobQueue::factory( $params['mainqueue'] + $conf ); + $this->debugQueue = JobQueue::factory( $params['debugqueue'] + $conf ); + $this->onlyWriteToDebugQueue = isset( $params['readonly'] ) ? $params['readonly'] : false; + + // We need to construct parent after creating the main and debug queue + // because super constructor calls some methods we delegate to the main queue. + parent::__construct( $params ); + } + + /** + * Get the allowed queue orders for configuration validation + * + * @return array Subset of (random, timestamp, fifo, undefined) + */ + protected function supportedOrders() { + return $this->mainQueue->supportedOrders(); + } + + /** + * Get the default queue order to use if configuration does not specify one + * + * @return string One of (random, timestamp, fifo, undefined) + */ + protected function optimalOrder() { + return $this->mainQueue->optimalOrder(); + } + + /** + * Find out if delayed jobs are supported for configuration validation + * + * @return bool Whether delayed jobs are supported + */ + protected function supportsDelayedJobs() { + return $this->mainQueue->supportsDelayedJobs(); + } + + /** + * @see JobQueue::isEmpty() + * @return bool + */ + protected function doIsEmpty() { + return $this->mainQueue->doIsEmpty(); + } + + /** + * @see JobQueue::getSize() + * @return int + */ + protected function doGetSize() { + return $this->mainQueue->doGetSize(); + } + + /** + * @see JobQueue::getAcquiredCount() + * @return int + */ + protected function doGetAcquiredCount() { + return $this->mainQueue->doGetAcquiredCount(); + } + + /** + * @see JobQueue::getDelayedCount() + * @return int + */ + protected function doGetDelayedCount() { + return $this->mainQueue->doGetDelayedCount(); + } + + /** + * @see JobQueue::getAbandonedCount() + * @return int + */ + protected function doGetAbandonedCount() { + return $this->mainQueue->doGetAbandonedCount(); + } + + /** + * @see JobQueue::batchPush() + * @param IJobSpecification[] $jobs + * @param int $flags + */ + protected function doBatchPush( array $jobs, $flags ) { + if ( !$this->onlyWriteToDebugQueue ) { + $this->mainQueue->doBatchPush( $jobs, $flags ); + } + + try { + $this->debugQueue->doBatchPush( $jobs, $flags ); + } catch ( Exception $exception ) { + MWExceptionHandler::logException( $exception ); + } + } + + /** + * @see JobQueue::pop() + * @return Job|bool + */ + protected function doPop() { + return $this->mainQueue->doPop(); + } + + /** + * @see JobQueue::ack() + * @param Job $job + * @return Job|bool + */ + protected function doAck( Job $job ) { + return $this->mainQueue->doAck( $job ); + } + + /** + * @see JobQueue::deduplicateRootJob() + * @param IJobSpecification $job + * @throws MWException + * @return bool + */ + protected function doDeduplicateRootJob( IJobSpecification $job ) { + return $this->mainQueue->doDeduplicateRootJob( $job ); + } + + /** + * @see JobQueue::isRootJobOldDuplicate() + * @param Job $job + * @return bool + */ + protected function doIsRootJobOldDuplicate( Job $job ) { + return $this->mainQueue->doIsRootJobOldDuplicate( $job ); + } + + /** + * @param string $signature Hash identifier of the root job + * @return string + */ + protected function getRootJobCacheKey( $signature ) { + return $this->mainQueue->getRootJobCacheKey( $signature ); + } + + /** + * @see JobQueue::delete() + * @return bool + * @throws MWException + */ + protected function doDelete() { + return $this->mainQueue->doDelete(); + } + + /** + * @see JobQueue::waitForBackups() + * @return void + */ + protected function doWaitForBackups() { + $this->mainQueue->doWaitForBackups(); + } + + /** + * @see JobQueue::flushCaches() + * @return void + */ + protected function doFlushCaches() { + $this->mainQueue->doFlushCaches(); + } + + /** + * Get an iterator to traverse over all available jobs in this queue. + * This does not include jobs that are currently acquired or delayed. + * Note: results may be stale if the queue is concurrently modified. + * + * @return Iterator + * @throws JobQueueError + */ + public function getAllQueuedJobs() { + return $this->mainQueue->getAllQueuedJobs(); + } + + /** + * Get an iterator to traverse over all delayed jobs in this queue. + * Note: results may be stale if the queue is concurrently modified. + * + * @return Iterator + * @throws JobQueueError + * @since 1.22 + */ + public function getAllDelayedJobs() { + return $this->mainQueue->getAllDelayedJobs(); + } + + /** + * Get an iterator to traverse over all claimed jobs in this queue + * + * Callers should be quick to iterator over it or few results + * will be returned due to jobs being acknowledged and deleted + * + * @return Iterator + * @throws JobQueueError + * @since 1.26 + */ + public function getAllAcquiredJobs() { + return $this->mainQueue->getAllAcquiredJobs(); + } + + /** + * Get an iterator to traverse over all abandoned jobs in this queue + * + * @return Iterator + * @throws JobQueueError + * @since 1.25 + */ + public function getAllAbandonedJobs() { + return $this->mainQueue->getAllAbandonedJobs(); + } + + /** + * Do not use this function outside of JobQueue/JobQueueGroup + * + * @return string + * @since 1.22 + */ + public function getCoalesceLocationInternal() { + return $this->mainQueue->getCoalesceLocationInternal(); + } + + /** + * @see JobQueue::getSiblingQueuesWithJobs() + * @param array $types List of queues types + * @return array|null (list of queue types) or null if unsupported + */ + protected function doGetSiblingQueuesWithJobs( array $types ) { + return $this->mainQueue->doGetSiblingQueuesWithJobs( $types ); + } + + /** + * @see JobQueue::getSiblingQueuesSize() + * @param array $types List of queues types + * @return array|null (list of queue types) or null if unsupported + */ + protected function doGetSiblingQueueSizes( array $types ) { + return $this->mainQueue->doGetSiblingQueueSizes( $types ); + } + + /** + * @throws JobQueueReadOnlyError + */ + protected function assertNotReadOnly() { + $this->mainQueue->assertNotReadOnly(); + } +} diff --git a/www/wiki/includes/jobqueue/JobRunner.php b/www/wiki/includes/jobqueue/JobRunner.php new file mode 100644 index 00000000..977fbdaa --- /dev/null +++ b/www/wiki/includes/jobqueue/JobRunner.php @@ -0,0 +1,607 @@ +<?php +/** + * Job queue runner utility methods + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ + +use MediaWiki\MediaWikiServices; +use MediaWiki\Logger\LoggerFactory; +use Liuggio\StatsdClient\Factory\StatsdDataFactory; +use Psr\Log\LoggerAwareInterface; +use Psr\Log\LoggerInterface; +use Wikimedia\ScopedCallback; +use Wikimedia\Rdbms\LBFactory; +use Wikimedia\Rdbms\DBError; +use Wikimedia\Rdbms\DBReplicationWaitError; + +/** + * Job queue runner utility methods + * + * @ingroup JobQueue + * @since 1.24 + */ +class JobRunner implements LoggerAwareInterface { + /** @var Config */ + protected $config; + /** @var callable|null Debug output handler */ + protected $debug; + + /** + * @var LoggerInterface $logger + */ + protected $logger; + + const MAX_ALLOWED_LAG = 3; // abort if more than this much DB lag is present + const LAG_CHECK_PERIOD = 1.0; // check replica DB lag this many seconds + const ERROR_BACKOFF_TTL = 1; // seconds to back off a queue due to errors + const READONLY_BACKOFF_TTL = 30; // seconds to back off a queue due to read-only errors + + /** + * @param callable $debug Optional debug output handler + */ + public function setDebugHandler( $debug ) { + $this->debug = $debug; + } + + /** + * @param LoggerInterface $logger + * @return void + */ + public function setLogger( LoggerInterface $logger ) { + $this->logger = $logger; + } + + /** + * @param LoggerInterface $logger + */ + public function __construct( LoggerInterface $logger = null ) { + if ( $logger === null ) { + $logger = LoggerFactory::getInstance( 'runJobs' ); + } + $this->setLogger( $logger ); + $this->config = MediaWikiServices::getInstance()->getMainConfig(); + } + + /** + * Run jobs of the specified number/type for the specified time + * + * The response map has a 'job' field that lists status of each job, including: + * - type : the job type + * - status : ok/failed + * - error : any error message string + * - time : the job run time in ms + * The response map also has: + * - backoffs : the (job type => seconds) map of backoff times + * - elapsed : the total time spent running tasks in ms + * - reached : the reason the script finished, one of (none-ready, job-limit, time-limit, + * memory-limit) + * + * This method outputs status information only if a debug handler was set. + * Any exceptions are caught and logged, but are not reported as output. + * + * @param array $options Map of parameters: + * - type : the job type (or false for the default types) + * - maxJobs : maximum number of jobs to run + * - maxTime : maximum time in seconds before stopping + * - throttle : whether to respect job backoff configuration + * @return array Summary response that can easily be JSON serialized + */ + public function run( array $options ) { + $jobClasses = $this->config->get( 'JobClasses' ); + $profilerLimits = $this->config->get( 'TrxProfilerLimits' ); + + $response = [ 'jobs' => [], 'reached' => 'none-ready' ]; + + $type = isset( $options['type'] ) ? $options['type'] : false; + $maxJobs = isset( $options['maxJobs'] ) ? $options['maxJobs'] : false; + $maxTime = isset( $options['maxTime'] ) ? $options['maxTime'] : false; + $noThrottle = isset( $options['throttle'] ) && !$options['throttle']; + + // Bail if job type is invalid + if ( $type !== false && !isset( $jobClasses[$type] ) ) { + $response['reached'] = 'none-possible'; + return $response; + } + + // Bail out if DB is in read-only mode + if ( wfReadOnly() ) { + $response['reached'] = 'read-only'; + return $response; + } + + $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); + if ( $lbFactory->hasTransactionRound() ) { + throw new LogicException( __METHOD__ . ' called with an active transaction round.' ); + } + // Bail out if there is too much DB lag. + // This check should not block as we want to try other wiki queues. + list( , $maxLag ) = $lbFactory->getMainLB( wfWikiID() )->getMaxLag(); + if ( $maxLag >= self::MAX_ALLOWED_LAG ) { + $response['reached'] = 'replica-lag-limit'; + return $response; + } + + // Catch huge single updates that lead to replica DB lag + $trxProfiler = Profiler::instance()->getTransactionProfiler(); + $trxProfiler->setLogger( LoggerFactory::getInstance( 'DBPerformance' ) ); + $trxProfiler->setExpectations( $profilerLimits['JobRunner'], __METHOD__ ); + + // Some jobs types should not run until a certain timestamp + $backoffs = []; // map of (type => UNIX expiry) + $backoffDeltas = []; // map of (type => seconds) + $wait = 'wait'; // block to read backoffs the first time + + $group = JobQueueGroup::singleton(); + $stats = MediaWikiServices::getInstance()->getStatsdDataFactory(); + $jobsPopped = 0; + $timeMsTotal = 0; + $startTime = microtime( true ); // time since jobs started running + $lastCheckTime = 1; // timestamp of last replica DB check + do { + // Sync the persistent backoffs with concurrent runners + $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait ); + $blacklist = $noThrottle ? [] : array_keys( $backoffs ); + $wait = 'nowait'; // less important now + + if ( $type === false ) { + $job = $group->pop( + JobQueueGroup::TYPE_DEFAULT, + JobQueueGroup::USE_CACHE, + $blacklist + ); + } elseif ( in_array( $type, $blacklist ) ) { + $job = false; // requested queue in backoff state + } else { + $job = $group->pop( $type ); // job from a single queue + } + + if ( $job ) { // found a job + ++$jobsPopped; + $popTime = time(); + $jType = $job->getType(); + + WebRequest::overrideRequestId( $job->getRequestId() ); + + // Back off of certain jobs for a while (for throttling and for errors) + $ttw = $this->getBackoffTimeToWait( $job ); + if ( $ttw > 0 ) { + // Always add the delta for other runners in case the time running the + // job negated the backoff for each individually but not collectively. + $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] ) + ? $backoffDeltas[$jType] + $ttw + : $ttw; + $backoffs = $this->syncBackoffDeltas( $backoffs, $backoffDeltas, $wait ); + } + + $info = $this->executeJob( $job, $lbFactory, $stats, $popTime ); + if ( $info['status'] !== false || !$job->allowRetries() ) { + $group->ack( $job ); // succeeded or job cannot be retried + } + + // Back off of certain jobs for a while (for throttling and for errors) + if ( $info['status'] === false && mt_rand( 0, 49 ) == 0 ) { + $ttw = max( $ttw, $this->getErrorBackoffTTL( $info['error'] ) ); + $backoffDeltas[$jType] = isset( $backoffDeltas[$jType] ) + ? $backoffDeltas[$jType] + $ttw + : $ttw; + } + + $response['jobs'][] = [ + 'type' => $jType, + 'status' => ( $info['status'] === false ) ? 'failed' : 'ok', + 'error' => $info['error'], + 'time' => $info['timeMs'] + ]; + $timeMsTotal += $info['timeMs']; + + // Break out if we hit the job count or wall time limits... + if ( $maxJobs && $jobsPopped >= $maxJobs ) { + $response['reached'] = 'job-limit'; + break; + } elseif ( $maxTime && ( microtime( true ) - $startTime ) > $maxTime ) { + $response['reached'] = 'time-limit'; + break; + } + + // Don't let any of the main DB replica DBs get backed up. + // This only waits for so long before exiting and letting + // other wikis in the farm (on different masters) get a chance. + $timePassed = microtime( true ) - $lastCheckTime; + if ( $timePassed >= self::LAG_CHECK_PERIOD || $timePassed < 0 ) { + try { + $lbFactory->waitForReplication( [ + 'ifWritesSince' => $lastCheckTime, + 'timeout' => self::MAX_ALLOWED_LAG + ] ); + } catch ( DBReplicationWaitError $e ) { + $response['reached'] = 'replica-lag-limit'; + break; + } + $lastCheckTime = microtime( true ); + } + // Don't let any queue replica DBs/backups fall behind + if ( $jobsPopped > 0 && ( $jobsPopped % 100 ) == 0 ) { + $group->waitForBackups(); + } + + // Bail if near-OOM instead of in a job + if ( !$this->checkMemoryOK() ) { + $response['reached'] = 'memory-limit'; + break; + } + } + } while ( $job ); // stop when there are no jobs + + // Sync the persistent backoffs for the next runJobs.php pass + if ( $backoffDeltas ) { + $this->syncBackoffDeltas( $backoffs, $backoffDeltas, 'wait' ); + } + + $response['backoffs'] = $backoffs; + $response['elapsed'] = $timeMsTotal; + + return $response; + } + + /** + * @param string $error + * @return int TTL in seconds + */ + private function getErrorBackoffTTL( $error ) { + return strpos( $error, 'DBReadOnlyError' ) !== false + ? self::READONLY_BACKOFF_TTL + : self::ERROR_BACKOFF_TTL; + } + + /** + * @param Job $job + * @param LBFactory $lbFactory + * @param StatsdDataFactory $stats + * @param float $popTime + * @return array Map of status/error/timeMs + */ + private function executeJob( Job $job, LBFactory $lbFactory, $stats, $popTime ) { + $jType = $job->getType(); + $msg = $job->toString() . " STARTING"; + $this->logger->debug( $msg, [ + 'job_type' => $job->getType(), + ] ); + $this->debugCallback( $msg ); + + // Run the job... + $rssStart = $this->getMaxRssKb(); + $jobStartTime = microtime( true ); + try { + $fnameTrxOwner = get_class( $job ) . '::run'; // give run() outer scope + if ( !$job->hasExecutionFlag( $job::JOB_NO_EXPLICIT_TRX_ROUND ) ) { + $lbFactory->beginMasterChanges( $fnameTrxOwner ); + } + $status = $job->run(); + $error = $job->getLastError(); + $this->commitMasterChanges( $lbFactory, $job, $fnameTrxOwner ); + // Important: this must be the last deferred update added (T100085, T154425) + DeferredUpdates::addCallableUpdate( [ JobQueueGroup::class, 'pushLazyJobs' ] ); + // Run any deferred update tasks; doUpdates() manages transactions itself + DeferredUpdates::doUpdates(); + } catch ( Exception $e ) { + MWExceptionHandler::rollbackMasterChangesAndLog( $e ); + $status = false; + $error = get_class( $e ) . ': ' . $e->getMessage(); + } + // Always attempt to call teardown() even if Job throws exception. + try { + $job->teardown( $status ); + } catch ( Exception $e ) { + MWExceptionHandler::logException( $e ); + } + + // Commit all outstanding connections that are in a transaction + // to get a fresh repeatable read snapshot on every connection. + // Note that jobs are still responsible for handling replica DB lag. + $lbFactory->flushReplicaSnapshots( __METHOD__ ); + // Clear out title cache data from prior snapshots + MediaWikiServices::getInstance()->getLinkCache()->clear(); + $timeMs = intval( ( microtime( true ) - $jobStartTime ) * 1000 ); + $rssEnd = $this->getMaxRssKb(); + + // Record how long jobs wait before getting popped + $readyTs = $job->getReadyTimestamp(); + if ( $readyTs ) { + $pickupDelay = max( 0, $popTime - $readyTs ); + $stats->timing( 'jobqueue.pickup_delay.all', 1000 * $pickupDelay ); + $stats->timing( "jobqueue.pickup_delay.$jType", 1000 * $pickupDelay ); + } + // Record root job age for jobs being run + $rootTimestamp = $job->getRootJobParams()['rootJobTimestamp']; + if ( $rootTimestamp ) { + $age = max( 0, $popTime - wfTimestamp( TS_UNIX, $rootTimestamp ) ); + $stats->timing( "jobqueue.pickup_root_age.$jType", 1000 * $age ); + } + // Track the execution time for jobs + $stats->timing( "jobqueue.run.$jType", $timeMs ); + // Track RSS increases for jobs (in case of memory leaks) + if ( $rssStart && $rssEnd ) { + $stats->updateCount( "jobqueue.rss_delta.$jType", $rssEnd - $rssStart ); + } + + if ( $status === false ) { + $msg = $job->toString() . " t={job_duration} error={job_error}"; + $this->logger->error( $msg, [ + 'job_type' => $job->getType(), + 'job_duration' => $timeMs, + 'job_error' => $error, + ] ); + + $msg = $job->toString() . " t=$timeMs error={$error}"; + $this->debugCallback( $msg ); + } else { + $msg = $job->toString() . " t={job_duration} good"; + $this->logger->info( $msg, [ + 'job_type' => $job->getType(), + 'job_duration' => $timeMs, + ] ); + + $msg = $job->toString() . " t=$timeMs good"; + $this->debugCallback( $msg ); + } + + return [ 'status' => $status, 'error' => $error, 'timeMs' => $timeMs ]; + } + + /** + * @return int|null Max memory RSS in kilobytes + */ + private function getMaxRssKb() { + $info = wfGetRusage() ?: []; + // see https://linux.die.net/man/2/getrusage + return isset( $info['ru_maxrss'] ) ? (int)$info['ru_maxrss'] : null; + } + + /** + * @param Job $job + * @return int Seconds for this runner to avoid doing more jobs of this type + * @see $wgJobBackoffThrottling + */ + private function getBackoffTimeToWait( Job $job ) { + $throttling = $this->config->get( 'JobBackoffThrottling' ); + + if ( !isset( $throttling[$job->getType()] ) || $job instanceof DuplicateJob ) { + return 0; // not throttled + } + + $itemsPerSecond = $throttling[$job->getType()]; + if ( $itemsPerSecond <= 0 ) { + return 0; // not throttled + } + + $seconds = 0; + if ( $job->workItemCount() > 0 ) { + $exactSeconds = $job->workItemCount() / $itemsPerSecond; + // use randomized rounding + $seconds = floor( $exactSeconds ); + $remainder = $exactSeconds - $seconds; + $seconds += ( mt_rand() / mt_getrandmax() < $remainder ) ? 1 : 0; + } + + return (int)$seconds; + } + + /** + * Get the previous backoff expiries from persistent storage + * On I/O or lock acquisition failure this returns the original $backoffs. + * + * @param array $backoffs Map of (job type => UNIX timestamp) + * @param string $mode Lock wait mode - "wait" or "nowait" + * @return array Map of (job type => backoff expiry timestamp) + */ + private function loadBackoffs( array $backoffs, $mode = 'wait' ) { + $file = wfTempDir() . '/mw-runJobs-backoffs.json'; + if ( is_file( $file ) ) { + $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0; + $handle = fopen( $file, 'rb' ); + if ( !flock( $handle, LOCK_SH | $noblock ) ) { + fclose( $handle ); + return $backoffs; // don't wait on lock + } + $content = stream_get_contents( $handle ); + flock( $handle, LOCK_UN ); + fclose( $handle ); + $ctime = microtime( true ); + $cBackoffs = json_decode( $content, true ) ?: []; + foreach ( $cBackoffs as $type => $timestamp ) { + if ( $timestamp < $ctime ) { + unset( $cBackoffs[$type] ); + } + } + } else { + $cBackoffs = []; + } + + return $cBackoffs; + } + + /** + * Merge the current backoff expiries from persistent storage + * + * The $deltas map is set to an empty array on success. + * On I/O or lock acquisition failure this returns the original $backoffs. + * + * @param array $backoffs Map of (job type => UNIX timestamp) + * @param array $deltas Map of (job type => seconds) + * @param string $mode Lock wait mode - "wait" or "nowait" + * @return array The new backoffs account for $backoffs and the latest file data + */ + private function syncBackoffDeltas( array $backoffs, array &$deltas, $mode = 'wait' ) { + if ( !$deltas ) { + return $this->loadBackoffs( $backoffs, $mode ); + } + + $noblock = ( $mode === 'nowait' ) ? LOCK_NB : 0; + $file = wfTempDir() . '/mw-runJobs-backoffs.json'; + $handle = fopen( $file, 'wb+' ); + if ( !flock( $handle, LOCK_EX | $noblock ) ) { + fclose( $handle ); + return $backoffs; // don't wait on lock + } + $ctime = microtime( true ); + $content = stream_get_contents( $handle ); + $cBackoffs = json_decode( $content, true ) ?: []; + foreach ( $deltas as $type => $seconds ) { + $cBackoffs[$type] = isset( $cBackoffs[$type] ) && $cBackoffs[$type] >= $ctime + ? $cBackoffs[$type] + $seconds + : $ctime + $seconds; + } + foreach ( $cBackoffs as $type => $timestamp ) { + if ( $timestamp < $ctime ) { + unset( $cBackoffs[$type] ); + } + } + ftruncate( $handle, 0 ); + fwrite( $handle, json_encode( $cBackoffs ) ); + flock( $handle, LOCK_UN ); + fclose( $handle ); + + $deltas = []; + + return $cBackoffs; + } + + /** + * Make sure that this script is not too close to the memory usage limit. + * It is better to die in between jobs than OOM right in the middle of one. + * @return bool + */ + private function checkMemoryOK() { + static $maxBytes = null; + if ( $maxBytes === null ) { + $m = []; + if ( preg_match( '!^(\d+)(k|m|g|)$!i', ini_get( 'memory_limit' ), $m ) ) { + list( , $num, $unit ) = $m; + $conv = [ 'g' => 1073741824, 'm' => 1048576, 'k' => 1024, '' => 1 ]; + $maxBytes = $num * $conv[strtolower( $unit )]; + } else { + $maxBytes = 0; + } + } + $usedBytes = memory_get_usage(); + if ( $maxBytes && $usedBytes >= 0.95 * $maxBytes ) { + $msg = "Detected excessive memory usage ({used_bytes}/{max_bytes})."; + $this->logger->error( $msg, [ + 'used_bytes' => $usedBytes, + 'max_bytes' => $maxBytes, + ] ); + + $msg = "Detected excessive memory usage ($usedBytes/$maxBytes)."; + $this->debugCallback( $msg ); + + return false; + } + + return true; + } + + /** + * Log the job message + * @param string $msg The message to log + */ + private function debugCallback( $msg ) { + if ( $this->debug ) { + call_user_func_array( $this->debug, [ wfTimestamp( TS_DB ) . " $msg\n" ] ); + } + } + + /** + * Issue a commit on all masters who are currently in a transaction and have + * made changes to the database. It also supports sometimes waiting for the + * local wiki's replica DBs to catch up. See the documentation for + * $wgJobSerialCommitThreshold for more. + * + * @param LBFactory $lbFactory + * @param Job $job + * @param string $fnameTrxOwner + * @throws DBError + */ + private function commitMasterChanges( LBFactory $lbFactory, Job $job, $fnameTrxOwner ) { + $syncThreshold = $this->config->get( 'JobSerialCommitThreshold' ); + + $time = false; + $lb = $lbFactory->getMainLB( wfWikiID() ); + if ( $syncThreshold !== false && $lb->getServerCount() > 1 ) { + // Generally, there is one master connection to the local DB + $dbwSerial = $lb->getAnyOpenConnection( $lb->getWriterIndex() ); + // We need natively blocking fast locks + if ( $dbwSerial && $dbwSerial->namedLocksEnqueue() ) { + $time = $dbwSerial->pendingWriteQueryDuration( $dbwSerial::ESTIMATE_DB_APPLY ); + if ( $time < $syncThreshold ) { + $dbwSerial = false; + } + } else { + $dbwSerial = false; + } + } else { + // There are no replica DBs or writes are all to foreign DB (we don't handle that) + $dbwSerial = false; + } + + if ( !$dbwSerial ) { + $lbFactory->commitMasterChanges( + $fnameTrxOwner, + // Abort if any transaction was too big + [ 'maxWriteDuration' => $this->config->get( 'MaxJobDBWriteDuration' ) ] + ); + + return; + } + + $ms = intval( 1000 * $time ); + + $msg = $job->toString() . " COMMIT ENQUEUED [{job_commit_write_ms}ms of writes]"; + $this->logger->info( $msg, [ + 'job_type' => $job->getType(), + 'job_commit_write_ms' => $ms, + ] ); + + $msg = $job->toString() . " COMMIT ENQUEUED [{$ms}ms of writes]"; + $this->debugCallback( $msg ); + + // Wait for an exclusive lock to commit + if ( !$dbwSerial->lock( 'jobrunner-serial-commit', __METHOD__, 30 ) ) { + // This will trigger a rollback in the main loop + throw new DBError( $dbwSerial, "Timed out waiting on commit queue." ); + } + $unlocker = new ScopedCallback( function () use ( $dbwSerial ) { + $dbwSerial->unlock( 'jobrunner-serial-commit', __METHOD__ ); + } ); + + // Wait for the replica DBs to catch up + $pos = $lb->getMasterPos(); + if ( $pos ) { + $lb->waitForAll( $pos ); + } + + // Actually commit the DB master changes + $lbFactory->commitMasterChanges( + $fnameTrxOwner, + // Abort if any transaction was too big + [ 'maxWriteDuration' => $this->config->get( 'MaxJobDBWriteDuration' ) ] + ); + ScopedCallback::consume( $unlocker ); + } +} diff --git a/www/wiki/includes/jobqueue/JobSpecification.php b/www/wiki/includes/jobqueue/JobSpecification.php new file mode 100644 index 00000000..b62b83c6 --- /dev/null +++ b/www/wiki/includes/jobqueue/JobSpecification.php @@ -0,0 +1,233 @@ +<?php +/** + * Job queue task description base code. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + */ + +/** + * Job queue task description interface + * + * @ingroup JobQueue + * @since 1.23 + */ +interface IJobSpecification { + /** + * @return string Job type + */ + public function getType(); + + /** + * @return array + */ + public function getParams(); + + /** + * @return int|null UNIX timestamp to delay running this job until, otherwise null + */ + public function getReleaseTimestamp(); + + /** + * @return bool Whether only one of each identical set of jobs should be run + */ + public function ignoreDuplicates(); + + /** + * Subclasses may need to override this to make duplication detection work. + * The resulting map conveys everything that makes the job unique. This is + * only checked if ignoreDuplicates() returns true, meaning that duplicate + * jobs are supposed to be ignored. + * + * @return array Map of key/values + */ + public function getDeduplicationInfo(); + + /** + * @see JobQueue::deduplicateRootJob() + * @return array + * @since 1.26 + */ + public function getRootJobParams(); + + /** + * @see JobQueue::deduplicateRootJob() + * @return bool + * @since 1.22 + */ + public function hasRootJobParams(); + + /** + * @see JobQueue::deduplicateRootJob() + * @return bool Whether this is job is a root job + */ + public function isRootJob(); + + /** + * @return Title Descriptive title (this can simply be informative) + */ + public function getTitle(); +} + +/** + * Job queue task description base code + * + * Example usage: + * @code + * $job = new JobSpecification( + * 'null', + * array( 'lives' => 1, 'usleep' => 100, 'pi' => 3.141569 ), + * array( 'removeDuplicates' => 1 ), + * Title::makeTitle( NS_SPECIAL, 'nullity' ) + * ); + * JobQueueGroup::singleton()->push( $job ) + * @endcode + * + * @ingroup JobQueue + * @since 1.23 + */ +class JobSpecification implements IJobSpecification { + /** @var string */ + protected $type; + + /** @var array Array of job parameters or false if none */ + protected $params; + + /** @var Title */ + protected $title; + + /** @var array */ + protected $opts; + + /** + * @param string $type + * @param array $params Map of key/values + * @param array $opts Map of key/values; includes 'removeDuplicates' + * @param Title $title Optional descriptive title + */ + public function __construct( + $type, array $params, array $opts = [], Title $title = null + ) { + $this->validateParams( $params ); + $this->validateParams( $opts ); + + $this->type = $type; + $this->params = $params; + $this->title = $title ?: Title::makeTitle( NS_SPECIAL, 'Badtitle/' . static::class ); + $this->opts = $opts; + } + + /** + * @param array $params + */ + protected function validateParams( array $params ) { + foreach ( $params as $p => $v ) { + if ( is_array( $v ) ) { + $this->validateParams( $v ); + } elseif ( !is_scalar( $v ) && $v !== null ) { + throw new UnexpectedValueException( "Job parameter $p is not JSON serializable." ); + } + } + } + + public function getType() { + return $this->type; + } + + public function getTitle() { + return $this->title; + } + + public function getParams() { + return $this->params; + } + + public function getReleaseTimestamp() { + return isset( $this->params['jobReleaseTimestamp'] ) + ? wfTimestampOrNull( TS_UNIX, $this->params['jobReleaseTimestamp'] ) + : null; + } + + public function ignoreDuplicates() { + return !empty( $this->opts['removeDuplicates'] ); + } + + public function getDeduplicationInfo() { + $info = [ + 'type' => $this->getType(), + 'namespace' => $this->getTitle()->getNamespace(), + 'title' => $this->getTitle()->getDBkey(), + 'params' => $this->getParams() + ]; + if ( is_array( $info['params'] ) ) { + // Identical jobs with different "root" jobs should count as duplicates + unset( $info['params']['rootJobSignature'] ); + unset( $info['params']['rootJobTimestamp'] ); + // Likewise for jobs with different delay times + unset( $info['params']['jobReleaseTimestamp'] ); + } + + return $info; + } + + public function getRootJobParams() { + return [ + 'rootJobSignature' => isset( $this->params['rootJobSignature'] ) + ? $this->params['rootJobSignature'] + : null, + 'rootJobTimestamp' => isset( $this->params['rootJobTimestamp'] ) + ? $this->params['rootJobTimestamp'] + : null + ]; + } + + public function hasRootJobParams() { + return isset( $this->params['rootJobSignature'] ) + && isset( $this->params['rootJobTimestamp'] ); + } + + public function isRootJob() { + return $this->hasRootJobParams() && !empty( $this->params['rootJobIsSelf'] ); + } + + /** + * @return array Field/value map that can immediately be serialized + * @since 1.25 + */ + public function toSerializableArray() { + return [ + 'type' => $this->type, + 'params' => $this->params, + 'opts' => $this->opts, + 'title' => [ + 'ns' => $this->title->getNamespace(), + 'key' => $this->title->getDBkey() + ] + ]; + } + + /** + * @param array $map Field/value map + * @return JobSpecification + * @since 1.25 + */ + public static function newFromArray( array $map ) { + $title = Title::makeTitle( $map['title']['ns'], $map['title']['key'] ); + + return new self( $map['type'], $map['params'], $map['opts'], $title ); + } +} diff --git a/www/wiki/includes/jobqueue/README b/www/wiki/includes/jobqueue/README new file mode 100644 index 00000000..2073b0f8 --- /dev/null +++ b/www/wiki/includes/jobqueue/README @@ -0,0 +1,80 @@ +/*! +\ingroup JobQueue +\page jobqueue_design Job queue design + +Notes on the Job queuing system architecture. + +\section intro Introduction + +The data model consist of the following main components: +* The Job object represents a particular deferred task that happens in the + background. All jobs subclass the Job object and put the main logic in the + function called run(). +* The JobQueue object represents a particular queue of jobs of a certain type. + For example there may be a queue for email jobs and a queue for CDN purge + jobs. + +\section jobqueue Job queues + +Each job type has its own queue and is associated to a storage medium. One +queue might save its jobs in redis while another one uses would use a database. + +Storage medium are defined in a queue class. Before using it, you must +define in $wgJobTypeConf a mapping of the job type to a queue class. + +The factory class JobQueueGroup provides helper functions: +- getting the queue for a given job +- route new job insertions to the proper queue + +The following queue classes are available: +* JobQueueDB (stores jobs in the `job` table in a database) +* JobQueueRedis (stores jobs in a redis server) + +All queue classes support some basic operations (though some may be no-ops): +* enqueueing a batch of jobs +* dequeueing a single job +* acknowledging a job is completed +* checking if the queue is empty + +Some queue classes (like JobQueueDB) may dequeue jobs in random order while other +queues might dequeue jobs in exact FIFO order. Callers should thus not assume jobs +are executed in FIFO order. + +Also note that not all queue classes will have the same reliability guarantees. +In-memory queues may lose data when restarted depending on snapshot and journal +settings (including journal fsync() frequency). Some queue types may totally remove +jobs when dequeued while leaving the ack() function as a no-op; if a job is +dequeued by a job runner, which crashes before completion, the job will be +lost. Some jobs, like purging CDN caches after a template change, may not +require durable queues, whereas other jobs might be more important. + +\section aggregator Job queue aggregator + +The aggregators are used by nextJobDB.php, which is a script that will return a +random ready queue (on any wiki in the farm) that can be used with runJobs.php. +This can be used in conjunction with any scripts that handle wiki farm job queues. +Note that $wgLocalDatabases defines what wikis are in the wiki farm. + +Since each job type has its own queue, and wiki-farms may have many wikis, +there might be a large number of queues to keep track of. To avoid wasting +large amounts of time polling empty queues, aggregators exists to keep track +of which queues are ready. + +The following queue aggregator classes are available: +* JobQueueAggregatorRedis (uses a redis server to track ready queues) + +Some aggregators cache data for a few minutes while others may be always up to date. +This can be an important factor for jobs that need a low pickup time (or latency). + +\section jobs Jobs + +Callers should also try to make jobs maintain correctness when executed twice. +This is useful for queues that actually implement ack(), since they may recycle +dequeued but un-acknowledged jobs back into the queue to be attempted again. If +a runner dequeues a job, runs it, but then crashes before calling ack(), the +job may be returned to the queue and run a second time. Jobs like cache purging can +happen several times without any correctness problems. However, a pathological case +would be if a bug causes the problem to systematically keep repeating. For example, +a job may always throw a DB error at the end of run(). This problem is trickier to +solve and more obnoxious for things like email jobs, for example. For such jobs, +it might be useful to use a queue that does not retry jobs. diff --git a/www/wiki/includes/jobqueue/aggregator/JobQueueAggregator.php b/www/wiki/includes/jobqueue/aggregator/JobQueueAggregator.php new file mode 100644 index 00000000..433de93a --- /dev/null +++ b/www/wiki/includes/jobqueue/aggregator/JobQueueAggregator.php @@ -0,0 +1,180 @@ +<?php +/** + * Job queue aggregator code. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + */ + +/** + * Class to handle tracking information about all queues + * + * @ingroup JobQueue + * @since 1.21 + */ +abstract class JobQueueAggregator { + /** @var JobQueueAggregator */ + protected static $instance = null; + + /** + * @param array $params + */ + public function __construct( array $params ) { + } + + /** + * @throws MWException + * @return JobQueueAggregator + */ + final public static function singleton() { + global $wgJobQueueAggregator; + + if ( !isset( self::$instance ) ) { + $class = $wgJobQueueAggregator['class']; + $obj = new $class( $wgJobQueueAggregator ); + if ( !( $obj instanceof JobQueueAggregator ) ) { + throw new MWException( "Class '$class' is not a JobQueueAggregator class." ); + } + self::$instance = $obj; + } + + return self::$instance; + } + + /** + * Destroy the singleton instance + * + * @return void + */ + final public static function destroySingleton() { + self::$instance = null; + } + + /** + * Mark a queue as being empty + * + * @param string $wiki + * @param string $type + * @return bool Success + */ + final public function notifyQueueEmpty( $wiki, $type ) { + $ok = $this->doNotifyQueueEmpty( $wiki, $type ); + + return $ok; + } + + /** + * @see JobQueueAggregator::notifyQueueEmpty() + * @param string $wiki + * @param string $type + * @return bool + */ + abstract protected function doNotifyQueueEmpty( $wiki, $type ); + + /** + * Mark a queue as being non-empty + * + * @param string $wiki + * @param string $type + * @return bool Success + */ + final public function notifyQueueNonEmpty( $wiki, $type ) { + $ok = $this->doNotifyQueueNonEmpty( $wiki, $type ); + + return $ok; + } + + /** + * @see JobQueueAggregator::notifyQueueNonEmpty() + * @param string $wiki + * @param string $type + * @return bool + */ + abstract protected function doNotifyQueueNonEmpty( $wiki, $type ); + + /** + * Get the list of all of the queues with jobs + * + * @return array (job type => (list of wiki IDs)) + */ + final public function getAllReadyWikiQueues() { + $res = $this->doGetAllReadyWikiQueues(); + + return $res; + } + + /** + * @see JobQueueAggregator::getAllReadyWikiQueues() + */ + abstract protected function doGetAllReadyWikiQueues(); + + /** + * Purge all of the aggregator information + * + * @return bool Success + */ + final public function purge() { + $res = $this->doPurge(); + + return $res; + } + + /** + * @see JobQueueAggregator::purge() + */ + abstract protected function doPurge(); + + /** + * Get all databases that have a pending job. + * This poll all the queues and is this expensive. + * + * @return array (job type => (list of wiki IDs)) + */ + protected function findPendingWikiQueues() { + global $wgLocalDatabases; + + $pendingDBs = []; // (job type => (db list)) + foreach ( $wgLocalDatabases as $db ) { + foreach ( JobQueueGroup::singleton( $db )->getQueuesWithJobs() as $type ) { + $pendingDBs[$type][] = $db; + } + } + + return $pendingDBs; + } +} + +/** + * @ingroup JobQueue + */ +class JobQueueAggregatorNull extends JobQueueAggregator { + protected function doNotifyQueueEmpty( $wiki, $type ) { + return true; + } + + protected function doNotifyQueueNonEmpty( $wiki, $type ) { + return true; + } + + protected function doGetAllReadyWikiQueues() { + return []; + } + + protected function doPurge() { + return true; + } +} diff --git a/www/wiki/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php b/www/wiki/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php new file mode 100644 index 00000000..db07086f --- /dev/null +++ b/www/wiki/includes/jobqueue/aggregator/JobQueueAggregatorRedis.php @@ -0,0 +1,135 @@ +<?php +/** + * Job queue aggregator code that uses PhpRedis. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + */ +use Psr\Log\LoggerInterface; + +/** + * Class to handle tracking information about all queues using PhpRedis + * + * The mediawiki/services/jobrunner background service must be set up and running. + * + * @ingroup JobQueue + * @ingroup Redis + * @since 1.21 + */ +class JobQueueAggregatorRedis extends JobQueueAggregator { + /** @var RedisConnectionPool */ + protected $redisPool; + /** @var LoggerInterface */ + protected $logger; + /** @var array List of Redis server addresses */ + protected $servers; + + /** + * @param array $params Possible keys: + * - redisConfig : An array of parameters to RedisConnectionPool::__construct(). + * - redisServers : Array of server entries, the first being the primary and the + * others being fallback servers. Each entry is either a hostname/port + * combination or the absolute path of a UNIX socket. + * If a hostname is specified but no port, the standard port number + * 6379 will be used. Required. + */ + public function __construct( array $params ) { + parent::__construct( $params ); + $this->servers = isset( $params['redisServers'] ) + ? $params['redisServers'] + : [ $params['redisServer'] ]; // b/c + $params['redisConfig']['serializer'] = 'none'; + $this->redisPool = RedisConnectionPool::singleton( $params['redisConfig'] ); + $this->logger = \MediaWiki\Logger\LoggerFactory::getInstance( 'redis' ); + } + + protected function doNotifyQueueEmpty( $wiki, $type ) { + return true; // managed by the service + } + + protected function doNotifyQueueNonEmpty( $wiki, $type ) { + return true; // managed by the service + } + + protected function doGetAllReadyWikiQueues() { + $conn = $this->getConnection(); + if ( !$conn ) { + return []; + } + try { + $map = $conn->hGetAll( $this->getReadyQueueKey() ); + + if ( is_array( $map ) && isset( $map['_epoch'] ) ) { + unset( $map['_epoch'] ); // ignore + $pendingDBs = []; // (type => list of wikis) + foreach ( $map as $key => $time ) { + list( $type, $wiki ) = $this->decodeQueueName( $key ); + $pendingDBs[$type][] = $wiki; + } + } else { + throw new UnexpectedValueException( + "No queue listing found; make sure redisJobChronService is running." + ); + } + + return $pendingDBs; + } catch ( RedisException $e ) { + $this->redisPool->handleError( $conn, $e ); + + return []; + } + } + + protected function doPurge() { + return true; // fully and only refreshed by the service + } + + /** + * Get a connection to the server that handles all sub-queues for this queue + * + * @return RedisConnRef|bool Returns false on failure + * @throws MWException + */ + protected function getConnection() { + $conn = false; + foreach ( $this->servers as $server ) { + $conn = $this->redisPool->getConnection( $server, $this->logger ); + if ( $conn ) { + break; + } + } + + return $conn; + } + + /** + * @return string + */ + private function getReadyQueueKey() { + return "jobqueue:aggregator:h-ready-queues:v2"; // global + } + + /** + * @param string $name + * @return string[] + */ + private function decodeQueueName( $name ) { + list( $type, $wiki ) = explode( '/', $name, 2 ); + + return [ rawurldecode( $type ), rawurldecode( $wiki ) ]; + } +} diff --git a/www/wiki/includes/jobqueue/jobs/ActivityUpdateJob.php b/www/wiki/includes/jobqueue/jobs/ActivityUpdateJob.php new file mode 100644 index 00000000..8cc14e51 --- /dev/null +++ b/www/wiki/includes/jobqueue/jobs/ActivityUpdateJob.php @@ -0,0 +1,82 @@ +<?php +/** + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ + +/** + * Job for updating user activity like "last viewed" timestamps + * + * Job parameters include: + * - type: one of (updateWatchlistNotification) [required] + * - userid: affected user ID [required] + * - notifTime: timestamp to set watchlist entries to [required] + * - curTime: UNIX timestamp of the event that triggered this job [required] + * + * @ingroup JobQueue + * @since 1.26 + */ +class ActivityUpdateJob extends Job { + function __construct( Title $title, array $params ) { + parent::__construct( 'activityUpdateJob', $title, $params ); + + static $required = [ 'type', 'userid', 'notifTime', 'curTime' ]; + $missing = implode( ', ', array_diff( $required, array_keys( $this->params ) ) ); + if ( $missing != '' ) { + throw new InvalidArgumentException( "Missing paramter(s) $missing" ); + } + + $this->removeDuplicates = true; + } + + public function run() { + if ( $this->params['type'] === 'updateWatchlistNotification' ) { + $this->updateWatchlistNotification(); + } else { + throw new InvalidArgumentException( "Invalid 'type' '{$this->params['type']}'." ); + } + + return true; + } + + protected function updateWatchlistNotification() { + $casTimestamp = ( $this->params['notifTime'] !== null ) + ? $this->params['notifTime'] + : $this->params['curTime']; + + $dbw = wfGetDB( DB_MASTER ); + $dbw->update( 'watchlist', + [ + 'wl_notificationtimestamp' => $dbw->timestampOrNull( $this->params['notifTime'] ) + ], + [ + 'wl_user' => $this->params['userid'], + 'wl_namespace' => $this->title->getNamespace(), + 'wl_title' => $this->title->getDBkey(), + // Add a "check and set" style comparison to handle conflicts. + // The inequality always avoids updates when the current value + // is already NULL per ANSI SQL. This is desired since NULL means + // that the user is "caught up" on edits already. When the field + // is non-NULL, make sure not to set it back in time or set it to + // NULL when newer revisions were in fact added to the page. + 'wl_notificationtimestamp < ' . $dbw->addQuotes( $dbw->timestamp( $casTimestamp ) ) + ], + __METHOD__ + ); + } +} diff --git a/www/wiki/includes/jobqueue/jobs/AssembleUploadChunksJob.php b/www/wiki/includes/jobqueue/jobs/AssembleUploadChunksJob.php new file mode 100644 index 00000000..e2914be5 --- /dev/null +++ b/www/wiki/includes/jobqueue/jobs/AssembleUploadChunksJob.php @@ -0,0 +1,138 @@ +<?php +/** + * Assemble the segments of a chunked upload. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup Upload + */ +use Wikimedia\ScopedCallback; + +/** + * Assemble the segments of a chunked upload. + * + * @ingroup Upload + */ +class AssembleUploadChunksJob extends Job { + public function __construct( Title $title, array $params ) { + parent::__construct( 'AssembleUploadChunks', $title, $params ); + $this->removeDuplicates = true; + } + + public function run() { + $scope = RequestContext::importScopedSession( $this->params['session'] ); + $this->addTeardownCallback( function () use ( &$scope ) { + ScopedCallback::consume( $scope ); // T126450 + } ); + + $context = RequestContext::getMain(); + $user = $context->getUser(); + try { + if ( !$user->isLoggedIn() ) { + $this->setLastError( "Could not load the author user from session." ); + + return false; + } + + UploadBase::setSessionStatus( + $user, + $this->params['filekey'], + [ 'result' => 'Poll', 'stage' => 'assembling', 'status' => Status::newGood() ] + ); + + $upload = new UploadFromChunks( $user ); + $upload->continueChunks( + $this->params['filename'], + $this->params['filekey'], + new WebRequestUpload( $context->getRequest(), 'null' ) + ); + + // Combine all of the chunks into a local file and upload that to a new stash file + $status = $upload->concatenateChunks(); + if ( !$status->isGood() ) { + UploadBase::setSessionStatus( + $user, + $this->params['filekey'], + [ 'result' => 'Failure', 'stage' => 'assembling', 'status' => $status ] + ); + $this->setLastError( $status->getWikiText( false, false, 'en' ) ); + + return false; + } + + // We can only get warnings like 'duplicate' after concatenating the chunks + $status = Status::newGood(); + $status->value = [ 'warnings' => $upload->checkWarnings() ]; + + // We have a new filekey for the fully concatenated file + $newFileKey = $upload->getStashFile()->getFileKey(); + + // Remove the old stash file row and first chunk file + $upload->stash->removeFileNoAuth( $this->params['filekey'] ); + + // Build the image info array while we have the local reference handy + $apiMain = new ApiMain(); // dummy object (XXX) + $imageInfo = $upload->getImageInfo( $apiMain->getResult() ); + + // Cleanup any temporary local file + $upload->cleanupTempFile(); + + // Cache the info so the user doesn't have to wait forever to get the final info + UploadBase::setSessionStatus( + $user, + $this->params['filekey'], + [ + 'result' => 'Success', + 'stage' => 'assembling', + 'filekey' => $newFileKey, + 'imageinfo' => $imageInfo, + 'status' => $status + ] + ); + } catch ( Exception $e ) { + UploadBase::setSessionStatus( + $user, + $this->params['filekey'], + [ + 'result' => 'Failure', + 'stage' => 'assembling', + 'status' => Status::newFatal( 'api-error-stashfailed' ) + ] + ); + $this->setLastError( get_class( $e ) . ": " . $e->getMessage() ); + // To be extra robust. + MWExceptionHandler::rollbackMasterChangesAndLog( $e ); + + return false; + } + + return true; + } + + public function getDeduplicationInfo() { + $info = parent::getDeduplicationInfo(); + if ( is_array( $info['params'] ) ) { + $info['params'] = [ 'filekey' => $info['params']['filekey'] ]; + } + + return $info; + } + + public function allowRetries() { + return false; + } +} diff --git a/www/wiki/includes/jobqueue/jobs/CategoryMembershipChangeJob.php b/www/wiki/includes/jobqueue/jobs/CategoryMembershipChangeJob.php new file mode 100644 index 00000000..16640be4 --- /dev/null +++ b/www/wiki/includes/jobqueue/jobs/CategoryMembershipChangeJob.php @@ -0,0 +1,253 @@ +<?php +/** + * Updater for link tracking tables after a page edit. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + */ +use MediaWiki\MediaWikiServices; +use Wikimedia\Rdbms\LBFactory; + +/** + * Job to add recent change entries mentioning category membership changes + * + * This allows users to easily scan categories for recent page membership changes + * + * Parameters include: + * - pageId : page ID + * - revTimestamp : timestamp of the triggering revision + * + * Category changes will be mentioned for revisions at/after the timestamp for this page + * + * @since 1.27 + */ +class CategoryMembershipChangeJob extends Job { + /** @var int|null */ + private $ticket; + + const ENQUEUE_FUDGE_SEC = 60; + + public function __construct( Title $title, array $params ) { + parent::__construct( 'categoryMembershipChange', $title, $params ); + // Only need one job per page. Note that ENQUEUE_FUDGE_SEC handles races where an + // older revision job gets inserted while the newer revision job is de-duplicated. + $this->removeDuplicates = true; + } + + public function run() { + $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); + $lb = $lbFactory->getMainLB(); + $dbw = $lb->getConnection( DB_MASTER ); + + $this->ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ ); + + $page = WikiPage::newFromID( $this->params['pageId'], WikiPage::READ_LATEST ); + if ( !$page ) { + $this->setLastError( "Could not find page #{$this->params['pageId']}" ); + return false; // deleted? + } + + // Cut down on the time spent in safeWaitForMasterPos() in the critical section + $dbr = $lb->getConnection( DB_REPLICA, [ 'recentchanges' ] ); + if ( !$lb->safeWaitForMasterPos( $dbr ) ) { + $this->setLastError( "Timed out while pre-waiting for replica DB to catch up" ); + return false; + } + + // Use a named lock so that jobs for this page see each others' changes + $lockKey = "CategoryMembershipUpdates:{$page->getId()}"; + $scopedLock = $dbw->getScopedLockAndFlush( $lockKey, __METHOD__, 3 ); + if ( !$scopedLock ) { + $this->setLastError( "Could not acquire lock '$lockKey'" ); + return false; + } + + // Wait till replica DB is caught up so that jobs for this page see each others' changes + if ( !$lb->safeWaitForMasterPos( $dbr ) ) { + $this->setLastError( "Timed out while waiting for replica DB to catch up" ); + return false; + } + // Clear any stale REPEATABLE-READ snapshot + $dbr->flushSnapshot( __METHOD__ ); + + $cutoffUnix = wfTimestamp( TS_UNIX, $this->params['revTimestamp'] ); + // Using ENQUEUE_FUDGE_SEC handles jobs inserted out of revision order due to the delay + // between COMMIT and actual enqueueing of the CategoryMembershipChangeJob job. + $cutoffUnix -= self::ENQUEUE_FUDGE_SEC; + + // Get the newest page revision that has a SRC_CATEGORIZE row. + // Assume that category changes before it were already handled. + $row = $dbr->selectRow( + 'revision', + [ 'rev_timestamp', 'rev_id' ], + [ + 'rev_page' => $page->getId(), + 'rev_timestamp >= ' . $dbr->addQuotes( $dbr->timestamp( $cutoffUnix ) ), + 'EXISTS (' . $dbr->selectSQLText( + 'recentchanges', + '1', + [ + 'rc_this_oldid = rev_id', + 'rc_source' => RecentChange::SRC_CATEGORIZE, + // Allow rc_cur_id or rc_timestamp index usage + 'rc_cur_id = rev_page', + 'rc_timestamp = rev_timestamp' + ] + ) . ')' + ], + __METHOD__, + [ 'ORDER BY' => 'rev_timestamp DESC, rev_id DESC' ] + ); + // Only consider revisions newer than any such revision + if ( $row ) { + $cutoffUnix = wfTimestamp( TS_UNIX, $row->rev_timestamp ); + $lastRevId = (int)$row->rev_id; + } else { + $lastRevId = 0; + } + + // Find revisions to this page made around and after this revision which lack category + // notifications in recent changes. This lets jobs pick up were the last one left off. + $encCutoff = $dbr->addQuotes( $dbr->timestamp( $cutoffUnix ) ); + $revQuery = Revision::getQueryInfo(); + $res = $dbr->select( + $revQuery['tables'], + $revQuery['fields'], + [ + 'rev_page' => $page->getId(), + "rev_timestamp > $encCutoff" . + " OR (rev_timestamp = $encCutoff AND rev_id > $lastRevId)" + ], + __METHOD__, + [ 'ORDER BY' => 'rev_timestamp ASC, rev_id ASC' ], + $revQuery['joins'] + ); + + // Apply all category updates in revision timestamp order + foreach ( $res as $row ) { + $this->notifyUpdatesForRevision( $lbFactory, $page, Revision::newFromRow( $row ) ); + } + + return true; + } + + /** + * @param LBFactory $lbFactory + * @param WikiPage $page + * @param Revision $newRev + * @throws MWException + */ + protected function notifyUpdatesForRevision( + LBFactory $lbFactory, WikiPage $page, Revision $newRev + ) { + $config = RequestContext::getMain()->getConfig(); + $title = $page->getTitle(); + + // Get the new revision + if ( !$newRev->getContent() ) { + return; // deleted? + } + + // Get the prior revision (the same for null edits) + if ( $newRev->getParentId() ) { + $oldRev = Revision::newFromId( $newRev->getParentId(), Revision::READ_LATEST ); + if ( !$oldRev->getContent() ) { + return; // deleted? + } + } else { + $oldRev = null; + } + + // Parse the new revision and get the categories + $categoryChanges = $this->getExplicitCategoriesChanges( $title, $newRev, $oldRev ); + list( $categoryInserts, $categoryDeletes ) = $categoryChanges; + if ( !$categoryInserts && !$categoryDeletes ) { + return; // nothing to do + } + + $catMembChange = new CategoryMembershipChange( $title, $newRev ); + $catMembChange->checkTemplateLinks(); + + $batchSize = $config->get( 'UpdateRowsPerQuery' ); + $insertCount = 0; + + foreach ( $categoryInserts as $categoryName ) { + $categoryTitle = Title::makeTitle( NS_CATEGORY, $categoryName ); + $catMembChange->triggerCategoryAddedNotification( $categoryTitle ); + if ( $insertCount++ && ( $insertCount % $batchSize ) == 0 ) { + $lbFactory->commitAndWaitForReplication( __METHOD__, $this->ticket ); + } + } + + foreach ( $categoryDeletes as $categoryName ) { + $categoryTitle = Title::makeTitle( NS_CATEGORY, $categoryName ); + $catMembChange->triggerCategoryRemovedNotification( $categoryTitle ); + if ( $insertCount++ && ( $insertCount++ % $batchSize ) == 0 ) { + $lbFactory->commitAndWaitForReplication( __METHOD__, $this->ticket ); + } + } + } + + private function getExplicitCategoriesChanges( + Title $title, Revision $newRev, Revision $oldRev = null + ) { + // Inject the same timestamp for both revision parses to avoid seeing category changes + // due to time-based parser functions. Inject the same page title for the parses too. + // Note that REPEATABLE-READ makes template/file pages appear unchanged between parses. + $parseTimestamp = $newRev->getTimestamp(); + // Parse the old rev and get the categories. Do not use link tables as that + // assumes these updates are perfectly FIFO and that link tables are always + // up to date, neither of which are true. + $oldCategories = $oldRev + ? $this->getCategoriesAtRev( $title, $oldRev, $parseTimestamp ) + : []; + // Parse the new revision and get the categories + $newCategories = $this->getCategoriesAtRev( $title, $newRev, $parseTimestamp ); + + $categoryInserts = array_values( array_diff( $newCategories, $oldCategories ) ); + $categoryDeletes = array_values( array_diff( $oldCategories, $newCategories ) ); + + return [ $categoryInserts, $categoryDeletes ]; + } + + /** + * @param Title $title + * @param Revision $rev + * @param string $parseTimestamp TS_MW + * + * @return string[] category names + */ + private function getCategoriesAtRev( Title $title, Revision $rev, $parseTimestamp ) { + $content = $rev->getContent(); + $options = $content->getContentHandler()->makeParserOptions( 'canonical' ); + $options->setTimestamp( $parseTimestamp ); + // This could possibly use the parser cache if it checked the revision ID, + // but that's more complicated than it's worth. + $output = $content->getParserOutput( $title, $rev->getId(), $options ); + + // array keys will cast numeric category names to ints + // so we need to cast them back to strings to avoid breaking things! + return array_map( 'strval', array_keys( $output->getCategories() ) ); + } + + public function getDeduplicationInfo() { + $info = parent::getDeduplicationInfo(); + unset( $info['params']['revTimestamp'] ); // first job wins + + return $info; + } +} diff --git a/www/wiki/includes/jobqueue/jobs/CdnPurgeJob.php b/www/wiki/includes/jobqueue/jobs/CdnPurgeJob.php new file mode 100644 index 00000000..356eebab --- /dev/null +++ b/www/wiki/includes/jobqueue/jobs/CdnPurgeJob.php @@ -0,0 +1,46 @@ +<?php +/** + * Job to purge a set of URLs from CDN + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ + +/** + * Job to purge a set of URLs from CDN + * + * @ingroup JobQueue + * @since 1.27 + */ +class CdnPurgeJob extends Job { + /** + * @param Title $title + * @param array $params Job parameters (urls) + */ + function __construct( Title $title, array $params ) { + parent::__construct( 'cdnPurge', $title, $params ); + $this->removeDuplicates = false; // delay semantics are critical + } + + public function run() { + // Use purge() directly to avoid infinite recursion + CdnCacheUpdate::purge( $this->params['urls'] ); + + return true; + } +} diff --git a/www/wiki/includes/jobqueue/jobs/ClearUserWatchlistJob.php b/www/wiki/includes/jobqueue/jobs/ClearUserWatchlistJob.php new file mode 100644 index 00000000..77adfa1a --- /dev/null +++ b/www/wiki/includes/jobqueue/jobs/ClearUserWatchlistJob.php @@ -0,0 +1,118 @@ +<?php + +use MediaWiki\MediaWikiServices; + +/** + * Job to clear a users watchlist in batches. + * + * @author Addshore + * + * @ingroup JobQueue + * @since 1.31 + */ +class ClearUserWatchlistJob extends Job { + + /** + * @param User $user User to clear the watchlist for. + * @param int $maxWatchlistId The maximum wl_id at the time the job was first created. + * + * @return ClearUserWatchlistJob + */ + public static function newForUser( User $user, $maxWatchlistId ) { + return new self( + null, + [ 'userId' => $user->getId(), 'maxWatchlistId' => $maxWatchlistId ] + ); + } + + /** + * @param Title|null $title Not used by this job. + * @param array $params + * - userId, The ID for the user whose watchlist is being cleared. + * - maxWatchlistId, The maximum wl_id at the time the job was first created, + */ + public function __construct( Title $title = null, array $params ) { + parent::__construct( + 'clearUserWatchlist', + SpecialPage::getTitleFor( 'EditWatchlist', 'clear' ), + $params + ); + + $this->removeDuplicates = true; + } + + public function run() { + global $wgUpdateRowsPerQuery; + $userId = $this->params['userId']; + $maxWatchlistId = $this->params['maxWatchlistId']; + $batchSize = $wgUpdateRowsPerQuery; + + $loadBalancer = MediaWikiServices::getInstance()->getDBLoadBalancer(); + $dbw = $loadBalancer->getConnection( DB_MASTER ); + $dbr = $loadBalancer->getConnection( DB_REPLICA, [ 'watchlist' ] ); + + // Wait before lock to try to reduce time waiting in the lock. + if ( !$loadBalancer->safeWaitForMasterPos( $dbr ) ) { + $this->setLastError( 'Timed out waiting for replica to catch up before lock' ); + return false; + } + + // Use a named lock so that jobs for this user see each others' changes + $lockKey = "ClearUserWatchlistJob:$userId"; + $scopedLock = $dbw->getScopedLockAndFlush( $lockKey, __METHOD__, 10 ); + if ( !$scopedLock ) { + $this->setLastError( "Could not acquire lock '$lockKey'" ); + return false; + } + + if ( !$loadBalancer->safeWaitForMasterPos( $dbr ) ) { + $this->setLastError( 'Timed out waiting for replica to catch up within lock' ); + return false; + } + + // Clear any stale REPEATABLE-READ snapshot + $dbr->flushSnapshot( __METHOD__ ); + + $watchlistIds = $dbr->selectFieldValues( + 'watchlist', + 'wl_id', + [ + 'wl_user' => $userId, + 'wl_id <= ' . $maxWatchlistId + ], + __METHOD__, + [ + 'ORDER BY' => 'wl_id ASC', + 'LIMIT' => $batchSize, + ] + ); + + if ( count( $watchlistIds ) == 0 ) { + return true; + } + + $dbw->delete( 'watchlist', [ 'wl_id' => $watchlistIds ], __METHOD__ ); + + // Commit changes and remove lock before inserting next job. + $lbf = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); + $lbf->commitMasterChanges( __METHOD__ ); + unset( $scopedLock ); + + if ( count( $watchlistIds ) === (int)$batchSize ) { + // Until we get less results than the limit, recursively push + // the same job again. + JobQueueGroup::singleton()->push( new self( $this->getTitle(), $this->getParams() ) ); + } + + return true; + } + + public function getDeduplicationInfo() { + $info = parent::getDeduplicationInfo(); + // This job never has a namespace or title so we can't use it for deduplication + unset( $info['namespace'] ); + unset( $info['title'] ); + return $info; + } + +} diff --git a/www/wiki/includes/jobqueue/jobs/ClearWatchlistNotificationsJob.php b/www/wiki/includes/jobqueue/jobs/ClearWatchlistNotificationsJob.php new file mode 100644 index 00000000..94c1351a --- /dev/null +++ b/www/wiki/includes/jobqueue/jobs/ClearWatchlistNotificationsJob.php @@ -0,0 +1,79 @@ +<?php +/** + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ + +use MediaWiki\MediaWikiServices; + +/** + * Job for clearing all of the "last viewed" timestamps for a user's watchlist + * + * Job parameters include: + * - userId: affected user ID [required] + * - casTime: UNIX timestamp of the event that triggered this job [required] + * + * @ingroup JobQueue + * @since 1.31 + */ +class ClearWatchlistNotificationsJob extends Job { + function __construct( Title $title, array $params ) { + parent::__construct( 'clearWatchlistNotifications', $title, $params ); + + static $required = [ 'userId', 'casTime' ]; + $missing = implode( ', ', array_diff( $required, array_keys( $this->params ) ) ); + if ( $missing != '' ) { + throw new InvalidArgumentException( "Missing paramter(s) $missing" ); + } + + $this->removeDuplicates = true; + } + + public function run() { + $services = MediaWikiServices::getInstance(); + $lbFactory = $services->getDBLoadBalancerFactory(); + $rowsPerQuery = $services->getMainConfig()->get( 'UpdateRowsPerQuery' ); + + $dbw = $lbFactory->getMainLB()->getConnection( DB_MASTER ); + $ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ ); + + $asOfTimes = array_unique( $dbw->selectFieldValues( + 'watchlist', + 'wl_notificationtimestamp', + [ 'wl_user' => $this->params['userId'], 'wl_notificationtimestamp IS NOT NULL' ], + __METHOD__, + [ 'ORDER BY' => 'wl_notificationtimestamp DESC' ] + ) ); + + foreach ( array_chunk( $asOfTimes, $rowsPerQuery ) as $asOfTimeBatch ) { + $dbw->update( + 'watchlist', + [ 'wl_notificationtimestamp' => null ], + [ + 'wl_user' => $this->params['userId'], + 'wl_notificationtimestamp' => $asOfTimeBatch, + // New notifications since the reset should not be cleared + 'wl_notificationtimestamp < ' . + $dbw->addQuotes( $dbw->timestamp( $this->params['casTime'] ) ) + ], + __METHOD__ + ); + $lbFactory->commitAndWaitForReplication( __METHOD__, $ticket ); + } + } +} diff --git a/www/wiki/includes/jobqueue/jobs/DeleteLinksJob.php b/www/wiki/includes/jobqueue/jobs/DeleteLinksJob.php new file mode 100644 index 00000000..d0969e46 --- /dev/null +++ b/www/wiki/includes/jobqueue/jobs/DeleteLinksJob.php @@ -0,0 +1,67 @@ +<?php +/** + * Job to update link tables for pages + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ + +use MediaWiki\MediaWikiServices; + +/** + * Job to prune link tables for pages that were deleted + * + * Only DataUpdate classes should construct these jobs + * + * @ingroup JobQueue + * @since 1.27 + */ +class DeleteLinksJob extends Job { + function __construct( Title $title, array $params ) { + parent::__construct( 'deleteLinks', $title, $params ); + $this->removeDuplicates = true; + } + + function run() { + if ( is_null( $this->title ) ) { + $this->setLastError( "deleteLinks: Invalid title" ); + return false; + } + + $pageId = $this->params['pageId']; + + // Serialize links updates by page ID so they see each others' changes + $scopedLock = LinksUpdate::acquirePageLock( wfGetDB( DB_MASTER ), $pageId, 'job' ); + + if ( WikiPage::newFromID( $pageId, WikiPage::READ_LATEST ) ) { + // The page was restored somehow or something went wrong + $this->setLastError( "deleteLinks: Page #$pageId exists" ); + return false; + } + + $factory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); + $timestamp = isset( $this->params['timestamp'] ) ? $this->params['timestamp'] : null; + $page = WikiPage::factory( $this->title ); // title when deleted + + $update = new LinksDeletionUpdate( $page, $pageId, $timestamp ); + $update->setTransactionTicket( $factory->getEmptyTransactionTicket( __METHOD__ ) ); + $update->doUpdate(); + + return true; + } +} diff --git a/www/wiki/includes/jobqueue/jobs/DoubleRedirectJob.php b/www/wiki/includes/jobqueue/jobs/DoubleRedirectJob.php new file mode 100644 index 00000000..74c446fc --- /dev/null +++ b/www/wiki/includes/jobqueue/jobs/DoubleRedirectJob.php @@ -0,0 +1,252 @@ +<?php +/** + * Job to fix double redirects after moving a page. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ + +/** + * Job to fix double redirects after moving a page + * + * @ingroup JobQueue + */ +class DoubleRedirectJob extends Job { + /** @var string Reason for the change, 'maintenance' or 'move'. Suffix fo + * message key 'double-redirect-fixed-'. + */ + private $reason; + + /** @var Title The title which has changed, redirects pointing to this + * title are fixed + */ + private $redirTitle; + + /** @var User */ + private static $user; + + /** + * @param Title $title + * @param array $params + */ + function __construct( Title $title, array $params ) { + parent::__construct( 'fixDoubleRedirect', $title, $params ); + $this->reason = $params['reason']; + $this->redirTitle = Title::newFromText( $params['redirTitle'] ); + } + + /** + * Insert jobs into the job queue to fix redirects to the given title + * @param string $reason The reason for the fix, see message + * "double-redirect-fixed-<reason>" + * @param Title $redirTitle The title which has changed, redirects + * pointing to this title are fixed + * @param bool $destTitle Not used + */ + public static function fixRedirects( $reason, $redirTitle, $destTitle = false ) { + # Need to use the master to get the redirect table updated in the same transaction + $dbw = wfGetDB( DB_MASTER ); + $res = $dbw->select( + [ 'redirect', 'page' ], + [ 'page_namespace', 'page_title' ], + [ + 'page_id = rd_from', + 'rd_namespace' => $redirTitle->getNamespace(), + 'rd_title' => $redirTitle->getDBkey() + ], __METHOD__ ); + if ( !$res->numRows() ) { + return; + } + $jobs = []; + foreach ( $res as $row ) { + $title = Title::makeTitle( $row->page_namespace, $row->page_title ); + if ( !$title ) { + continue; + } + + $jobs[] = new self( $title, [ + 'reason' => $reason, + 'redirTitle' => $redirTitle->getPrefixedDBkey() ] ); + # Avoid excessive memory usage + if ( count( $jobs ) > 10000 ) { + JobQueueGroup::singleton()->push( $jobs ); + $jobs = []; + } + } + JobQueueGroup::singleton()->push( $jobs ); + } + + /** + * @return bool + */ + function run() { + if ( !$this->redirTitle ) { + $this->setLastError( 'Invalid title' ); + + return false; + } + + $targetRev = Revision::newFromTitle( $this->title, false, Revision::READ_LATEST ); + if ( !$targetRev ) { + wfDebug( __METHOD__ . ": target redirect already deleted, ignoring\n" ); + + return true; + } + $content = $targetRev->getContent(); + $currentDest = $content ? $content->getRedirectTarget() : null; + if ( !$currentDest || !$currentDest->equals( $this->redirTitle ) ) { + wfDebug( __METHOD__ . ": Redirect has changed since the job was queued\n" ); + + return true; + } + + // Check for a suppression tag (used e.g. in periodically archived discussions) + $mw = MagicWord::get( 'staticredirect' ); + if ( $content->matchMagicWord( $mw ) ) { + wfDebug( __METHOD__ . ": skipping: suppressed with __STATICREDIRECT__\n" ); + + return true; + } + + // Find the current final destination + $newTitle = self::getFinalDestination( $this->redirTitle ); + if ( !$newTitle ) { + wfDebug( __METHOD__ . + ": skipping: single redirect, circular redirect or invalid redirect destination\n" ); + + return true; + } + if ( $newTitle->equals( $this->redirTitle ) ) { + // The redirect is already right, no need to change it + // This can happen if the page was moved back (say after vandalism) + wfDebug( __METHOD__ . " : skipping, already good\n" ); + } + + // Preserve fragment (T16904) + $newTitle = Title::makeTitle( $newTitle->getNamespace(), $newTitle->getDBkey(), + $currentDest->getFragment(), $newTitle->getInterwiki() ); + + // Fix the text + $newContent = $content->updateRedirect( $newTitle ); + + if ( $newContent->equals( $content ) ) { + $this->setLastError( 'Content unchanged???' ); + + return false; + } + + $user = $this->getUser(); + if ( !$user ) { + $this->setLastError( 'Invalid user' ); + + return false; + } + + // Save it + global $wgUser; + $oldUser = $wgUser; + $wgUser = $user; + $article = WikiPage::factory( $this->title ); + + // Messages: double-redirect-fixed-move, double-redirect-fixed-maintenance + $reason = wfMessage( 'double-redirect-fixed-' . $this->reason, + $this->redirTitle->getPrefixedText(), $newTitle->getPrefixedText() + )->inContentLanguage()->text(); + $flags = EDIT_UPDATE | EDIT_SUPPRESS_RC | EDIT_INTERNAL; + $article->doEditContent( $newContent, $reason, $flags, false, $user ); + $wgUser = $oldUser; + + return true; + } + + /** + * Get the final destination of a redirect + * + * @param Title $title + * + * @return Title|bool The final Title after following all redirects, or false if + * the page is not a redirect or the redirect loops. + */ + public static function getFinalDestination( $title ) { + $dbw = wfGetDB( DB_MASTER ); + + // Circular redirect check + $seenTitles = []; + $dest = false; + + while ( true ) { + $titleText = $title->getPrefixedDBkey(); + if ( isset( $seenTitles[$titleText] ) ) { + wfDebug( __METHOD__, "Circular redirect detected, aborting\n" ); + + return false; + } + $seenTitles[$titleText] = true; + + if ( $title->isExternal() ) { + // If the target is interwiki, we have to break early (T42352). + // Otherwise it will look up a row in the local page table + // with the namespace/page of the interwiki target which can cause + // unexpected results (e.g. X -> foo:Bar -> Bar -> .. ) + break; + } + + $row = $dbw->selectRow( + [ 'redirect', 'page' ], + [ 'rd_namespace', 'rd_title', 'rd_interwiki' ], + [ + 'rd_from=page_id', + 'page_namespace' => $title->getNamespace(), + 'page_title' => $title->getDBkey() + ], __METHOD__ ); + if ( !$row ) { + # No redirect from here, chain terminates + break; + } else { + $dest = $title = Title::makeTitle( + $row->rd_namespace, + $row->rd_title, + '', + $row->rd_interwiki + ); + } + } + + return $dest; + } + + /** + * Get a user object for doing edits, from a request-lifetime cache + * False will be returned if the user name specified in the + * 'double-redirect-fixer' message is invalid. + * + * @return User|bool + */ + function getUser() { + if ( !self::$user ) { + $username = wfMessage( 'double-redirect-fixer' )->inContentLanguage()->text(); + self::$user = User::newFromName( $username ); + # User::newFromName() can return false on a badly configured wiki. + if ( self::$user && !self::$user->isLoggedIn() ) { + self::$user->addToDatabase(); + } + } + + return self::$user; + } +} diff --git a/www/wiki/includes/jobqueue/jobs/DuplicateJob.php b/www/wiki/includes/jobqueue/jobs/DuplicateJob.php new file mode 100644 index 00000000..c005a29a --- /dev/null +++ b/www/wiki/includes/jobqueue/jobs/DuplicateJob.php @@ -0,0 +1,59 @@ +<?php +/** + * No-op job that does nothing. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ + +/** + * No-op job that does nothing. Used to represent duplicates. + * + * @ingroup JobQueue + */ +final class DuplicateJob extends Job { + /** + * Callers should use DuplicateJob::newFromJob() instead + * + * @param Title $title + * @param array $params Job parameters + */ + function __construct( Title $title, array $params ) { + parent::__construct( 'duplicate', $title, $params ); + } + + /** + * Get a duplicate no-op version of a job + * + * @param Job $job + * @return Job + */ + public static function newFromJob( Job $job ) { + $djob = new self( $job->getTitle(), $job->getParams() ); + $djob->command = $job->getType(); + $djob->params = is_array( $djob->params ) ? $djob->params : []; + $djob->params = [ 'isDuplicate' => true ] + $djob->params; + $djob->metadata = $job->metadata; + + return $djob; + } + + public function run() { + return true; + } +} diff --git a/www/wiki/includes/jobqueue/jobs/EmaillingJob.php b/www/wiki/includes/jobqueue/jobs/EmaillingJob.php new file mode 100644 index 00000000..960e8828 --- /dev/null +++ b/www/wiki/includes/jobqueue/jobs/EmaillingJob.php @@ -0,0 +1,46 @@ +<?php +/** + * Old job for notification emails. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ + +/** + * Old job used for sending single notification emails; + * kept for backwards-compatibility + * + * @ingroup JobQueue + */ +class EmaillingJob extends Job { + function __construct( Title $title = null, array $params ) { + parent::__construct( 'sendMail', Title::newMainPage(), $params ); + } + + function run() { + $status = UserMailer::send( + $this->params['to'], + $this->params['from'], + $this->params['subj'], + $this->params['body'], + [ 'replyTo' => $this->params['replyto'] ] + ); + + return $status->isOK(); + } +} diff --git a/www/wiki/includes/jobqueue/jobs/EnotifNotifyJob.php b/www/wiki/includes/jobqueue/jobs/EnotifNotifyJob.php new file mode 100644 index 00000000..9a5c3c72 --- /dev/null +++ b/www/wiki/includes/jobqueue/jobs/EnotifNotifyJob.php @@ -0,0 +1,57 @@ +<?php +/** + * Job for notification emails. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ + +/** + * Job for email notification mails + * + * @ingroup JobQueue + */ +class EnotifNotifyJob extends Job { + function __construct( Title $title, array $params ) { + parent::__construct( 'enotifNotify', $title, $params ); + } + + function run() { + $enotif = new EmailNotification(); + // Get the user from ID (rename safe). Anons are 0, so defer to name. + if ( isset( $this->params['editorID'] ) && $this->params['editorID'] ) { + $editor = User::newFromId( $this->params['editorID'] ); + // B/C, only the name might be given. + } else { + # @todo FIXME: newFromName could return false on a badly configured wiki. + $editor = User::newFromName( $this->params['editor'], false ); + } + $enotif->actuallyNotifyOnPageChange( + $editor, + $this->title, + $this->params['timestamp'], + $this->params['summary'], + $this->params['minorEdit'], + $this->params['oldid'], + $this->params['watchers'], + $this->params['pageStatus'] + ); + + return true; + } +} diff --git a/www/wiki/includes/jobqueue/jobs/EnqueueJob.php b/www/wiki/includes/jobqueue/jobs/EnqueueJob.php new file mode 100644 index 00000000..ea7a8d78 --- /dev/null +++ b/www/wiki/includes/jobqueue/jobs/EnqueueJob.php @@ -0,0 +1,98 @@ +<?php +/** + * Router job that takes jobs and enqueues them. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ + +/** + * Router job that takes jobs and enqueues them to their proper queues + * + * This can be used for getting sets of multiple jobs or sets of jobs intended for multiple + * queues to be inserted more robustly. This is a single job that, upon running, enqueues the + * wrapped jobs. If some of those fail to enqueue then the EnqueueJob will be retried. Due to + * the possibility of duplicate enqueues, the wrapped jobs should be idempotent. + * + * @ingroup JobQueue + * @since 1.25 + */ +final class EnqueueJob extends Job { + /** + * Callers should use the factory methods instead + * + * @param Title $title + * @param array $params Job parameters + */ + function __construct( Title $title, array $params ) { + parent::__construct( 'enqueue', $title, $params ); + } + + /** + * @param JobSpecification|JobSpecification[] $jobs + * @return EnqueueJob + */ + public static function newFromLocalJobs( $jobs ) { + $jobs = is_array( $jobs ) ? $jobs : [ $jobs ]; + + return self::newFromJobsByWiki( [ wfWikiID() => $jobs ] ); + } + + /** + * @param array $jobsByWiki Map of (wiki => JobSpecification list) + * @return EnqueueJob + */ + public static function newFromJobsByWiki( array $jobsByWiki ) { + $deduplicate = true; + + $jobMapsByWiki = []; + foreach ( $jobsByWiki as $wiki => $jobs ) { + $jobMapsByWiki[$wiki] = []; + foreach ( $jobs as $job ) { + if ( $job instanceof JobSpecification ) { + $jobMapsByWiki[$wiki][] = $job->toSerializableArray(); + } else { + throw new InvalidArgumentException( "Jobs must be of type JobSpecification." ); + } + $deduplicate = $deduplicate && $job->ignoreDuplicates(); + } + } + + $eJob = new self( + Title::makeTitle( NS_SPECIAL, 'Badtitle/' . __CLASS__ ), + [ 'jobsByWiki' => $jobMapsByWiki ] + ); + // If *all* jobs to be pushed are to be de-duplicated (a common case), then + // de-duplicate this whole job itself to avoid build up in high traffic cases + $eJob->removeDuplicates = $deduplicate; + + return $eJob; + } + + public function run() { + foreach ( $this->params['jobsByWiki'] as $wiki => $jobMaps ) { + $jobSpecs = []; + foreach ( $jobMaps as $jobMap ) { + $jobSpecs[] = JobSpecification::newFromArray( $jobMap ); + } + JobQueueGroup::singleton( $wiki )->push( $jobSpecs ); + } + + return true; + } +} diff --git a/www/wiki/includes/jobqueue/jobs/HTMLCacheUpdateJob.php b/www/wiki/includes/jobqueue/jobs/HTMLCacheUpdateJob.php new file mode 100644 index 00000000..34028df1 --- /dev/null +++ b/www/wiki/includes/jobqueue/jobs/HTMLCacheUpdateJob.php @@ -0,0 +1,202 @@ +<?php +/** + * HTML cache invalidation of all pages linking to a given title. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + * @ingroup Cache + */ + +use MediaWiki\MediaWikiServices; + +/** + * Job to purge the cache for all pages that link to or use another page or file + * + * This job comes in a few variants: + * - a) Recursive jobs to purge caches for backlink pages for a given title. + * These jobs have (recursive:true,table:<table>) set. + * - b) Jobs to purge caches for a set of titles (the job title is ignored). + * These jobs have (pages:(<page ID>:(<namespace>,<title>),...) set. + * + * @ingroup JobQueue + */ +class HTMLCacheUpdateJob extends Job { + function __construct( Title $title, array $params ) { + parent::__construct( 'htmlCacheUpdate', $title, $params ); + // Avoid the overhead of de-duplication when it would be pointless. + // Note that these jobs always set page_touched to the current time, + // so letting the older existing job "win" is still correct. + $this->removeDuplicates = ( + // Ranges rarely will line up + !isset( $params['range'] ) && + // Multiple pages per job make matches unlikely + !( isset( $params['pages'] ) && count( $params['pages'] ) != 1 ) + ); + $this->params += [ 'causeAction' => 'unknown', 'causeAgent' => 'unknown' ]; + } + + /** + * @param Title $title Title to purge backlink pages from + * @param string $table Backlink table name + * @param array $params Additional job parameters + * @return HTMLCacheUpdateJob + */ + public static function newForBacklinks( Title $title, $table, $params = [] ) { + return new self( + $title, + [ + 'table' => $table, + 'recursive' => true + ] + Job::newRootJobParams( // "overall" refresh links job info + "htmlCacheUpdate:{$table}:{$title->getPrefixedText()}" + ) + $params + ); + } + + function run() { + global $wgUpdateRowsPerJob, $wgUpdateRowsPerQuery; + + if ( isset( $this->params['table'] ) && !isset( $this->params['pages'] ) ) { + $this->params['recursive'] = true; // b/c; base job + } + + // Job to purge all (or a range of) backlink pages for a page + if ( !empty( $this->params['recursive'] ) ) { + // Carry over information for de-duplication + $extraParams = $this->getRootJobParams(); + // Carry over cause information for logging + $extraParams['causeAction'] = $this->params['causeAction']; + $extraParams['causeAgent'] = $this->params['causeAgent']; + // Convert this into no more than $wgUpdateRowsPerJob HTMLCacheUpdateJob per-title + // jobs and possibly a recursive HTMLCacheUpdateJob job for the rest of the backlinks + $jobs = BacklinkJobUtils::partitionBacklinkJob( + $this, + $wgUpdateRowsPerJob, + $wgUpdateRowsPerQuery, // jobs-per-title + // Carry over information for de-duplication + [ 'params' => $extraParams ] + ); + JobQueueGroup::singleton()->push( $jobs ); + // Job to purge pages for a set of titles + } elseif ( isset( $this->params['pages'] ) ) { + $this->invalidateTitles( $this->params['pages'] ); + // Job to update a single title + } else { + $t = $this->title; + $this->invalidateTitles( [ + $t->getArticleID() => [ $t->getNamespace(), $t->getDBkey() ] + ] ); + } + + return true; + } + + /** + * @param array $pages Map of (page ID => (namespace, DB key)) entries + */ + protected function invalidateTitles( array $pages ) { + global $wgUpdateRowsPerQuery, $wgUseFileCache, $wgPageLanguageUseDB; + + // Get all page IDs in this query into an array + $pageIds = array_keys( $pages ); + if ( !$pageIds ) { + return; + } + + // Bump page_touched to the current timestamp. This used to use the root job timestamp + // (e.g. template/file edit time), which was a bit more efficient when template edits are + // rare and don't effect the same pages much. However, this way allows for better + // de-duplication, which is much more useful for wikis with high edit rates. Note that + // RefreshLinksJob, which is enqueued alongside HTMLCacheUpdateJob, saves the parser output + // since it has to parse anyway. We assume that vast majority of the cache jobs finish + // before the link jobs, so using the current timestamp instead of the root timestamp is + // not expected to invalidate these cache entries too often. + $touchTimestamp = wfTimestampNow(); + // If page_touched is higher than this, then something else already bumped it after enqueue + $condTimestamp = isset( $this->params['rootJobTimestamp'] ) + ? $this->params['rootJobTimestamp'] + : $touchTimestamp; + + $dbw = wfGetDB( DB_MASTER ); + $factory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); + $ticket = $factory->getEmptyTransactionTicket( __METHOD__ ); + // Update page_touched (skipping pages already touched since the root job). + // Check $wgUpdateRowsPerQuery for sanity; batch jobs are sized by that already. + foreach ( array_chunk( $pageIds, $wgUpdateRowsPerQuery ) as $batch ) { + $factory->commitAndWaitForReplication( __METHOD__, $ticket ); + + $dbw->update( 'page', + [ 'page_touched' => $dbw->timestamp( $touchTimestamp ) ], + [ 'page_id' => $batch, + // don't invalidated pages that were already invalidated + "page_touched < " . $dbw->addQuotes( $dbw->timestamp( $condTimestamp ) ) + ], + __METHOD__ + ); + } + // Get the list of affected pages (races only mean something else did the purge) + $titleArray = TitleArray::newFromResult( $dbw->select( + 'page', + array_merge( + [ 'page_namespace', 'page_title' ], + $wgPageLanguageUseDB ? [ 'page_lang' ] : [] + ), + [ 'page_id' => $pageIds, 'page_touched' => $dbw->timestamp( $touchTimestamp ) ], + __METHOD__ + ) ); + + // Update CDN; call purge() directly so as to not bother with secondary purges + $urls = []; + foreach ( $titleArray as $title ) { + /** @var Title $title */ + $urls = array_merge( $urls, $title->getCdnUrls() ); + } + CdnCacheUpdate::purge( $urls ); + + // Update file cache + if ( $wgUseFileCache ) { + foreach ( $titleArray as $title ) { + HTMLFileCache::clearFileCache( $title ); + } + } + } + + public function getDeduplicationInfo() { + $info = parent::getDeduplicationInfo(); + if ( is_array( $info['params'] ) ) { + // For per-pages jobs, the job title is that of the template that changed + // (or similar), so remove that since it ruins duplicate detection + if ( isset( $info['params']['pages'] ) ) { + unset( $info['namespace'] ); + unset( $info['title'] ); + } + } + + return $info; + } + + public function workItemCount() { + if ( !empty( $this->params['recursive'] ) ) { + return 0; // nothing actually purged + } elseif ( isset( $this->params['pages'] ) ) { + return count( $this->params['pages'] ); + } + + return 1; // one title + } +} diff --git a/www/wiki/includes/jobqueue/jobs/NullJob.php b/www/wiki/includes/jobqueue/jobs/NullJob.php new file mode 100644 index 00000000..80826fe1 --- /dev/null +++ b/www/wiki/includes/jobqueue/jobs/NullJob.php @@ -0,0 +1,76 @@ +<?php +/** + * Degenerate job that does nothing. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ + +/** + * Degenerate job that does nothing, but can optionally replace itself + * in the queue and/or sleep for a brief time period. These can be used + * to represent "no-op" jobs or test lock contention and performance. + * + * @par Example: + * Inserting a null job in the configured job queue: + * @code + * $ php maintenance/eval.php + * > $queue = JobQueueGroup::singleton(); + * > $job = new NullJob( Title::newMainPage(), [ 'lives' => 10 ] ); + * > $queue->push( $job ); + * @endcode + * You can then confirm the job has been enqueued by using the showJobs.php + * maintenance utility: + * @code + * $ php maintenance/showJobs.php --group + * null: 1 queue; 0 claimed (0 active, 0 abandoned) + * $ + * @endcode + * + * @ingroup JobQueue + */ +class NullJob extends Job { + /** + * @param Title $title + * @param array $params Job parameters (lives, usleep) + */ + function __construct( Title $title, array $params ) { + parent::__construct( 'null', $title, $params ); + if ( !isset( $this->params['lives'] ) ) { + $this->params['lives'] = 1; + } + if ( !isset( $this->params['usleep'] ) ) { + $this->params['usleep'] = 0; + } + $this->removeDuplicates = !empty( $this->params['removeDuplicates'] ); + } + + public function run() { + if ( $this->params['usleep'] > 0 ) { + usleep( $this->params['usleep'] ); + } + if ( $this->params['lives'] > 1 ) { + $params = $this->params; + $params['lives']--; + $job = new self( $this->title, $params ); + JobQueueGroup::singleton()->push( $job ); + } + + return true; + } +} diff --git a/www/wiki/includes/jobqueue/jobs/PublishStashedFileJob.php b/www/wiki/includes/jobqueue/jobs/PublishStashedFileJob.php new file mode 100644 index 00000000..e89812be --- /dev/null +++ b/www/wiki/includes/jobqueue/jobs/PublishStashedFileJob.php @@ -0,0 +1,152 @@ +<?php +/** + * Upload a file from the upload stash into the local file repo. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup Upload + * @ingroup JobQueue + */ +use Wikimedia\ScopedCallback; + +/** + * Upload a file from the upload stash into the local file repo. + * + * @ingroup Upload + * @ingroup JobQueue + */ +class PublishStashedFileJob extends Job { + public function __construct( Title $title, array $params ) { + parent::__construct( 'PublishStashedFile', $title, $params ); + $this->removeDuplicates = true; + } + + public function run() { + $scope = RequestContext::importScopedSession( $this->params['session'] ); + $this->addTeardownCallback( function () use ( &$scope ) { + ScopedCallback::consume( $scope ); // T126450 + } ); + + $context = RequestContext::getMain(); + $user = $context->getUser(); + try { + if ( !$user->isLoggedIn() ) { + $this->setLastError( "Could not load the author user from session." ); + + return false; + } + + UploadBase::setSessionStatus( + $user, + $this->params['filekey'], + [ 'result' => 'Poll', 'stage' => 'publish', 'status' => Status::newGood() ] + ); + + $upload = new UploadFromStash( $user ); + // @todo initialize() causes a GET, ideally we could frontload the antivirus + // checks and anything else to the stash stage (which includes concatenation and + // the local file is thus already there). That way, instead of GET+PUT, there could + // just be a COPY operation from the stash to the public zone. + $upload->initialize( $this->params['filekey'], $this->params['filename'] ); + + // Check if the local file checks out (this is generally a no-op) + $verification = $upload->verifyUpload(); + if ( $verification['status'] !== UploadBase::OK ) { + $status = Status::newFatal( 'verification-error' ); + $status->value = [ 'verification' => $verification ]; + UploadBase::setSessionStatus( + $user, + $this->params['filekey'], + [ 'result' => 'Failure', 'stage' => 'publish', 'status' => $status ] + ); + $this->setLastError( "Could not verify upload." ); + + return false; + } + + // Upload the stashed file to a permanent location + $status = $upload->performUpload( + $this->params['comment'], + $this->params['text'], + $this->params['watch'], + $user, + isset( $this->params['tags'] ) ? $this->params['tags'] : [] + ); + if ( !$status->isGood() ) { + UploadBase::setSessionStatus( + $user, + $this->params['filekey'], + [ 'result' => 'Failure', 'stage' => 'publish', 'status' => $status ] + ); + $this->setLastError( $status->getWikiText( false, false, 'en' ) ); + + return false; + } + + // Build the image info array while we have the local reference handy + $apiMain = new ApiMain(); // dummy object (XXX) + $imageInfo = $upload->getImageInfo( $apiMain->getResult() ); + + // Cleanup any temporary local file + $upload->cleanupTempFile(); + + // Cache the info so the user doesn't have to wait forever to get the final info + UploadBase::setSessionStatus( + $user, + $this->params['filekey'], + [ + 'result' => 'Success', + 'stage' => 'publish', + 'filename' => $upload->getLocalFile()->getName(), + 'imageinfo' => $imageInfo, + 'status' => Status::newGood() + ] + ); + } catch ( Exception $e ) { + UploadBase::setSessionStatus( + $user, + $this->params['filekey'], + [ + 'result' => 'Failure', + 'stage' => 'publish', + 'status' => Status::newFatal( 'api-error-publishfailed' ) + ] + ); + $this->setLastError( get_class( $e ) . ": " . $e->getMessage() ); + // To prevent potential database referential integrity issues. + // See T34551. + MWExceptionHandler::rollbackMasterChangesAndLog( $e ); + + return false; + } + + return true; + } + + public function getDeduplicationInfo() { + $info = parent::getDeduplicationInfo(); + if ( is_array( $info['params'] ) ) { + $info['params'] = [ 'filekey' => $info['params']['filekey'] ]; + } + + return $info; + } + + public function allowRetries() { + return false; + } +} diff --git a/www/wiki/includes/jobqueue/jobs/RecentChangesUpdateJob.php b/www/wiki/includes/jobqueue/jobs/RecentChangesUpdateJob.php new file mode 100644 index 00000000..8f508283 --- /dev/null +++ b/www/wiki/includes/jobqueue/jobs/RecentChangesUpdateJob.php @@ -0,0 +1,246 @@ +<?php +/** + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ +use MediaWiki\MediaWikiServices; +use Wikimedia\Rdbms\DBReplicationWaitError; + +/** + * Job for pruning recent changes + * + * @ingroup JobQueue + * @since 1.25 + */ +class RecentChangesUpdateJob extends Job { + function __construct( Title $title, array $params ) { + parent::__construct( 'recentChangesUpdate', $title, $params ); + + if ( !isset( $params['type'] ) ) { + throw new Exception( "Missing 'type' parameter." ); + } + + $this->executionFlags |= self::JOB_NO_EXPLICIT_TRX_ROUND; + $this->removeDuplicates = true; + } + + /** + * @return RecentChangesUpdateJob + */ + final public static function newPurgeJob() { + return new self( + SpecialPage::getTitleFor( 'Recentchanges' ), [ 'type' => 'purge' ] + ); + } + + /** + * @return RecentChangesUpdateJob + * @since 1.26 + */ + final public static function newCacheUpdateJob() { + return new self( + SpecialPage::getTitleFor( 'Recentchanges' ), [ 'type' => 'cacheUpdate' ] + ); + } + + public function run() { + if ( $this->params['type'] === 'purge' ) { + $this->purgeExpiredRows(); + } elseif ( $this->params['type'] === 'cacheUpdate' ) { + $this->updateActiveUsers(); + } else { + throw new InvalidArgumentException( + "Invalid 'type' parameter '{$this->params['type']}'." ); + } + + return true; + } + + protected function purgeExpiredRows() { + global $wgRCMaxAge, $wgUpdateRowsPerQuery; + + $lockKey = wfWikiID() . ':recentchanges-prune'; + + $dbw = wfGetDB( DB_MASTER ); + if ( !$dbw->lock( $lockKey, __METHOD__, 0 ) ) { + // already in progress + return; + } + + $factory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); + $ticket = $factory->getEmptyTransactionTicket( __METHOD__ ); + $cutoff = $dbw->timestamp( time() - $wgRCMaxAge ); + $rcQuery = RecentChange::getQueryInfo(); + do { + $rcIds = []; + $rows = []; + $res = $dbw->select( + $rcQuery['tables'], + $rcQuery['fields'], + [ 'rc_timestamp < ' . $dbw->addQuotes( $cutoff ) ], + __METHOD__, + [ 'LIMIT' => $wgUpdateRowsPerQuery ], + $rcQuery['joins'] + ); + foreach ( $res as $row ) { + $rcIds[] = $row->rc_id; + $rows[] = $row; + } + if ( $rcIds ) { + $dbw->delete( 'recentchanges', [ 'rc_id' => $rcIds ], __METHOD__ ); + Hooks::run( 'RecentChangesPurgeRows', [ $rows ] ); + // There might be more, so try waiting for replica DBs + try { + $factory->commitAndWaitForReplication( + __METHOD__, $ticket, [ 'timeout' => 3 ] + ); + } catch ( DBReplicationWaitError $e ) { + // Another job will continue anyway + break; + } + } + } while ( $rcIds ); + + $dbw->unlock( $lockKey, __METHOD__ ); + } + + protected function updateActiveUsers() { + global $wgActiveUserDays; + + // Users that made edits at least this many days ago are "active" + $days = $wgActiveUserDays; + // Pull in the full window of active users in this update + $window = $wgActiveUserDays * 86400; + + $dbw = wfGetDB( DB_MASTER ); + $factory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); + $ticket = $factory->getEmptyTransactionTicket( __METHOD__ ); + + $lockKey = wfWikiID() . '-activeusers'; + if ( !$dbw->lock( $lockKey, __METHOD__, 0 ) ) { + // Exclusive update (avoids duplicate entries)… it's usually fine to just + // drop out here, if the Job is already running. + return; + } + + // Long-running queries expected + $dbw->setSessionOptions( [ 'connTimeout' => 900 ] ); + + $nowUnix = time(); + // Get the last-updated timestamp for the cache + $cTime = $dbw->selectField( 'querycache_info', + 'qci_timestamp', + [ 'qci_type' => 'activeusers' ] + ); + $cTimeUnix = $cTime ? wfTimestamp( TS_UNIX, $cTime ) : 1; + + // Pick the date range to fetch from. This is normally from the last + // update to till the present time, but has a limited window for sanity. + // If the window is limited, multiple runs are need to fully populate it. + $sTimestamp = max( $cTimeUnix, $nowUnix - $days * 86400 ); + $eTimestamp = min( $sTimestamp + $window, $nowUnix ); + + // Get all the users active since the last update + $actorQuery = ActorMigration::newMigration()->getJoin( 'rc_user' ); + $res = $dbw->select( + [ 'recentchanges' ] + $actorQuery['tables'], + [ + 'rc_user_text' => $actorQuery['fields']['rc_user_text'], + 'lastedittime' => 'MAX(rc_timestamp)' + ], + [ + $actorQuery['fields']['rc_user'] . ' > 0', // actual accounts + 'rc_type != ' . $dbw->addQuotes( RC_EXTERNAL ), // no wikidata + 'rc_log_type IS NULL OR rc_log_type != ' . $dbw->addQuotes( 'newusers' ), + 'rc_timestamp >= ' . $dbw->addQuotes( $dbw->timestamp( $sTimestamp ) ), + 'rc_timestamp <= ' . $dbw->addQuotes( $dbw->timestamp( $eTimestamp ) ) + ], + __METHOD__, + [ + 'GROUP BY' => [ 'rc_user_text' ], + 'ORDER BY' => 'NULL' // avoid filesort + ], + $actorQuery['joins'] + ); + $names = []; + foreach ( $res as $row ) { + $names[$row->rc_user_text] = $row->lastedittime; + } + + // Find which of the recently active users are already accounted for + if ( count( $names ) ) { + $res = $dbw->select( 'querycachetwo', + [ 'user_name' => 'qcc_title' ], + [ + 'qcc_type' => 'activeusers', + 'qcc_namespace' => NS_USER, + 'qcc_title' => array_keys( $names ), + 'qcc_value >= ' . $dbw->addQuotes( $nowUnix - $days * 86400 ), // TS_UNIX + ], + __METHOD__ + ); + // Note: In order for this to be actually consistent, we would need + // to update these rows with the new lastedittime. + foreach ( $res as $row ) { + unset( $names[$row->user_name] ); + } + } + + // Insert the users that need to be added to the list + if ( count( $names ) ) { + $newRows = []; + foreach ( $names as $name => $lastEditTime ) { + $newRows[] = [ + 'qcc_type' => 'activeusers', + 'qcc_namespace' => NS_USER, + 'qcc_title' => $name, + 'qcc_value' => wfTimestamp( TS_UNIX, $lastEditTime ), + 'qcc_namespacetwo' => 0, // unused + 'qcc_titletwo' => '' // unused + ]; + } + foreach ( array_chunk( $newRows, 500 ) as $rowBatch ) { + $dbw->insert( 'querycachetwo', $rowBatch, __METHOD__ ); + $factory->commitAndWaitForReplication( __METHOD__, $ticket ); + } + } + + // If a transaction was already started, it might have an old + // snapshot, so kludge the timestamp range back as needed. + $asOfTimestamp = min( $eTimestamp, (int)$dbw->trxTimestamp() ); + + // Touch the data freshness timestamp + $dbw->replace( 'querycache_info', + [ 'qci_type' ], + [ 'qci_type' => 'activeusers', + 'qci_timestamp' => $dbw->timestamp( $asOfTimestamp ) ], // not always $now + __METHOD__ + ); + + $dbw->unlock( $lockKey, __METHOD__ ); + + // Rotate out users that have not edited in too long (according to old data set) + $dbw->delete( 'querycachetwo', + [ + 'qcc_type' => 'activeusers', + 'qcc_value < ' . $dbw->addQuotes( $nowUnix - $days * 86400 ) // TS_UNIX + ], + __METHOD__ + ); + } +} diff --git a/www/wiki/includes/jobqueue/jobs/RefreshLinksJob.php b/www/wiki/includes/jobqueue/jobs/RefreshLinksJob.php new file mode 100644 index 00000000..8854c656 --- /dev/null +++ b/www/wiki/includes/jobqueue/jobs/RefreshLinksJob.php @@ -0,0 +1,320 @@ +<?php +/** + * Job to update link tables for pages + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ +use MediaWiki\MediaWikiServices; +use Wikimedia\Rdbms\DBReplicationWaitError; + +/** + * Job to update link tables for pages + * + * This job comes in a few variants: + * - a) Recursive jobs to update links for backlink pages for a given title. + * These jobs have (recursive:true,table:<table>) set. + * - b) Jobs to update links for a set of pages (the job title is ignored). + * These jobs have (pages:(<page ID>:(<namespace>,<title>),...) set. + * - c) Jobs to update links for a single page (the job title) + * These jobs need no extra fields set. + * + * @ingroup JobQueue + */ +class RefreshLinksJob extends Job { + /** @var float Cache parser output when it takes this long to render */ + const PARSE_THRESHOLD_SEC = 1.0; + /** @var int Lag safety margin when comparing root job times to last-refresh times */ + const CLOCK_FUDGE = 10; + /** @var int How many seconds to wait for replica DBs to catch up */ + const LAG_WAIT_TIMEOUT = 15; + + function __construct( Title $title, array $params ) { + parent::__construct( 'refreshLinks', $title, $params ); + // Avoid the overhead of de-duplication when it would be pointless + $this->removeDuplicates = ( + // Ranges rarely will line up + !isset( $params['range'] ) && + // Multiple pages per job make matches unlikely + !( isset( $params['pages'] ) && count( $params['pages'] ) != 1 ) + ); + $this->params += [ 'causeAction' => 'unknown', 'causeAgent' => 'unknown' ]; + } + + /** + * @param Title $title + * @param array $params + * @return RefreshLinksJob + */ + public static function newPrioritized( Title $title, array $params ) { + $job = new self( $title, $params ); + $job->command = 'refreshLinksPrioritized'; + + return $job; + } + + /** + * @param Title $title + * @param array $params + * @return RefreshLinksJob + */ + public static function newDynamic( Title $title, array $params ) { + $job = new self( $title, $params ); + $job->command = 'refreshLinksDynamic'; + + return $job; + } + + function run() { + global $wgUpdateRowsPerJob; + + // Job to update all (or a range of) backlink pages for a page + if ( !empty( $this->params['recursive'] ) ) { + // When the base job branches, wait for the replica DBs to catch up to the master. + // From then on, we know that any template changes at the time the base job was + // enqueued will be reflected in backlink page parses when the leaf jobs run. + if ( !isset( $this->params['range'] ) ) { + try { + $lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory(); + $lbFactory->waitForReplication( [ + 'wiki' => wfWikiID(), + 'timeout' => self::LAG_WAIT_TIMEOUT + ] ); + } catch ( DBReplicationWaitError $e ) { // only try so hard + $stats = MediaWikiServices::getInstance()->getStatsdDataFactory(); + $stats->increment( 'refreshlinks.lag_wait_failed' ); + } + } + // Carry over information for de-duplication + $extraParams = $this->getRootJobParams(); + $extraParams['triggeredRecursive'] = true; + // Carry over cause information for logging + $extraParams['causeAction'] = $this->params['causeAction']; + $extraParams['causeAgent'] = $this->params['causeAgent']; + // Convert this into no more than $wgUpdateRowsPerJob RefreshLinks per-title + // jobs and possibly a recursive RefreshLinks job for the rest of the backlinks + $jobs = BacklinkJobUtils::partitionBacklinkJob( + $this, + $wgUpdateRowsPerJob, + 1, // job-per-title + [ 'params' => $extraParams ] + ); + JobQueueGroup::singleton()->push( $jobs ); + // Job to update link tables for a set of titles + } elseif ( isset( $this->params['pages'] ) ) { + foreach ( $this->params['pages'] as $nsAndKey ) { + list( $ns, $dbKey ) = $nsAndKey; + $this->runForTitle( Title::makeTitleSafe( $ns, $dbKey ) ); + } + // Job to update link tables for a given title + } else { + $this->runForTitle( $this->title ); + } + + return true; + } + + /** + * @param Title $title + * @return bool + */ + protected function runForTitle( Title $title ) { + $services = MediaWikiServices::getInstance(); + $stats = $services->getStatsdDataFactory(); + $lbFactory = $services->getDBLoadBalancerFactory(); + $ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ ); + + $page = WikiPage::factory( $title ); + $page->loadPageData( WikiPage::READ_LATEST ); + + // Serialize links updates by page ID so they see each others' changes + $dbw = $lbFactory->getMainLB()->getConnection( DB_MASTER ); + /** @noinspection PhpUnusedLocalVariableInspection */ + $scopedLock = LinksUpdate::acquirePageLock( $dbw, $page->getId(), 'job' ); + // Get the latest ID *after* acquirePageLock() flushed the transaction. + // This is used to detect edits/moves after loadPageData() but before the scope lock. + // The works around the chicken/egg problem of determining the scope lock key. + $latest = $title->getLatestRevID( Title::GAID_FOR_UPDATE ); + + if ( !empty( $this->params['triggeringRevisionId'] ) ) { + // Fetch the specified revision; lockAndGetLatest() below detects if the page + // was edited since and aborts in order to avoid corrupting the link tables + $revision = Revision::newFromId( + $this->params['triggeringRevisionId'], + Revision::READ_LATEST + ); + } else { + // Fetch current revision; READ_LATEST reduces lockAndGetLatest() check failures + $revision = Revision::newFromTitle( $title, false, Revision::READ_LATEST ); + } + + if ( !$revision ) { + $stats->increment( 'refreshlinks.rev_not_found' ); + $this->setLastError( "Revision not found for {$title->getPrefixedDBkey()}" ); + return false; // just deleted? + } elseif ( $revision->getId() != $latest || $revision->getPage() !== $page->getId() ) { + // Do not clobber over newer updates with older ones. If all jobs where FIFO and + // serialized, it would be OK to update links based on older revisions since it + // would eventually get to the latest. Since that is not the case (by design), + // only update the link tables to a state matching the current revision's output. + $stats->increment( 'refreshlinks.rev_not_current' ); + $this->setLastError( "Revision {$revision->getId()} is not current" ); + return false; + } + + $content = $revision->getContent( Revision::RAW ); + if ( !$content ) { + // If there is no content, pretend the content is empty + $content = $revision->getContentHandler()->makeEmptyContent(); + } + + $parserOutput = false; + $parserOptions = $page->makeParserOptions( 'canonical' ); + // If page_touched changed after this root job, then it is likely that + // any views of the pages already resulted in re-parses which are now in + // cache. The cache can be reused to avoid expensive parsing in some cases. + if ( isset( $this->params['rootJobTimestamp'] ) ) { + $opportunistic = !empty( $this->params['isOpportunistic'] ); + + $skewedTimestamp = $this->params['rootJobTimestamp']; + if ( $opportunistic ) { + // Neither clock skew nor DB snapshot/replica DB lag matter much for such + // updates; focus on reusing the (often recently updated) cache + } else { + // For transclusion updates, the template changes must be reflected + $skewedTimestamp = wfTimestamp( TS_MW, + wfTimestamp( TS_UNIX, $skewedTimestamp ) + self::CLOCK_FUDGE + ); + } + + if ( $page->getLinksTimestamp() > $skewedTimestamp ) { + // Something already updated the backlinks since this job was made + $stats->increment( 'refreshlinks.update_skipped' ); + return true; + } + + if ( $page->getTouched() >= $this->params['rootJobTimestamp'] || $opportunistic ) { + // Cache is suspected to be up-to-date. As long as the cache rev ID matches + // and it reflects the job's triggering change, then it is usable. + $parserOutput = $services->getParserCache()->getDirty( $page, $parserOptions ); + if ( !$parserOutput + || $parserOutput->getCacheRevisionId() != $revision->getId() + || $parserOutput->getCacheTime() < $skewedTimestamp + ) { + $parserOutput = false; // too stale + } + } + } + + // Fetch the current revision and parse it if necessary... + if ( $parserOutput ) { + $stats->increment( 'refreshlinks.parser_cached' ); + } else { + $start = microtime( true ); + // Revision ID must be passed to the parser output to get revision variables correct + $parserOutput = $content->getParserOutput( + $title, $revision->getId(), $parserOptions, false ); + $elapsed = microtime( true ) - $start; + // If it took a long time to render, then save this back to the cache to avoid + // wasted CPU by other apaches or job runners. We don't want to always save to + // cache as this can cause high cache I/O and LRU churn when a template changes. + if ( $elapsed >= self::PARSE_THRESHOLD_SEC + && $page->shouldCheckParserCache( $parserOptions, $revision->getId() ) + && $parserOutput->isCacheable() + ) { + $ctime = wfTimestamp( TS_MW, (int)$start ); // cache time + $services->getParserCache()->save( + $parserOutput, $page, $parserOptions, $ctime, $revision->getId() + ); + } + $stats->increment( 'refreshlinks.parser_uncached' ); + } + + $updates = $content->getSecondaryDataUpdates( + $title, + null, + !empty( $this->params['useRecursiveLinksUpdate'] ), + $parserOutput + ); + + // For legacy hook handlers doing updates via LinksUpdateConstructed, make sure + // any pending writes they made get flushed before the doUpdate() calls below. + // This avoids snapshot-clearing errors in LinksUpdate::acquirePageLock(). + $lbFactory->commitAndWaitForReplication( __METHOD__, $ticket ); + + foreach ( $updates as $update ) { + // Carry over cause in case so the update can do extra logging + $update->setCause( $this->params['causeAction'], $this->params['causeAgent'] ); + // FIXME: This code probably shouldn't be here? + // Needed by things like Echo notifications which need + // to know which user caused the links update + if ( $update instanceof LinksUpdate ) { + $update->setRevision( $revision ); + if ( !empty( $this->params['triggeringUser'] ) ) { + $userInfo = $this->params['triggeringUser']; + if ( $userInfo['userId'] ) { + $user = User::newFromId( $userInfo['userId'] ); + } else { + // Anonymous, use the username + $user = User::newFromName( $userInfo['userName'], false ); + } + $update->setTriggeringUser( $user ); + } + } + } + + foreach ( $updates as $update ) { + $update->setTransactionTicket( $ticket ); + $update->doUpdate(); + } + + InfoAction::invalidateCache( $title ); + + // Commit any writes here in case this method is called in a loop. + // In that case, the scoped lock will fail to be acquired. + $lbFactory->commitAndWaitForReplication( __METHOD__, $ticket ); + + return true; + } + + public function getDeduplicationInfo() { + $info = parent::getDeduplicationInfo(); + unset( $info['causeAction'] ); + unset( $info['causeAgent'] ); + if ( is_array( $info['params'] ) ) { + // For per-pages jobs, the job title is that of the template that changed + // (or similar), so remove that since it ruins duplicate detection + if ( isset( $info['params']['pages'] ) ) { + unset( $info['namespace'] ); + unset( $info['title'] ); + } + } + + return $info; + } + + public function workItemCount() { + if ( !empty( $this->params['recursive'] ) ) { + return 0; // nothing actually refreshed + } elseif ( isset( $this->params['pages'] ) ) { + return count( $this->params['pages'] ); + } + + return 1; // one title + } +} diff --git a/www/wiki/includes/jobqueue/jobs/ThumbnailRenderJob.php b/www/wiki/includes/jobqueue/jobs/ThumbnailRenderJob.php new file mode 100644 index 00000000..cf3155d7 --- /dev/null +++ b/www/wiki/includes/jobqueue/jobs/ThumbnailRenderJob.php @@ -0,0 +1,111 @@ +<?php +/** + * Job for asynchronous rendering of thumbnails. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ + +/** + * Job for asynchronous rendering of thumbnails. + * + * @ingroup JobQueue + */ +class ThumbnailRenderJob extends Job { + public function __construct( Title $title, array $params ) { + parent::__construct( 'ThumbnailRender', $title, $params ); + } + + public function run() { + global $wgUploadThumbnailRenderMethod; + + $transformParams = $this->params['transformParams']; + + $file = wfLocalFile( $this->title ); + $file->load( File::READ_LATEST ); + + if ( $file && $file->exists() ) { + if ( $wgUploadThumbnailRenderMethod === 'jobqueue' ) { + $thumb = $file->transform( $transformParams, File::RENDER_NOW ); + + if ( $thumb && !$thumb->isError() ) { + return true; + } else { + $this->setLastError( __METHOD__ . ': thumbnail couln\'t be generated' ); + return false; + } + } elseif ( $wgUploadThumbnailRenderMethod === 'http' ) { + $thumbUrl = ''; + $status = $this->hitThumbUrl( $file, $transformParams, $thumbUrl ); + + wfDebug( __METHOD__ . ": received status {$status}\n" ); + + // 400 happens when requesting a size greater or equal than the original + if ( $status === 200 || $status === 301 || $status === 302 || $status === 400 ) { + return true; + } elseif ( $status ) { + $this->setLastError( __METHOD__ . ': incorrect HTTP status ' . + $status . ' when hitting ' . $thumbUrl ); + return false; + } else { + $this->setLastError( __METHOD__ . ': HTTP request failure' ); + return false; + } + } else { + $this->setLastError( __METHOD__ . ': unknown thumbnail render method ' . + $wgUploadThumbnailRenderMethod ); + return false; + } + } else { + $this->setLastError( __METHOD__ . ': file doesn\'t exist' ); + return false; + } + } + + protected function hitThumbUrl( LocalFile $file, $transformParams, &$thumbUrl ) { + global $wgUploadThumbnailRenderHttpCustomHost, $wgUploadThumbnailRenderHttpCustomDomain; + + $thumbName = $file->thumbName( $transformParams ); + $thumbUrl = $file->getThumbUrl( $thumbName ); + + if ( $wgUploadThumbnailRenderHttpCustomDomain ) { + $parsedUrl = wfParseUrl( $thumbUrl ); + + if ( !$parsedUrl || !isset( $parsedUrl['path'] ) || !strlen( $parsedUrl['path'] ) ) { + return false; + } + + $thumbUrl = '//' . $wgUploadThumbnailRenderHttpCustomDomain . $parsedUrl['path']; + } + + wfDebug( __METHOD__ . ": hitting url {$thumbUrl}\n" ); + + $request = MWHttpRequest::factory( $thumbUrl, + [ 'method' => 'HEAD', 'followRedirects' => true ], + __METHOD__ + ); + + if ( $wgUploadThumbnailRenderHttpCustomHost ) { + $request->setHeader( 'Host', $wgUploadThumbnailRenderHttpCustomHost ); + } + + $status = $request->execute(); + + return $request->getStatus(); + } +} diff --git a/www/wiki/includes/jobqueue/jobs/UserGroupExpiryJob.php b/www/wiki/includes/jobqueue/jobs/UserGroupExpiryJob.php new file mode 100644 index 00000000..0945e58f --- /dev/null +++ b/www/wiki/includes/jobqueue/jobs/UserGroupExpiryJob.php @@ -0,0 +1,39 @@ +<?php +/** + * Job that purges expired user group memberships. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ + +class UserGroupExpiryJob extends Job { + public function __construct( $params = false ) { + parent::__construct( 'userGroupExpiry', Title::newMainPage(), $params ); + $this->removeDuplicates = true; + } + + /** + * Run the job + * @return bool Success + */ + public function run() { + UserGroupMembership::purgeExpired(); + + return true; + } +} diff --git a/www/wiki/includes/jobqueue/utils/BacklinkJobUtils.php b/www/wiki/includes/jobqueue/utils/BacklinkJobUtils.php new file mode 100644 index 00000000..76f8d6d2 --- /dev/null +++ b/www/wiki/includes/jobqueue/utils/BacklinkJobUtils.php @@ -0,0 +1,149 @@ +<?php +/** + * Job to update links for a given title. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup JobQueue + */ + +/** + * Class with Backlink related Job helper methods + * + * When an asset changes, a base job can be inserted to update all assets that depend on it. + * The base job splits into per-title "leaf" jobs and a "remnant" job to handle the remaining + * range of backlinks. This recurs until the remnant job's backlink range is small enough that + * only leaf jobs are created from it. + * + * For example, if templates A and B are edited (at the same time) the queue will have: + * (A base, B base) + * When these jobs run, the queue will have per-title and remnant partition jobs: + * (titleX,titleY,titleZ,...,A remnant,titleM,titleN,titleO,...,B remnant) + * + * This works best when the queue is FIFO, for several reasons: + * - a) Since the remnant jobs are enqueued after the leaf jobs, the slower leaf jobs have to + * get popped prior to the fast remnant jobs. This avoids flooding the queue with leaf jobs + * for every single backlink of widely used assets (which can be millions). + * - b) Other jobs going in the queue still get a chance to run after a widely used asset changes. + * This is due to the large remnant job pushing to the end of the queue with each division. + * + * The size of the queues used in this manner depend on the number of assets changes and the + * number of workers. Also, with FIFO-per-partition queues, the queue size can be somewhat larger, + * depending on the number of queue partitions. + * + * @ingroup JobQueue + * @since 1.23 + */ +class BacklinkJobUtils { + /** + * Break down $job into approximately ($bSize/$cSize) leaf jobs and a single partition + * job that covers the remaining backlink range (if needed). Jobs for the first $bSize + * titles are collated ($cSize per job) into leaf jobs to do actual work. All the + * resulting jobs are of the same class as $job. No partition job is returned if the + * range covered by $job was less than $bSize, as the leaf jobs have full coverage. + * + * The leaf jobs have the 'pages' param set to a (<page ID>:(<namespace>,<DB key>),...) + * map so that the run() function knows what pages to act on. The leaf jobs will keep + * the same job title as the parent job (e.g. $job). + * + * The partition jobs have the 'range' parameter set to a map of the format + * (start:<integer>, end:<integer>, batchSize:<integer>, subranges:((<start>,<end>),...)), + * the 'table' parameter set to that of $job, and the 'recursive' parameter set to true. + * This method can be called on the resulting job to repeat the process again. + * + * The job provided ($job) must have the 'recursive' parameter set to true and the 'table' + * parameter must be set to a backlink table. The job title will be used as the title to + * find backlinks for. Any 'range' parameter must follow the same format as mentioned above. + * This should be managed by recursive calls to this method. + * + * The first jobs return are always the leaf jobs. This lets the caller use push() to + * put them directly into the queue and works well if the queue is FIFO. In such a queue, + * the leaf jobs have to get finished first before anything can resolve the next partition + * job, which keeps the queue very small. + * + * $opts includes: + * - params : extra job parameters to include in each job + * + * @param Job $job + * @param int $bSize BacklinkCache partition size; usually $wgUpdateRowsPerJob + * @param int $cSize Max titles per leaf job; Usually 1 or a modest value + * @param array $opts Optional parameter map + * @return Job[] List of Job objects + */ + public static function partitionBacklinkJob( Job $job, $bSize, $cSize, $opts = [] ) { + $class = get_class( $job ); + $title = $job->getTitle(); + $params = $job->getParams(); + + if ( isset( $params['pages'] ) || empty( $params['recursive'] ) ) { + $ranges = []; // sanity; this is a leaf node + $realBSize = 0; + wfWarn( __METHOD__ . " called on {$job->getType()} leaf job (explosive recursion)." ); + } elseif ( isset( $params['range'] ) ) { + // This is a range job to trigger the insertion of partitioned/title jobs... + $ranges = $params['range']['subranges']; + $realBSize = $params['range']['batchSize']; + } else { + // This is a base job to trigger the insertion of partitioned jobs... + $ranges = $title->getBacklinkCache()->partition( $params['table'], $bSize ); + $realBSize = $bSize; + } + + $extraParams = isset( $opts['params'] ) ? $opts['params'] : []; + + $jobs = []; + // Combine the first range (of size $bSize) backlinks into leaf jobs + if ( isset( $ranges[0] ) ) { + list( $start, $end ) = $ranges[0]; + $iter = $title->getBacklinkCache()->getLinks( $params['table'], $start, $end ); + $titles = iterator_to_array( $iter ); + /** @var Title[] $titleBatch */ + foreach ( array_chunk( $titles, $cSize ) as $titleBatch ) { + $pages = []; + foreach ( $titleBatch as $tl ) { + $pages[$tl->getArticleID()] = [ $tl->getNamespace(), $tl->getDBkey() ]; + } + $jobs[] = new $class( + $title, // maintain parent job title + [ 'pages' => $pages ] + $extraParams + ); + } + } + // Take all of the remaining ranges and build a partition job from it + if ( isset( $ranges[1] ) ) { + $jobs[] = new $class( + $title, // maintain parent job title + [ + 'recursive' => true, + 'table' => $params['table'], + 'range' => [ + 'start' => $ranges[1][0], + 'end' => $ranges[count( $ranges ) - 1][1], + 'batchSize' => $realBSize, + 'subranges' => array_slice( $ranges, 1 ) + ], + // Track how many times the base job divided for debugging + 'division' => isset( $params['division'] ) + ? ( $params['division'] + 1 ) + : 1 + ] + $extraParams + ); + } + + return $jobs; + } +} diff --git a/www/wiki/includes/jobqueue/utils/PurgeJobUtils.php b/www/wiki/includes/jobqueue/utils/PurgeJobUtils.php new file mode 100644 index 00000000..ba80c8e4 --- /dev/null +++ b/www/wiki/includes/jobqueue/utils/PurgeJobUtils.php @@ -0,0 +1,81 @@ +<?php +/** + * Base code for update jobs that put some secondary data extracted + * from article content into the database. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + */ +use Wikimedia\Rdbms\IDatabase; +use MediaWiki\MediaWikiServices; + +class PurgeJobUtils { + /** + * Invalidate the cache of a list of pages from a single namespace. + * This is intended for use by subclasses. + * + * @param IDatabase $dbw + * @param int $namespace Namespace number + * @param array $dbkeys + */ + public static function invalidatePages( IDatabase $dbw, $namespace, array $dbkeys ) { + if ( $dbkeys === [] ) { + return; + } + + $dbw->onTransactionIdle( + function () use ( $dbw, $namespace, $dbkeys ) { + $services = MediaWikiServices::getInstance(); + $lbFactory = $services->getDBLoadBalancerFactory(); + // Determine which pages need to be updated. + // This is necessary to prevent the job queue from smashing the DB with + // large numbers of concurrent invalidations of the same page. + $now = $dbw->timestamp(); + $ids = $dbw->selectFieldValues( + 'page', + 'page_id', + [ + 'page_namespace' => $namespace, + 'page_title' => $dbkeys, + 'page_touched < ' . $dbw->addQuotes( $now ) + ], + __METHOD__ + ); + + if ( !$ids ) { + return; + } + + $batchSize = $services->getMainConfig()->get( 'UpdateRowsPerQuery' ); + $ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ ); + foreach ( array_chunk( $ids, $batchSize ) as $idBatch ) { + $dbw->update( + 'page', + [ 'page_touched' => $now ], + [ + 'page_id' => $idBatch, + 'page_touched < ' . $dbw->addQuotes( $now ) // handle races + ], + __METHOD__ + ); + $lbFactory->commitAndWaitForReplication( __METHOD__, $ticket ); + } + }, + __METHOD__ + ); + } +} |