diff options
Diffstat (limited to 'www/wiki/extensions/SemanticMediaWiki/src/Elastic/Indexer/FileIngestJob.php')
-rw-r--r-- | www/wiki/extensions/SemanticMediaWiki/src/Elastic/Indexer/FileIngestJob.php | 118 |
1 files changed, 118 insertions, 0 deletions
diff --git a/www/wiki/extensions/SemanticMediaWiki/src/Elastic/Indexer/FileIngestJob.php b/www/wiki/extensions/SemanticMediaWiki/src/Elastic/Indexer/FileIngestJob.php new file mode 100644 index 00000000..dfa43298 --- /dev/null +++ b/www/wiki/extensions/SemanticMediaWiki/src/Elastic/Indexer/FileIngestJob.php @@ -0,0 +1,118 @@ +<?php + +namespace SMW\Elastic\Indexer; + +use SMW\ApplicationFactory; +use SMW\MediaWiki\Job; +use SMW\Elastic\ElasticFactory; +use SMW\Elastic\Connection\Client as ElasticClient; +use SMW\SQLStore\ChangeOp\ChangeDiff; +use SMW\DIWikiPage; +use Title; + +/** + * @license GNU GPL v2 + * @since 3.0 + * + * @author mwjames + */ +class FileIngestJob extends Job { + + /** + * @since 3.0 + * + * @param Title $title + * @param array $params job parameters + */ + public function __construct( Title $title, $params = [] ) { + parent::__construct( 'smw.elasticFileIngest', $title, $params ); + $this->removeDuplicates = true; + } + + /** + * @see Job::run + * + * @since 3.0 + */ + public function run() { + + $applicationFactory = ApplicationFactory::getInstance(); + $store = $applicationFactory->getStore(); + + $connection = $store->getConnection( 'elastic' ); + + // Make sure a node is available + if ( $connection->hasLock( ElasticClient::TYPE_DATA ) || !$connection->ping() ) { + + if ( $connection->hasLock( ElasticClient::TYPE_DATA ) ) { + $this->params['retryCount'] = 0; + } + + return $this->requeueRetry( $connection->getConfig() ); + } + + $elasticFactory = new ElasticFactory(); + + $indexer = $elasticFactory->newIndexer( + $store + ); + + $fileIndexer = $indexer->getFileIndexer(); + + $fileIndexer->setOrigin( __METHOD__ ); + + $fileIndexer->setLogger( + $applicationFactory->getMediaWikiLogger( 'smw-elastic' ) + ); + + $file = wfFindFile( $this->getTitle() ); + + // File isn't available yet (or uploaded), try again! + if ( $file === false ) { + return $this->requeueRetry( $connection->getConfig() ); + } + + // It has been observed that when this job is run, the job runner can + // return with "Fatal error: Allowed memory size of ..." which in most + // cases happen when large files are involved therefore temporary lift + // the limitation! + $memory_limit = ini_get( 'memory_limit' ); + + if ( wfShorthandToInteger( $memory_limit ) < wfShorthandToInteger( '1024M' ) ) { + ini_set( 'memory_limit', '1024M' ); + } + + $fileIndexer->index( + DIWikiPage::newFromTitle( $this->getTitle() ), + $file + ); + + ini_set( 'memory_limit', $memory_limit ); + + return true; + } + + private function requeueRetry( $config ) { + + // Give up! + if ( $this->getParameter( 'retryCount' ) >= $config->dotGet( 'indexer.job.file.ingest.retries' ) ) { + return true; + } + + if ( !isset( $this->params['retryCount'] ) ) { + $this->params['retryCount'] = 1; + } else { + $this->params['retryCount']++; + } + + if ( !isset( $this->params['createdAt'] ) ) { + $this->params['createdAt'] = time(); + } + + $job = new self( $this->title, $this->params ); + $job->setDelay( 60 * 10 ); + + $job->insert(); + } + +} |