summaryrefslogtreecommitdiff
path: root/www/wiki/includes/deferred/DeferredUpdates.php
blob: 9b25d53820bbc67d8543fad42cb292ddcd1b4ea8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
<?php
/**
 * Interface and manager for deferred updates.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 */
use Wikimedia\Rdbms\IDatabase;
use MediaWiki\MediaWikiServices;
use Wikimedia\Rdbms\LBFactory;
use Wikimedia\Rdbms\LoadBalancer;

/**
 * Class for managing the deferred updates
 *
 * In web request mode, deferred updates can be run at the end of the request, either before or
 * after the HTTP response has been sent. In either case, they run after the DB commit step. If
 * an update runs after the response is sent, it will not block clients. If sent before, it will
 * run synchronously. These two modes are defined via PRESEND and POSTSEND constants, the latter
 * being the default for addUpdate() and addCallableUpdate().
 *
 * Updates that work through this system will be more likely to complete by the time the client
 * makes their next request after this one than with the JobQueue system.
 *
 * In CLI mode, updates run immediately if no DB writes are pending. Otherwise, they run when:
 *   - a) Any waitForReplication() call if no writes are pending on any DB
 *   - b) A commit happens on Maintenance::getDB( DB_MASTER ) if no writes are pending on any DB
 *   - c) EnqueueableDataUpdate tasks may enqueue on commit of Maintenance::getDB( DB_MASTER )
 *   - d) At the completion of Maintenance::execute()
 *
 * When updates are deferred, they go into one of two FIFO "top-queues" (one for pre-send and one
 * for post-send). Updates enqueued *during* doUpdate() of a "top" update go into the "sub-queue"
 * for that update. After that method finishes, the sub-queue is run until drained. This continues
 * for each top-queue job until the entire top queue is drained. This happens for the pre-send
 * top-queue, and later on, the post-send top-queue, in execute().
 *
 * @since 1.19
 */
class DeferredUpdates {
	/**
	 * @var DeferrableUpdate[] Updates to be deferred until before request end.
	 *  MergeableUpdate instances are keyed by their class name (see push()),
	 *  everything else is appended under an integer key.
	 */
	private static $preSendUpdates = [];
	/**
	 * @var DeferrableUpdate[] Updates to be deferred until after request end.
	 *  Keyed the same way as $preSendUpdates.
	 */
	private static $postSendUpdates = [];

	const ALL = 0; // all updates; in web requests, use only after flushing the output buffer
	const PRESEND = 1; // for updates that should run before flushing output buffer
	const POSTSEND = 2; // for updates that should run after flushing output buffer

	// Number of pending updates at which tryOpportunisticExecute() starts pushing
	// EnqueueableDataUpdate tasks to the job queue while DB transactions are active
	const BIG_QUEUE_SIZE = 100;

	/**
	 * @var array|null Information about the current execute() call or null if not running.
	 *  When set, it has the keys 'stage' (int) and 'subqueue' (DeferrableUpdate[]).
	 */
	private static $executeContext;

	/**
	 * Add an update to the deferred list to be run later by execute()
	 *
	 * In CLI mode, callback magic will also be used to run updates when safe
	 *
	 * @param DeferrableUpdate $update Some object that implements doUpdate()
	 * @param int $stage DeferredUpdates constant (PRESEND or POSTSEND) (since 1.27)
	 */
	public static function addUpdate( DeferrableUpdate $update, $stage = self::POSTSEND ) {
		global $wgCommandLineMode;

		if ( self::$executeContext && self::$executeContext['stage'] >= $stage ) {
			// This is a sub-DeferredUpdate; run it right after its parent update.
			// Also, while post-send updates are running, push any "pre-send" jobs to the
			// active post-send queue to make sure they get run this round (or at all).
			self::$executeContext['subqueue'][] = $update;

			return;
		}

		if ( $stage === self::PRESEND ) {
			self::push( self::$preSendUpdates, $update );
		} else {
			self::push( self::$postSendUpdates, $update );
		}

		// Try to run the updates now if in CLI mode and no transaction is active.
		// This covers scripts that don't/barely use the DB but make updates to other stores.
		if ( $wgCommandLineMode ) {
			self::tryOpportunisticExecute( 'run' );
		}
	}

	/**
	 * Add a callable update. In a lot of cases, we just need a callback/closure,
	 * defining a new DeferrableUpdate object is not necessary
	 *
	 * The callable is wrapped in an MWCallableUpdate (tagged with the caller's name)
	 * and handed to addUpdate().
	 *
	 * @see MWCallableUpdate::__construct()
	 *
	 * @param callable $callable
	 * @param int $stage DeferredUpdates constant (PRESEND or POSTSEND) (since 1.27)
	 * @param IDatabase|IDatabase[]|null $dbw Abort if this DB is rolled back [optional] (since 1.28)
	 */
	public static function addCallableUpdate(
		$callable, $stage = self::POSTSEND, $dbw = null
	) {
		self::addUpdate( new MWCallableUpdate( $callable, wfGetCaller(), $dbw ), $stage );
	}

	/**
	 * Do any deferred updates and clear the list
	 *
	 * With ALL, the pre-send queue is run first and then the post-send queue; both
	 * are executed with POSTSEND as the effective stage.
	 *
	 * @param string $mode Use "enqueue" to use the job queue when possible [Default: "run"]
	 * @param int $stage DeferredUpdates constant (PRESEND, POSTSEND, or ALL) (since 1.27)
	 */
	public static function doUpdates( $mode = 'run', $stage = self::ALL ) {
		$stageEffective = ( $stage === self::ALL ) ? self::POSTSEND : $stage;

		if ( $stage === self::ALL || $stage === self::PRESEND ) {
			self::execute( self::$preSendUpdates, $mode, $stageEffective );
		}

		// Strict comparison, consistent with every other stage check in this class
		if ( $stage === self::ALL || $stage === self::POSTSEND ) {
			self::execute( self::$postSendUpdates, $mode, $stageEffective );
		}
	}

	/**
	 * No-op kept for backwards compatibility; it only emits a deprecation notice.
	 *
	 * @param bool $value Whether to just immediately run updates in addUpdate()
	 * @since 1.28
	 * @deprecated 1.29 Causes issues in Web-executed jobs - see T165714 and T100085.
	 */
	public static function setImmediateMode( $value ) {
		wfDeprecated( __METHOD__, '1.29' );
	}

	/**
	 * Append an update to a queue, coalescing MergeableUpdate instances by class
	 *
	 * A MergeableUpdate is stored under its class name; if the queue already holds
	 * one of that class, the new update is merged into it instead of being added.
	 *
	 * @param DeferrableUpdate[] &$queue
	 * @param DeferrableUpdate $update
	 */
	private static function push( array &$queue, DeferrableUpdate $update ) {
		if ( $update instanceof MergeableUpdate ) {
			$class = get_class( $update ); // fully-qualified class
			if ( isset( $queue[$class] ) ) {
				/** @var MergeableUpdate $existingUpdate */
				$existingUpdate = $queue[$class];
				$existingUpdate->merge( $update );
			} else {
				$queue[$class] = $update;
			}
		} else {
			$queue[] = $update;
		}
	}

	/**
	 * Immediately run/queue a list of updates
	 *
	 * The queue is drained in rounds: each round snapshots and clears the queue, runs
	 * DataUpdate tasks followed by the other tasks, and runs each task's sub-queue
	 * right after the task itself. Rounds repeat until no new updates were enqueued.
	 *
	 * @param DeferrableUpdate[] &$queue List of DeferrableUpdate objects
	 * @param string $mode Use "enqueue" to use the job queue when possible
	 * @param int $stage Class constant (PRESEND, POSTSEND) (since 1.28)
	 * @throws ErrorPageError Happens on top-level calls
	 * @throws Exception Happens on second-level calls
	 */
	protected static function execute( array &$queue, $mode, $stage ) {
		$services = MediaWikiServices::getInstance();
		$stats = $services->getStatsdDataFactory();
		$lbFactory = $services->getDBLoadBalancerFactory();
		$method = RequestContext::getMain()->getRequest()->getMethod();

		$ticket = $lbFactory->getEmptyTransactionTicket( __METHOD__ );

		/** @var ErrorPageError $reportableError */
		$reportableError = null;
		/** @var DeferrableUpdate[] $updates Snapshot of queue */
		$updates = $queue;

		// Keep doing rounds of updates until none get enqueued...
		while ( $updates ) {
			$queue = []; // clear the queue

			// Order will be DataUpdate followed by generic DeferrableUpdate tasks
			$updatesByType = [ 'data' => [], 'generic' => [] ];
			foreach ( $updates as $du ) {
				if ( $du instanceof DataUpdate ) {
					$du->setTransactionTicket( $ticket );
					$updatesByType['data'][] = $du;
				} else {
					$updatesByType['generic'][] = $du;
				}

				$name = ( $du instanceof DeferrableCallback )
					? get_class( $du ) . '-' . $du->getOrigin()
					: get_class( $du );
				$stats->increment( 'deferred_updates.' . $method . '.' . $name );
			}

			// Execute all remaining tasks...
			foreach ( $updatesByType as $updatesForType ) {
				foreach ( $updatesForType as $update ) {
					self::$executeContext = [ 'stage' => $stage, 'subqueue' => [] ];
					try {
						/** @var DeferrableUpdate $update */
						$guiError = self::runUpdate( $update, $lbFactory, $mode, $stage );
						$reportableError = $reportableError ?: $guiError;
						// Do the subqueue updates for $update until there are none
						while ( self::$executeContext['subqueue'] ) {
							$subUpdate = reset( self::$executeContext['subqueue'] );
							$firstKey = key( self::$executeContext['subqueue'] );
							unset( self::$executeContext['subqueue'][$firstKey] );

							if ( $subUpdate instanceof DataUpdate ) {
								$subUpdate->setTransactionTicket( $ticket );
							}

							$guiError = self::runUpdate( $subUpdate, $lbFactory, $mode, $stage );
							$reportableError = $reportableError ?: $guiError;
						}
					} finally {
						// Always clear the context, even if something that runUpdate()
						// does not catch propagates out of here. Otherwise every later
						// addUpdate() call would land in a sub-queue that nobody drains
						// and tryOpportunisticExecute() would refuse to run forever.
						self::$executeContext = null;
					}
				}
			}

			$updates = $queue; // new snapshot of queue (check for new entries)
		}

		if ( $reportableError ) {
			throw $reportableError; // throw the first of any GUI errors
		}
	}

	/**
	 * Run (or enqueue) a single update
	 *
	 * Exceptions thrown by the update are passed to
	 * MWExceptionHandler::rollbackMasterChangesAndLog() and not re-thrown from here.
	 *
	 * @param DeferrableUpdate $update
	 * @param LBFactory $lbFactory
	 * @param string $mode Use "enqueue" to push EnqueueableDataUpdate tasks as jobs instead
	 * @param int $stage
	 * @return ErrorPageError|null The caught ErrorPageError if $stage is PRESEND, else null
	 */
	private static function runUpdate(
		DeferrableUpdate $update, LBFactory $lbFactory, $mode, $stage
	) {
		$guiError = null;
		try {
			if ( $mode === 'enqueue' && $update instanceof EnqueueableDataUpdate ) {
				// Run only the job enqueue logic to complete the update later
				$spec = $update->getAsJobSpecification();
				JobQueueGroup::singleton( $spec['wiki'] )->push( $spec['job'] );
			} elseif ( $update instanceof TransactionRoundDefiningUpdate ) {
				// Called without a surrounding begin/commit round, unlike the branch below
				$update->doUpdate();
			} else {
				// Run the bulk of the update now
				$fnameTrxOwner = get_class( $update ) . '::doUpdate';
				$lbFactory->beginMasterChanges( $fnameTrxOwner );
				$update->doUpdate();
				$lbFactory->commitMasterChanges( $fnameTrxOwner );
			}
		} catch ( Exception $e ) {
			// Reporting GUI exceptions does not work post-send
			if ( $e instanceof ErrorPageError && $stage === self::PRESEND ) {
				$guiError = $e;
			}
			MWExceptionHandler::rollbackMasterChangesAndLog( $e );
		}

		return $guiError;
	}

	/**
	 * Run all deferred updates immediately if there are no DB writes active
	 *
	 * If there are busy databases and at least BIG_QUEUE_SIZE updates are pending,
	 * EnqueueableDataUpdate tasks will be enqueued anyway (regardless of $mode)
	 * for the sake of progress.
	 *
	 * @param string $mode Use "enqueue" to use the job queue when possible
	 * @return bool Whether updates were allowed to run
	 * @since 1.28
	 */
	public static function tryOpportunisticExecute( $mode = 'run' ) {
		// execute() loop is already running
		if ( self::$executeContext ) {
			return false;
		}

		// Avoiding running updates without them having outer scope
		if ( !self::areDatabaseTransactionsActive() ) {
			self::doUpdates( $mode );
			return true;
		}

		if ( self::pendingUpdatesCount() >= self::BIG_QUEUE_SIZE ) {
			// If we cannot run the updates with outer transaction context, try to
			// at least enqueue all the updates that support queueing to job queue
			self::$preSendUpdates = self::enqueueUpdates( self::$preSendUpdates );
			self::$postSendUpdates = self::enqueueUpdates( self::$postSendUpdates );
		}

		return !self::pendingUpdatesCount();
	}

	/**
	 * Enqueue a job for each EnqueueableDataUpdate item and return the other items
	 *
	 * String (class name) keys of the remaining items are kept so that push() can
	 * still merge later MergeableUpdate instances into the ones already queued.
	 *
	 * @param DeferrableUpdate[] $updates A list of deferred update instances
	 * @return DeferrableUpdate[] Remaining updates that do not support being queued
	 */
	private static function enqueueUpdates( array $updates ) {
		$remaining = [];

		foreach ( $updates as $key => $update ) {
			if ( $update instanceof EnqueueableDataUpdate ) {
				$spec = $update->getAsJobSpecification();
				JobQueueGroup::singleton( $spec['wiki'] )->push( $spec['job'] );
			} elseif ( is_string( $key ) ) {
				// Keyed by class name in push(); keep the key to keep merging working
				$remaining[$key] = $update;
			} else {
				$remaining[] = $update;
			}
		}

		return $remaining;
	}

	/**
	 * @return int Number of enqueued updates
	 * @since 1.28
	 */
	public static function pendingUpdatesCount() {
		return count( self::$preSendUpdates ) + count( self::$postSendUpdates );
	}

	/**
	 * Get the pending updates of the given stage(s), pre-send before post-send
	 *
	 * Class-name keys are kept where possible. If both queues hold an update under
	 * the same class-name key, the second one is appended under an integer key
	 * rather than replacing the first, so no pending update is hidden.
	 *
	 * @param int $stage DeferredUpdates constant (PRESEND, POSTSEND, or ALL)
	 * @return DeferrableUpdate[]
	 * @since 1.29
	 */
	public static function getPendingUpdates( $stage = self::ALL ) {
		$queues = [];
		if ( $stage === self::ALL || $stage === self::PRESEND ) {
			$queues[] = self::$preSendUpdates;
		}
		if ( $stage === self::ALL || $stage === self::POSTSEND ) {
			$queues[] = self::$postSendUpdates;
		}

		$updates = [];
		foreach ( $queues as $queue ) {
			foreach ( $queue as $key => $update ) {
				// array_merge() would overwrite on a string key collision; append instead
				if ( is_string( $key ) && !array_key_exists( $key, $updates ) ) {
					$updates[$key] = $update;
				} else {
					$updates[] = $update;
				}
			}
		}

		return $updates;
	}

	/**
	 * Clear all pending updates without performing them. Generally, you don't
	 * want or need to call this. Unit tests need it though.
	 */
	public static function clearPendingUpdates() {
		self::$preSendUpdates = [];
		self::$postSendUpdates = [];
	}

	/**
	 * Check the load balancer factory for a transaction round, and every open master
	 * connection for pending writes/callbacks or an explicit transaction
	 *
	 * @return bool If a transaction round is active or connection is not ready for commit()
	 */
	private static function areDatabaseTransactionsActive() {
		$lbFactory = MediaWikiServices::getInstance()->getDBLoadBalancerFactory();
		if ( $lbFactory->hasTransactionRound() ) {
			return true;
		}

		$connsBusy = false;
		$lbFactory->forEachLB( function ( LoadBalancer $lb ) use ( &$connsBusy ) {
			$lb->forEachOpenMasterConnection( function ( IDatabase $conn ) use ( &$connsBusy ) {
				if ( $conn->writesOrCallbacksPending() || $conn->explicitTrxActive() ) {
					$connsBusy = true;
				}
			} );
		} );

		return $connsBusy;
	}
}