summaryrefslogtreecommitdiff
path: root/www/wiki/extensions/Translate/ttmserver/TTMServerMessageUpdateJob.php
blob: 7a6a91d7b6719f8274a154fa31cb0a26bf314813 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
<?php
/**
 * Contains class with job for updating translation memory.
 *
 * @file
 * @author Niklas Laxström
 * @license GPL-2.0-or-later
 */

use MediaWiki\Logger\LoggerFactory;

/**
 * Job for updating translation memory.
 *
 * job params:
 * - command: the command to run, defaults to 'rebuild'
 * - service: the service to write to, if set to null the job will write
 *   to the default (primary) service and its replicas.
 * - errorCount: number of errors encountered while trying to perform the write
 *   on this service
 *
 * This job handles retries itself and return false in allowRetries to disable
 * JobQueue's internal retry service.
 *
 * If mirroring is activated on the primary service then the first job
 * will try to write to all services, it will resend a new job to
 * every single service that failed and will increment errorCount.
 * When too many errors occur on single service the job is dropped.
 *
 * @ingroup JobQueue
 */
class TTMServerMessageUpdateJob extends Job {
	/**
	 * Number of *retries* allowed, 4 means we attempt
	 * to run the job 5 times (1 initial attempt + 4 retries).
	 */
	const MAX_ERROR_RETRY = 4;

	/**
	 * Constant used by backoffDelay().
	 * With 7 the cumulative delay between the first and last attempt is
	 * between 8 and 33 minutes.
	 */
	const WRITE_BACKOFF_EXPONENT = 7;

	/**
	 * The maximum amount of time jobs delayed due to frozen services can remain
	 * in the job queue.
	 */
	const DROP_DELAYED_JOBS_AFTER = 86400; // 60 * 60 * 24 * 1;

	/**
	 * @param MessageHandle $handle
	 * @param string $command
	 * @return self
	 */
	public static function newJob( MessageHandle $handle, $command ) {
		$job = new self( $handle->getTitle(), [ 'command' => $command ] );

		return $job;
	}

	/**
	 * @param Title $title
	 * @param array $params
	 */
	public function __construct( $title, $params = [] ) {
		parent::__construct(
			__CLASS__,
			$title,
			$params + [
				'command' => 'rebuild',
				'service' => null,
				'errorCount' => 0,
				'createdAt' => time(),
				'retryCount' => 0,
			]
		);
	}

	/**
	 * Fetch all the translations and update them.
	 * @return bool
	 */
	public function run() {
		global $wgTranslateTranslationServices,
			$wgTranslateTranslationDefaultService;

		$service = $this->params['service'];
		$writeToMirrors = false;

		if ( $service === null ) {
			$service = $wgTranslateTranslationDefaultService;
			$writeToMirrors = true;
		}

		if ( !isset( $wgTranslateTranslationServices[$service] ) ) {
			LoggerFactory::getInstance( 'TTMServerUpdates' )->warning(
				'Received update job for a an unknown service {service}.',
				[ 'service' => $service ]
			);
			return true;
		}

		$services = [ $service ];
		if ( $writeToMirrors ) {
			$config = $wgTranslateTranslationServices[$service];
			$server = TTMServer::factory( $config );
			$services = array_unique(
				array_merge( $services, $server->getMirrors() )
			);
		}

		foreach ( $services as $service ) {
			$this->runCommandWithRetry( $service );
		}
		return true;
	}

	/**
	 * @inheritDoc
	 */
	public function allowRetries() {
		return false;
	}

	/**
	 * Run the update on the specified service name.
	 *
	 * @param string $serviceName the service name
	 */
	private function runCommandWithRetry( $serviceName ) {
		global $wgTranslateTranslationServices;

		if ( !isset( $wgTranslateTranslationServices[$serviceName] ) ) {
			LoggerFactory::getInstance( 'TTMServerUpdates' )->warning(
				'Cannot write to {service}: service is unknown.',
				[ 'service' => $serviceName ]
			);
			return;
		}
		$ttmserver = TTMServer::factory( $wgTranslateTranslationServices[$serviceName] );

		if ( $serviceName === null || !( $ttmserver instanceof WritableTTMServer ) ) {
			LoggerFactory::getInstance( 'TTMServerUpdates' )->warning(
				'Received update job for a service that does not implement ' .
				'WritableTTMServer, please check config for {service}.',
				[ 'service' => $serviceName ]
			);
			return;
		}

		try {
			if ( $ttmserver->isFrozen() ) {
				$this->requeueRetry( $serviceName );
			} else {
				$this->runCommand( $ttmserver );
			}
		} catch ( \Exception $e ) {
			$this->requeueError( $serviceName, $e );
		}
	}

	/**
	 * @param string $serviceName the service in error
	 * @param Exception $e the error
	 */
	private function requeueError( $serviceName, $e ) {
		LoggerFactory::getInstance( 'TTMServerUpdates' )->warning(
			'Exception thrown while running {command} on ' .
			'service {service}: {errorMessage}',
			[
				'command' => $this->params['command'],
				'service' => $serviceName,
				'errorMessage' => $e->getMessage(),
				'exception' => $e,
			]
		);
		if ( $this->params['errorCount'] >= self::MAX_ERROR_RETRY ) {
			LoggerFactory::getInstance( 'TTMServerUpdates' )->warning(
				'Dropping failing job {command} for service {service} ' .
				'after repeated failure',
				[
					'command' => $this->params['command'],
					'service' => $serviceName,
				]
			);
			return;
		}

		$delay = self::backoffDelay( $this->params['errorCount'] );
		$job = clone $this;
		$job->params['errorCount']++;
		$job->params['service'] = $serviceName;
		$job->setDelay( $delay );
		LoggerFactory::getInstance( 'TTMServerUpdates' )->info(
			'Update job reported failure on service {service}. ' .
			'Requeueing job with delay of {delay}.',
			[
				'service' => $serviceName,
				'delay' => $delay
			]
		);
		$this->resend( $job );
	}

	/**
	 * Re-queue job that is frozen, or drop the job if it has
	 * been frozen for too long.
	 *
	 * @param string $serviceName
	 */
	private function requeueRetry( $serviceName ) {
		$diff = time() - $this->params['createdAt'];
		$dropTimeout = self::DROP_DELAYED_JOBS_AFTER;
		if ( $diff > $dropTimeout ) {
			LoggerFactory::getInstance( 'TTMServerUpdates' )->warning(
				'Dropping delayed job {command} for service {service} ' .
				'after waiting {diff}s',
				[
					'command' => $this->params['command'],
					'service' => $serviceName,
					'diff' => $diff,
				]
			);
		} else {
			$delay = self::backoffDelay( $this->params['retryCount'] );
			$job = clone $this;
			$job->params['retryCount']++;
			$job->params['service'] = $serviceName;
			$job->setDelay( $delay );
			LoggerFactory::getInstance( 'TTMServerUpdates' )->debug(
				'Service {service} reported frozen. ' .
				'Requeueing job with delay of {delay}s',
				[
					'service' => $serviceName,
					'delay' => $delay
				]
			);
			$this->resend( $job );
		}
	}

	/**
	 * Extracted for testing purpose
	 * @param self $job
	 */
	protected function resend( self $job ) {
		JobQueueGroup::singleton()->push( $job );
	}

	private function runCommand( WritableTTMServer $ttmserver ) {
		$handle = $this->getHandle();
		$command = $this->params['command'];

		if ( $command === 'delete' ) {
			$this->updateItem( $ttmserver, $handle, null, false );
		} elseif ( $command === 'rebuild' ) {
			$this->updateMessage( $ttmserver, $handle );
		} elseif ( $command === 'refresh' ) {
			$this->updateTranslation( $ttmserver, $handle );
		}
	}

	/**
	 * Extracted for testing purpose
	 *
	 * @return MessageHandle
	 */
	protected function getHandle() {
		return new MessageHandle( $this->title );
	}

	/**
	 * Extracted for testing purpose
	 *
	 * @param MessageHandle $handle
	 * @return string
	 */
	protected function getTranslation( MessageHandle $handle ) {
		return TranslateUtils::getMessageContent(
			$handle->getKey(),
			$handle->getCode(),
			$handle->getTitle()->getNamespace()
		);
	}

	private function updateMessage( WritableTTMServer $ttmserver, MessageHandle $handle ) {
		// Base page update, e.g. group change. Update everything.
		$translations = ApiQueryMessageTranslations::getTranslations( $handle );
		foreach ( $translations as $page => $data ) {
			$tTitle = Title::makeTitle( $this->title->getNamespace(), $page );
			$tHandle = new MessageHandle( $tTitle );
			$this->updateItem( $ttmserver, $tHandle, $data[0], $tHandle->isFuzzy() );
		}
	}

	private function updateTranslation( WritableTTMServer $ttmserver, MessageHandle $handle ) {
		// Update only this translation
		$translation = $this->getTranslation( $handle );
		$this->updateItem( $ttmserver, $handle, $translation, $handle->isFuzzy() );
	}

	private function updateItem( WritableTTMServer $ttmserver, MessageHandle $handle, $text, $fuzzy ) {
		if ( $fuzzy ) {
			$text = null;
		}
		$ttmserver->update( $handle, $text );
	}

	/**
	 * Set a delay for this job. Note that this might not be possible, the JobQueue
	 * implementation handling this job doesn't support it (JobQueueDB) but is possible
	 * for the high performance JobQueueRedis. Note also that delays are minimums -
	 * at least JobQueueRedis makes no effort to remove the delay as soon as possible
	 * after it has expired. By default it only checks every five minutes or so.
	 * Note yet again that if another delay has been set that is longer then this one
	 * then the _longer_ delay stays.
	 *
	 * @param int $delay seconds to delay this job if possible
	 */
	public function setDelay( $delay ) {
		$jobQueue = JobQueueGroup::singleton()->get( $this->getType() );
		if ( !$delay || !$jobQueue->delayedJobsEnabled() ) {
			return;
		}
		$oldTime = $this->getReleaseTimestamp();
		$newTime = time() + $delay;
		if ( $oldTime !== null && $oldTime >= $newTime ) {
			return;
		}
		$this->params[ 'jobReleaseTimestamp' ] = $newTime;
	}

	/**
	 * @param int $retryCount The number of times the job has errored out.
	 * @return int Number of seconds to delay. With the default minimum exponent
	 * of 6 the possible return values are 64, 128, 256, 512 and 1024 giving a
	 * maximum delay of 17 minutes.
	 */
	public static function backoffDelay( $retryCount ) {
		return ceil( pow(
			2,
			static::WRITE_BACKOFF_EXPONENT + rand( 0, min( $retryCount, 4 ) )
		) );
	}
}