summaryrefslogtreecommitdiff
path: root/www/wiki/extensions/SemanticMediaWiki/src/MediaWiki/Jobs/UpdateDispatcherJob.php
blob: 8022830f7ded9d27ac2df56216d5805fae84db4e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
<?php

namespace SMW\MediaWiki\Jobs;

use Hooks;
use SMW\MediaWiki\Job;
use SMW\ApplicationFactory;
use SMW\DIProperty;
use SMW\DIWikiPage;
use SMW\DataTypeRegistry;
use SMW\RequestOptions;
use SMW\Enum;
use SMW\Exception\DataItemDeserializationException;
use SMWDataItem as DataItem;
use Title;

/**
 * Dispatcher to find and create individual UpdateJob instances for a specific
 * subject and its linked entities.
 *
 * @license GNU GPL v2+
 * @since 1.9
 *
 * @author mwjames
 */
class UpdateDispatcherJob extends Job {

	/**
	 * Restrict dispatch process to an available pool of data
	 */
	const RESTRICTED_DISPATCH_POOL = 'restricted.disp.pool';

	/**
	 * Parameter for the secondary run to contain a list of update jobs to be
	 * inserted at once.
	 */
	const JOB_LIST = 'job-list';

	/**
	 * Size of chunks used when invoking the secondary dispatch run
	 */
	const CHUNK_SIZE = 500;

	/**
	 * Factory returned by ApplicationFactory::newSerializerFactory(), assigned
	 * in initServices() and used to deserialize the `semanticData` parameter.
	 *
	 * Declared explicitly so that no dynamic property is created at runtime
	 * (dynamic properties are deprecated as of PHP 8.2).
	 *
	 * @var object|null
	 */
	private $serializerFactory;

	/**
	 * @since  1.9
	 *
	 * @param Title $title
	 * @param array $params job parameters
	 * @param integer $id job id
	 */
	public function __construct( Title $title, $params = [], $id = 0 ) {
		parent::__construct( 'smw.updateDispatcher', $title, $params, $id );
		$this->removeDuplicates = true;
	}

	/**
	 * Runs either the secondary dispatch (when a JOB_LIST parameter is present)
	 * or the primary dispatch that collects subject hashes and hands them over
	 * to chunked secondary runs.
	 *
	 * @see Job::run
	 *
	 * @since  1.9
	 *
	 * @return boolean
	 */
	public function run() {

		$this->initServices();

		/**
		 * Retrieved a job list (most likely from a secondary dispatch run) and
		 * push each list entry into the job queue to spread the work independently
		 * from the actual dispatch process.
		 */
		if ( $this->hasParameter( self::JOB_LIST ) ) {
			return $this->push_jobs_from_list( $this->getParameter( self::JOB_LIST ) );
		}

		/**
		 * Using an entity ID to initiate some work (which if sent from the DELETE
		 * will have no valid ID_TABLE reference by the time this job is run) on
		 * some secondary tables.
		 */
		if ( $this->hasParameter( '_id' ) ) {
			$this->dispatch_by_id( $this->getParameter( '_id' ) );
		}

		if ( $this->getTitle()->getNamespace() === SMW_NS_PROPERTY ) {
			$this->dispatchUpdateForProperty(
				DIProperty::newFromUserLabel( $this->getTitle()->getText() )
			);

			// The property page itself is appended as a list value (numeric
			// key); push_jobs_from_list() accepts both the `hash => true` and
			// the `int => hash` form.
			$this->jobs[] = DIWikiPage::newFromTitle( $this->getTitle() )->getHash();
		} else {
			$this->dispatchUpdateForSubject(
				DIWikiPage::newFromTitle( $this->getTitle() )
			);
		}

		/**
		 * Create a secondary run by pushing collected jobs into a chunked queue
		 */
		if ( $this->jobs !== [] ) {
			$this->create_secondary_dispatch_run( $this->jobs );
		}

		Hooks::run( 'SMW::Job::AfterUpdateDispatcherJobComplete', [ $this ] );

		return true;
	}

	/**
	 * Wires the store, the serializer factory and the job queue setting taken
	 * from the ApplicationFactory into this job instance.
	 */
	private function initServices() {

		$applicationFactory = ApplicationFactory::getInstance();
		$this->setStore( $applicationFactory->getStore() );

		$this->serializerFactory = $applicationFactory->newSerializerFactory();

		$this->isEnabledJobQueue(
			$applicationFactory->getSettings()->get( 'smwgEnableUpdateJobs' )
		);
	}

	/**
	 * Collects the root subjects of all query dependency target links that
	 * are registered for the given ID.
	 *
	 * @param integer $id
	 */
	private function dispatch_by_id( $id ) {

		$applicationFactory = ApplicationFactory::getInstance();
		$queryDependencyLinksStoreFactory = $applicationFactory->singleton( 'QueryDependencyLinksStoreFactory' );

		$queryDependencyLinksStore = $queryDependencyLinksStoreFactory->newQueryDependencyLinksStore(
			$applicationFactory->getStore()
		);

		$count = $queryDependencyLinksStore->countDependencies(
			$id
		);

		if ( $count === 0 ) {
			return;
		}

		// Fetch all dependencies in a single request
		$requestOptions = new RequestOptions();
		$requestOptions->setLimit(
			$count
		);

		$dependencyTargetLinks = $queryDependencyLinksStore->findDependencyTargetLinks(
			[ $id ],
			$requestOptions
		);

		foreach ( $dependencyTargetLinks as $targetLink ) {

			// Only title, namespace, and interwiki are of interest, the
			// subobject part (4th segment) is dropped on purpose. Padding
			// avoids undefined offset notices for links that carry fewer
			// segments than expected.
			list( $title, $namespace, $iw ) = array_pad( explode( '#', $targetLink, 4 ), 3, '' );

			// @see DIWikiPage::doUnserialize
			$this->jobs[$title . '#' . $namespace . '#' . $iw . '#'] = true;
		}
	}

	/**
	 * Splits the collected jobs into chunks of CHUNK_SIZE and inserts one
	 * dispatcher job per chunk, each carrying its chunk as JOB_LIST parameter.
	 *
	 * @param array $jobs
	 */
	private function create_secondary_dispatch_run( $jobs ) {

		$origin = $this->getTitle()->getPrefixedText();

		// Keys are preserved because most entries are stored as `hash => true`
		foreach ( array_chunk( $jobs, self::CHUNK_SIZE, true ) as $jobList ) {
			$job = new self(
				Title::newFromText( 'UpdateDispatcher/SecondaryRun/' . md5( json_encode( $jobList ) ) ),
				[
					self::JOB_LIST => $jobList,
					'origin' => $origin,

					// We expect entities to exist that are sent through the
					// dispatch to avoid creating "dead" ids on non existing (or
					// already deleted) entities
					'check_exists' => true
				]
			);

			$job->insert();
		}
	}

	/**
	 * Collects update candidates for a non-property subject.
	 *
	 * @param DIWikiPage $subject
	 */
	private function dispatchUpdateForSubject( DIWikiPage $subject ) {

		if ( $this->getParameter( self::RESTRICTED_DISPATCH_POOL ) !== true ) {
			$this->addUpdateJobsForProperties(
				$this->store->getProperties( $subject )
			);

			$this->addUpdateJobsForProperties(
				$this->store->getInProperties( $subject )
			);
		}

		$this->addUpdateJobsFromDeserializedSemanticData();
	}

	/**
	 * Collects update candidates for a property page.
	 *
	 * @param DIProperty $property
	 */
	private function dispatchUpdateForProperty( DIProperty $property ) {
		$this->addUpdateJobsForProperties( [ $property ] );
		$this->addUpdateJobsForSubjectsThatContainTypeError();
		$this->addUpdateJobsFromDeserializedSemanticData();
	}

	/**
	 * Registers the subjects of each user-defined, page type property as
	 * update candidates.
	 *
	 * @param DIProperty[] $properties
	 */
	private function addUpdateJobsForProperties( array $properties ) {
		foreach ( $properties as $property ) {

			if ( !$property->isUserDefined() ) {
				continue;
			}

			// Before doing some work, make sure to only use page type properties
			// as a means to generate a resource (job) action
			$type = DataTypeRegistry::getInstance()->getDataItemByType(
				$property->findPropertyTypeId()
			);

			if ( $type !== DataItem::TYPE_WIKIPAGE ) {
				continue;
			}

			$requestOptions = new RequestOptions();

			// No need for a warmup since we want to keep the iterator for as
			// long as possible to only access one item at a time
			$requestOptions->setOption( Enum::SUSPEND_CACHE_WARMUP, true );

			// If we have an ID then use it to restrict the range of matches
			// against that object reference (aka `o_id`). Of course, in case of
			// a delete action it is required that the disposer job (that removes
			// all pending references from any active table for that reference)
			// is called only after the job queue has been cleared otherwise
			// the `o_id` can no longer be a matchable ID.
			if ( $this->hasParameter( '_id' ) ) {
				$requestOptions->addExtraCondition( [ 'o_id' => $this->getParameter( '_id' ) ] );
			}

			// Best effort to find all entities to a selected property
			$subjects = $this->store->getAllPropertySubjects( $property, $requestOptions );

			$this->add_job(
				$this->apply_filter( $property, $subjects )
			);
		}
	}

	/**
	 * Narrows the subjects down to those that hold the job title as value of
	 * the property when the dispatch runs on a restricted pool; in any other
	 * case the subjects are returned as they are.
	 *
	 * @param DIProperty $property
	 * @param array|\Traversable $subjects
	 *
	 * @return array|\Traversable
	 */
	private function apply_filter( $property, $subjects ) {

		// If an ID was provided it already restricted the list of references
		// hence avoid any further work
		if ( $this->hasParameter( '_id' ) ) {
			return $subjects;
		}

		if ( $this->getParameter( self::RESTRICTED_DISPATCH_POOL ) !== true ) {
			return $subjects;
		}

		$list = [];

		// Identify the source as base for a comparison
		$source = DIWikiPage::newFromTitle( $this->getTitle() );

		foreach ( $subjects as $subject ) {

			// #3322
			// Investigate which subjects have an actual connection to the
			// subject
			$dataItems = $this->store->getPropertyValues( $subject, $property );

			foreach ( $dataItems as $dataItem ) {
				// Make a judgment based on a literal comparison for the
				// values assigned and the now deleted entity
				if ( $dataItem instanceof DIWikiPage && $dataItem->equals( $source ) ) {
					$list[] = $subject;

					// One match is sufficient, no need to add the same
					// subject twice or to inspect the remaining values
					break;
				}
			}
		}

		return $list;
	}

	/**
	 * Registers all subjects that carry a type error annotation pointing to
	 * the job title.
	 */
	private function addUpdateJobsForSubjectsThatContainTypeError() {

		$subjects = $this->store->getPropertySubjects(
			new DIProperty( DIProperty::TYPE_ERROR ),
			DIWikiPage::newFromTitle( $this->getTitle() )
		);

		$this->add_job(
			$subjects
		);
	}

	/**
	 * Uses the properties of the serialized `semanticData` parameter (when
	 * available) as an additional source for update candidates.
	 */
	private function addUpdateJobsFromDeserializedSemanticData() {

		if ( !$this->hasParameter( 'semanticData' ) ) {
			return;
		}

		$semanticData = $this->serializerFactory->newSemanticDataDeserializer()->deserialize(
			$this->getParameter( 'semanticData' )
		);

		$this->addUpdateJobsForProperties(
			$semanticData->getProperties()
		);
	}

	/**
	 * Adds the root subject hash of each DIWikiPage to the list of collected
	 * jobs; entries of any other type are skipped.
	 *
	 * @param array|\Traversable $subjects
	 */
	private function add_job( $subjects = [] ) {

		foreach ( $subjects as $subject ) {

			// Not trying to get the title here as it is waste of resources
			// as makeTitleSafe is expensive for large lists
			// $title = $subject->getTitle();

			if ( !$subject instanceof DIWikiPage ) {
				continue;
			}

			// Do not use the full subject as hash as we don't care about subobjects
			// since the root subject is enough to update all related subobjects
			// The format is the same as expected by DIWikiPage::doUnserialize
			$hash = $subject->getDBKey() . '#' . $subject->getNamespace() . '#' . $subject->getInterwiki() . '#';

			// Using the hash as key ensures that a subject is only listed once
			$this->jobs[$hash] = true;
		}
	}

	/**
	 * Turns the listed subject hashes into UpdateJob instances and pushes
	 * them to the job queue.
	 *
	 * @param array $subjects either `hash => true` or `int => hash` entries
	 *
	 * @return boolean
	 */
	private function push_jobs_from_list( array $subjects ) {

		$check_exists = $this->getParameter( 'check_exists', false );

		$parameters = [
			UpdateJob::FORCED_UPDATE => true,
			'origin' => $this->getParameter( 'origin', 'UpdateDispatcherJob' )
		];

		// We expect non-duplicate subjects in the list and therefore deserialize
		// without any extra validation
		foreach ( $subjects as $key => $subject ) {

			// For `hash => true` entries the key carries the serialization
			if ( is_string( $key ) ) {
				$subject = $key;
			}

			try {
				$subject = DIWikiPage::doUnserialize( $subject );
			} catch( DataItemDeserializationException $e ) {
				// Skip entries that cannot be deserialized
				continue;
			}

			if ( $check_exists && !$this->store->getObjectIds()->exists( $subject ) ) {
				continue;
			}

			if ( ( $title = $subject->getTitle() ) === null ) {
				continue;
			}

			$this->jobs[] = new UpdateJob( $title, $parameters );
		}

		$this->pushToJobQueue();

		return true;
	}

}