summaryrefslogtreecommitdiff
path: root/www/wiki/extensions/SemanticMediaWiki/src/Elastic/Indexer/FileIngestJob.php
blob: dfa432982c20a3b9877cc934f375b91b278ff012 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
<?php

namespace SMW\Elastic\Indexer;

use SMW\ApplicationFactory;
use SMW\MediaWiki\Job;
use SMW\Elastic\ElasticFactory;
use SMW\Elastic\Connection\Client as ElasticClient;
use SMW\SQLStore\ChangeOp\ChangeDiff;
use SMW\DIWikiPage;
use Title;

/**
 * @license GNU GPL v2
 * @since 3.0
 *
 * @author mwjames
 */
class FileIngestJob extends Job {

	/**
	 * @since 3.0
	 *
	 * @param Title $title title of the file page to be ingested
	 * @param array $params job parameters (e.g. `retryCount`, `createdAt`)
	 */
	public function __construct( Title $title, $params = [] ) {
		parent::__construct( 'smw.elasticFileIngest', $title, $params );

		// Identical pending ingest jobs for the same title add no value
		$this->removeDuplicates = true;
	}

	/**
	 * Attempts to ingest the file attached to the job's title into the
	 * Elasticsearch index. Requeues itself (with a delay) when the cluster
	 * is unreachable, locked, or the file is not yet available.
	 *
	 * @see Job::run
	 *
	 * @since  3.0
	 *
	 * @return bool always true; a requeue is not reported as a failure to
	 *              the job runner
	 */
	public function run() {

		$applicationFactory = ApplicationFactory::getInstance();
		$store = $applicationFactory->getStore();

		$connection = $store->getConnection( 'elastic' );

		// Make sure a node is available and no rebuild/maintenance lock is
		// held on the data index
		if ( $connection->hasLock( ElasticClient::TYPE_DATA ) || !$connection->ping() ) {

			// While the index is locked, don't let retries count towards the
			// give-up limit; the lock is expected to clear eventually
			if ( $connection->hasLock( ElasticClient::TYPE_DATA ) ) {
				$this->params['retryCount'] = 0;
			}

			return $this->requeueRetry( $connection->getConfig() );
		}

		$elasticFactory = new ElasticFactory();

		$indexer = $elasticFactory->newIndexer(
			$store
		);

		$fileIndexer = $indexer->getFileIndexer();

		$fileIndexer->setOrigin( __METHOD__ );

		$fileIndexer->setLogger(
			$applicationFactory->getMediaWikiLogger( 'smw-elastic' )
		);

		$file = wfFindFile( $this->getTitle() );

		// File isn't available yet (or uploaded), try again!
		if ( $file === false ) {
			return $this->requeueRetry( $connection->getConfig() );
		}

		// It has been observed that when this job is run, the job runner can
		// return with "Fatal error: Allowed memory size of ..." which in most
		// cases happen when large files are involved therefore temporary lift
		// the limitation!
		$memory_limit = ini_get( 'memory_limit' );

		if ( wfShorthandToInteger( $memory_limit ) < wfShorthandToInteger( '1024M' ) ) {
			ini_set( 'memory_limit', '1024M' );
		}

		$fileIndexer->index(
			DIWikiPage::newFromTitle( $this->getTitle() ),
			$file
		);

		// Restore the original limit for any work that follows in this process
		ini_set( 'memory_limit', $memory_limit );

		return true;
	}

	/**
	 * Inserts a delayed clone of this job unless the configured retry limit
	 * has been reached.
	 *
	 * @param mixed $config Elastic config accessor (supports `dotGet`)
	 *
	 * @return bool always true so that `run()` never signals a job failure
	 *              for an intentional requeue
	 */
	private function requeueRetry( $config ) {

		// Give up!
		if ( $this->getParameter( 'retryCount' ) >= $config->dotGet( 'indexer.job.file.ingest.retries' ) ) {
			return true;
		}

		if ( !isset( $this->params['retryCount'] ) ) {
			$this->params['retryCount'] = 1;
		} else {
			$this->params['retryCount']++;
		}

		if ( !isset( $this->params['createdAt'] ) ) {
			$this->params['createdAt'] = time();
		}

		$job = new self( $this->title, $this->params );
		$job->setDelay( 60 * 10 );

		$job->insert();

		// FIX: previously fell through with an implicit `null`, which the
		// job runner (via `run()`'s return value) treats as a failed job;
		// a successful requeue must report success
		return true;
	}

}