diff options
Diffstat (limited to 'www/wiki/extensions/Translate/ttmserver/ElasticSearchTTMServer.php')
-rw-r--r-- | www/wiki/extensions/Translate/ttmserver/ElasticSearchTTMServer.php | 608 |
1 files changed, 405 insertions, 203 deletions
diff --git a/www/wiki/extensions/Translate/ttmserver/ElasticSearchTTMServer.php b/www/wiki/extensions/Translate/ttmserver/ElasticSearchTTMServer.php index 8886d079..0835d518 100644 --- a/www/wiki/extensions/Translate/ttmserver/ElasticSearchTTMServer.php +++ b/www/wiki/extensions/Translate/ttmserver/ElasticSearchTTMServer.php @@ -4,10 +4,12 @@ * * @file * @author Niklas Laxström - * @license GPL-2.0+ + * @license GPL-2.0-or-later * @ingroup TTMServer */ +use MediaWiki\Logger\LoggerFactory; + /** * TTMServer backed based on ElasticSearch. Depends on Elastica. * @since 2014.04 @@ -18,6 +20,38 @@ class ElasticSearchTTMServer implements ReadableTTMServer, WritableTTMServer, SearchableTTMserver { /** + * @const int number of documents that will be loaded and deleted in a + * single operation + */ + const BULK_DELETE_CHUNK_SIZE = 100; + + /** + * @const int in case a write operation fails during a batch process + * this constant controls the number of times we will retry the same + * operation. + */ + const BULK_INDEX_RETRY_ATTEMPTS = 5; + + /** + * @const int time (seconds) to wait for the index to ready before + * starting to index. Since we wait for index status it can be relatively + * long especially if some nodes are restarted. + */ + const WAIT_UNTIL_READY_TIMEOUT = 3600; + + /** + * Flag in the frozen index that indicates that all indices + * are frozen (useful only when this service shares the cluster with + * CirrusSearch) + */ + const ALL_INDEXES_FROZEN_NAME = 'freeze_everything'; + + /** + * Type used in the frozen index + */ + const FROZEN_TYPE = 'frozen'; + + /** * @var \Elastica\Client */ protected $client; @@ -49,6 +83,11 @@ class ElasticSearchTTMServer } protected function doQuery( $sourceLanguage, $targetLanguage, $text ) { + if ( !$this->useWikimediaExtraPlugin() ) { + // ElasticTTM is currently not compatible with elasticsearch 2.x/5.x + // It needs FuzzyLikeThis ported via the wmf extra plugin + throw new \RuntimeException( 'The wikimedia extra plugin is mandatory.' ); + } /* Two query system: * 1) Find all strings in source language that match text * 2) Do another query for translations for those strings @@ -57,52 +96,33 @@ class ElasticSearchTTMServer $oldTimeout = $connection->getTimeout(); $connection->setTimeout( 10 ); - $fuzzyQuery = new \Elastica\Query\FuzzyLikeThis(); + $fuzzyQuery = new FuzzyLikeThis(); $fuzzyQuery->setLikeText( $text ); - $fuzzyQuery->addFields( array( 'content' ) ); + $fuzzyQuery->addFields( [ 'content' ] ); $boostQuery = new \Elastica\Query\FunctionScore(); - if ( $this->useWikimediaExtraPlugin() ) { - $boostQuery->addFunction( - 'levenshtein_distance_score', - array( - 'text' => $text, - 'field' => 'content' - ) - ); - } else { - $groovyScript = -<<<GROOVY -import org.apache.lucene.search.spell.* -new LevensteinDistance().getDistance(srctxt, _source['content']) -GROOVY; - $script = new \Elastica\Script( - $groovyScript, - array( 'srctxt' => $text ), - \Elastica\Script::LANG_GROOVY - ); - $boostQuery->addScriptScoreFunction( $script ); - } + $boostQuery->addFunction( + 'levenshtein_distance_score', + [ + 'text' => $text, + 'field' => 'content' + ] + ); $boostQuery->setBoostMode( \Elastica\Query\FunctionScore::BOOST_MODE_REPLACE ); // Wrap the fuzzy query so it can be used as a filter. // This is slightly faster, as ES can throw away the scores by this query. - $fuzzyFilter = new \Elastica\Filter\Query(); - $fuzzyFilter->setQuery( $fuzzyQuery ); - $boostQuery->setFilter( $fuzzyFilter ); + $bool = new \Elastica\Query\BoolQuery(); + $bool->addFilter( $fuzzyQuery ); + $bool->addMust( $boostQuery ); - // Use filtered query to wrap function score and language filter - $filteredQuery = new \Elastica\Query\Filtered(); - - $languageFilter = new \Elastica\Filter\Term(); + $languageFilter = new \Elastica\Query\Term(); $languageFilter->setTerm( 'language', $sourceLanguage ); - - $filteredQuery->setFilter( $languageFilter ); - $filteredQuery->setQuery( $boostQuery ); + $bool->addFilter( $languageFilter ); // The whole query $query = new \Elastica\Query(); - $query->setQuery( $filteredQuery ); + $query->setQuery( $bool ); // The interface usually displays three best candidates. These might // come from more than three source things, if the translations are @@ -117,16 +137,17 @@ GROOVY; $query->setFrom( 0 ); $query->setSize( $sizeFirst ); - $query->setParam( '_source', array( 'content' ) ); - $cutoff = isset( $this->config['cutoff'] ) ? $this->config['cutoff'] : 0.65; + $query->setParam( '_source', [ 'content' ] ); + $cutoff = $this->config['cutoff'] ?? 0.65; $query->setParam( 'min_score', $cutoff ); - $query->setSort( array( '_score', '_uid' ) ); + $query->setSort( [ '_score', '_uid' ] ); - // This query is doing two unrelated things: - // 1) Collect the message contents and scores so that they can - // be accessed later for the translations we found. - // 2) Build the query string for the query that fetches the translations. - $contents = $scores = $terms = array(); + /* This query is doing two unrelated things: + * 1) Collect the message contents and scores so that they can + * be accessed later for the translations we found. + * 2) Build the query string for the query that fetches the translations. + */ + $contents = $scores = $terms = []; do { $resultset = $this->getType()->search( $query ); @@ -167,17 +188,17 @@ GROOVY; // Break if we already got all hits } while ( $resultset->getTotalHits() > count( $contents ) ); - $suggestions = array(); + $suggestions = []; // Skip second query if first query found nothing. Keeping only one return // statement in this method to avoid forgetting to reset connection timeout - if ( $terms !== array() ) { + if ( $terms !== [] ) { $idQuery = new \Elastica\Query\Terms(); $idQuery->setTerms( '_id', $terms ); $query = new \Elastica\Query( $idQuery ); $query->setSize( 25 ); - $query->setParam( '_source', array( 'wiki', 'uri', 'content', 'localid' ) ); + $query->setParam( '_source', [ 'wiki', 'uri', 'content', 'localid' ] ); $resultset = $this->getType()->search( $query ); foreach ( $resultset->getResults() as $result ) { @@ -186,7 +207,7 @@ GROOVY; // Construct the matching source id $sourceId = preg_replace( '~/[^/]+$~', '', $result->getId() ); - $suggestions[] = array( + $suggestions[] = [ 'source' => $contents[$sourceId], 'target' => $data['content'], 'context' => $data['localid'], @@ -194,10 +215,10 @@ GROOVY; 'wiki' => $data['wiki'], 'location' => $data['localid'] . '/' . $targetLanguage, 'uri' => $data['uri'], - ); + ]; } - // Ensure reults are in quality order + // Ensure results are in quality order uasort( $suggestions, function ( $a, $b ) { if ( $a['quality'] === $b['quality'] ) { return 0; @@ -214,6 +235,14 @@ GROOVY; /* Write functions */ + /** + * Add / update translations. + * + * @param MessageHandle $handle + * @param ?string $targetText + * @throws \RuntimeException + * @return bool + */ public function update( MessageHandle $handle, $targetText ) { if ( !$handle->isValid() || $handle->getCode() === '' ) { return false; @@ -234,14 +263,11 @@ GROOVY; // Do not delete definitions, because the translations are attached to that if ( $handle->getCode() !== $sourceLanguage ) { $localid = $handle->getTitleForBase()->getPrefixedText(); - - $boolQuery = new \Elastica\Query\BoolQuery(); - $boolQuery->addMust( new Elastica\Query\Term( array( 'wiki' => wfWikiID() ) ) ); - $boolQuery->addMust( new Elastica\Query\Term( array( 'language' => $handle->getCode() ) ) ); - $boolQuery->addMust( new Elastica\Query\Term( array( 'localid' => $localid ) ) ); - - $query = new \Elastica\Query( $boolQuery ); - $this->getType()->deleteByQuery( $query ); + $this->deleteByQuery( $this->getType(), Elastica\Query::create( + ( new \Elastica\Query\BoolQuery() ) + ->addFilter( new Elastica\Query\Term( [ 'wiki' => wfWikiID() ] ) ) + ->addFilter( new Elastica\Query\Term( [ 'language' => $handle->getCode() ] ) ) + ->addFilter( new Elastica\Query\Term( [ 'localid' => $localid ] ) ) ) ); } // If translation was made fuzzy, we do not need to add anything @@ -251,23 +277,19 @@ GROOVY; $revId = $handle->getTitleForLanguage( $sourceLanguage )->getLatestRevID(); $doc = $this->createDocument( $handle, $targetText, $revId ); + $fname = __METHOD__; - $retries = 5; - while ( $retries-- > 0 ) { - try { + MWElasticUtils::withRetry( self::BULK_INDEX_RETRY_ATTEMPTS, + function () use ( $doc ) { $this->getType()->addDocument( $doc ); - break; - } catch ( \Elastica\Exception\ExceptionInterface $e ) { - if ( $retries === 0 ) { - throw $e; - } else { - $c = get_class( $e ); - $msg = $e->getMessage(); - error_log( __METHOD__ . ": update failed ($c: $msg); retrying." ); - sleep( 10 ); - } + }, + function ( $e, $errors ) use ( $fname ) { + $c = get_class( $e ); + $msg = $e->getMessage(); + error_log( $fname . ": update failed ($c: $msg); retrying." ); + sleep( 10 ); } - } + ); return true; } @@ -285,14 +307,14 @@ GROOVY; $wiki = wfWikiID(); $globalid = "$wiki-$localid-$revId/$language"; - $data = array( + $data = [ 'wiki' => $wiki, 'uri' => $handle->getTitle()->getCanonicalURL(), 'localid' => $localid, 'language' => $language, 'content' => $text, 'group' => $handle->getGroupIds(), - ); + ]; return new \Elastica\Document( $globalid, $data ); } @@ -302,36 +324,45 @@ GROOVY; * @param bool $rebuild Deletes index first if already exists */ public function createIndex( $rebuild ) { + $indexSettings = [ + 'number_of_shards' => $this->getShardCount(), + 'analysis' => [ + 'filter' => [ + 'prefix_filter' => [ + 'type' => 'edge_ngram', + 'min_gram' => 2, + 'max_gram' => 20 + ] + ], + 'analyzer' => [ + 'prefix' => [ + 'type' => 'custom', + 'tokenizer' => 'standard', + 'filter' => [ 'standard', 'lowercase', 'prefix_filter' ] + ], + 'casesensitive' => [ + 'tokenizer' => 'standard', + 'filter' => [ 'standard' ] + ] + ] + ] + ]; + $replicas = $this->getReplicaCount(); + if ( strpos( $replicas, '-' ) === false ) { + $indexSettings['number_of_replicas'] = $replicas; + } else { + $indexSettings['auto_expand_replicas'] = $replicas; + } + $type = $this->getType(); - $type->getIndex()->create( - array( - 'number_of_shards' => $this->getShardCount(), - 'number_of_replicas' => $this->getReplicaCount(), - 'analysis' => array( - 'filter' => array( - 'prefix_filter' => array( - 'type' => 'edge_ngram', - 'min_gram'=> 2, - 'max_gram'=> 20 - ) - ), - 'analyzer' => array( - 'prefix' => array( - 'type' => 'custom', - 'tokenizer' => 'standard', - 'filter' => array( 'standard', 'lowercase', 'prefix_filter' ) - ), - 'casesensitive' => array( - 'tokenizer' => 'standard', - 'filter' => array( 'standard' ) - ) - ) - ) - ), - $rebuild - ); + $type->getIndex()->create( $indexSettings, $rebuild ); } + /** + * Begin the bootstrap process. + * + * @throws \RuntimeException + */ public function beginBootstrap() { $type = $this->getType(); if ( $this->updateMapping ) { @@ -342,44 +373,40 @@ GROOVY; } $settings = $type->getIndex()->getSettings(); - $settings->setRefreshInterval( -1 ); + $settings->setRefreshInterval( '-1' ); - $term = new Elastica\Query\Term(); - $term->setTerm( 'wiki', wfWikiID() ); - $query = new \Elastica\Query( $term ); - $type->deleteByQuery( $query ); + $this->deleteByQuery( $this->getType(), \Elastica\Query::create( + ( new Elastica\Query\Term() )->setTerm( 'wiki', wfWikiID() ) ) ); $mapping = new \Elastica\Type\Mapping(); $mapping->setType( $type ); - $mapping->setProperties( array( - 'wiki' => array( 'type' => 'string', 'index' => 'not_analyzed' ), - 'localid' => array( 'type' => 'string', 'index' => 'not_analyzed' ), - 'uri' => array( 'type' => 'string', 'index' => 'not_analyzed' ), - 'language' => array( 'type' => 'string', 'index' => 'not_analyzed' ), - 'group' => array( 'type' => 'string', 'index' => 'not_analyzed' ), - 'content' => array( - 'type' => 'string', - 'fields' => array( - 'content' => array( - 'type' => 'string', - 'index' => 'analyzed', + $mapping->setProperties( [ + 'wiki' => [ 'type' => 'keyword' ], + 'localid' => [ 'type' => 'keyword' ], + 'uri' => [ 'type' => 'keyword' ], + 'language' => [ 'type' => 'keyword' ], + 'group' => [ 'type' => 'keyword' ], + 'content' => [ + 'type' => 'text', + 'fields' => [ + 'content' => [ + 'type' => 'text', 'term_vector' => 'yes' - ), - 'prefix_complete' => array( - 'type' => 'string', - 'index_analyzer' => 'prefix', + ], + 'prefix_complete' => [ + 'type' => 'text', + 'analyzer' => 'prefix', 'search_analyzer' => 'standard', 'term_vector' => 'yes' - ), - 'case_sensitive' => array( - 'type' => 'string', - 'index' => 'analyzed', + ], + 'case_sensitive' => [ + 'type' => 'text', 'analyzer' => 'casesensitive', 'term_vector' => 'yes' - ) - ) - ), - ) ); + ] + ] + ], + ] ); $mapping->send(); $this->waitUntilReady(); @@ -400,29 +427,24 @@ GROOVY; } public function batchInsertTranslations( array $batch ) { - $docs = array(); + $docs = []; foreach ( $batch as $data ) { list( $handle, $sourceLanguage, $text ) = $data; $revId = $handle->getTitleForLanguage( $sourceLanguage )->getLatestRevID(); $docs[] = $this->createDocument( $handle, $text, $revId ); } - $retries = 5; - while ( $retries-- > 0 ) { - try { + MWElasticUtils::withRetry( self::BULK_INDEX_RETRY_ATTEMPTS, + function () use ( $docs ) { $this->getType()->addDocuments( $docs ); - break; - } catch ( \Elastica\Exception\ExceptionInterface $e ) { - if ( $retries === 0 ) { - throw $e; - } else { - $c = get_class( $e ); - $msg = $e->getMessage(); - $this->logOutput( "Batch failed ($c: $msg), trying again in 10 seconds" ); - sleep( 10 ); - } + }, + function ( $e, $errors ) { + $c = get_class( $e ); + $msg = $e->getMessage(); + $this->logOutput( "Batch failed ($c: $msg), trying again in 10 seconds" ); + sleep( 10 ); } - } + ); } public function endBatch() { @@ -432,8 +454,8 @@ GROOVY; public function endBootstrap() { $index = $this->getType()->getIndex(); $index->refresh(); - $index->optimize(); - $index->getSettings()->setRefreshInterval( 5 ); + $index->forcemerge(); + $index->getSettings()->setRefreshInterval( '5s' ); } public function getClient() { @@ -451,52 +473,108 @@ GROOVY; * @return true if the backend is configured with the wikimedia extra plugin */ public function useWikimediaExtraPlugin() { - return isset ( $this->config['use_wikimedia_extra'] ) && $this->config['use_wikimedia_extra']; + return isset( $this->config['use_wikimedia_extra'] ) && $this->config['use_wikimedia_extra']; } - public function getType() { + /** + * @return string + */ + private function getIndexName() { if ( isset( $this->config['index'] ) ) { - $index = $this->config['index']; + return $this->config['index']; } else { - $index = 'ttmserver'; + return 'ttmserver'; } - return $this->getClient()->getIndex( $index )->getType( 'message' ); + } + + public function getType() { + return $this->getClient() + ->getIndex( $this->getIndexName() ) + ->getType( 'message' ); } protected function getShardCount() { - return isset( $this->config['shards'] ) ? $this->config['shards'] : 5; + return $this->config['shards'] ?? 1; } protected function getReplicaCount() { - return isset( $this->config['replicas'] ) ? $this->config['replicas'] : 0; + return $this->config['replicas'] ?? '0-2'; } - protected function waitUntilReady() { - $expectedActive = $this->getShardCount() * ( 1 + $this->getReplicaCount() ); - $indexName = $this->getType()->getIndex()->getName(); + /** + * Get index health + * TODO: Remove this code in the future as we drop support for + * older versions of the Elastica extension. + * + * @param string $indexName + * @return array the index health status + */ + protected function getIndexHealth( $indexName ) { $path = "_cluster/health/$indexName"; + $response = $this->getClient()->request( $path ); + if ( $response->hasError() ) { + throw new \Exception( "Error while fetching index health status: " . $response->getError() ); + } + return $response->getData(); + } - while ( true ) { - $response = $this->getClient()->request( $path ); - if ( $response->hasError() ) { - $this->logOutput( 'Error fetching index health. Retrying.' ); - $this->logOutput( 'Message: ' + $response->getError() ); - } else { - $health = $response->getData(); - $active = $health['active_shards']; - $this->logOutput( - "active:$active/$expectedActive ". - "relocating:{$health['relocating_shards']} " . - "initializing:{$health['initializing_shards']} ". - "unassigned:{$health['unassigned_shards']}" - ); + /** + * Wait for the index to go green + * + * NOTE: This method has been copied and adjusted from + * CirrusSearch/includes/Maintenance/ConfigUtils.php. Ideally we'd + * like to make these utility methods available in the Elastica + * extension, but this one requires some refactoring in cirrus first. + * TODO: Remove this code in the future as we drop support for + * older versions of the Elastica extension. + * + * @param string $indexName + * @param int $timeout + * @return bool true if the index is green false otherwise. + */ + protected function waitForGreen( $indexName, $timeout ) { + $startTime = time(); + while ( ( $startTime + $timeout ) > time() ) { + try { + $response = $this->getIndexHealth( $indexName ); + $status = isset( $response['status'] ) ? $response['status'] : 'unknown'; + if ( $status === 'green' ) { + $this->logOutput( "\tGreen!" ); + return true; + } + $this->logOutput( "\tIndex is $status retrying..." ); + sleep( 5 ); + } catch ( \Exception $e ) { + $this->logOutput( "Error while waiting for green ({$e->getMessage()}), retrying..." ); } + } + return false; + } - if ( $active === $expectedActive ) { - break; + protected function waitUntilReady() { + if ( method_exists( 'MWElasticUtils', 'waitForGreen' ) ) { + $statuses = MWElasticUtils::waitForGreen( + $this->getClient(), + $this->getIndexName(), + self::WAIT_UNTIL_READY_TIMEOUT ); + $this->logOutput( "Waiting for the index to go green..." ); + foreach ( $statuses as $message ) { + $this->logOutput( $message ); + } + + if ( !$statuses->getReturn() ) { + die( "Timeout! Please check server logs for {$this->getIndexName()}." ); } - sleep( 10 ); + return; + } + + // TODO: This code can be removed in the future as we drop support for + // older versions of the Elastica extension. + $indexName = $this->getType()->getIndex()->getName(); + $this->logOutput( "Waiting for the index to go green..." ); + if ( !$this->waitForGreen( $indexName, self::WAIT_UNTIL_READY_TIMEOUT ) ) { + die( "Timeout! Please check server logs for {$this->getIndex()->getName()}." ); } } @@ -519,9 +597,14 @@ GROOVY; $this->updateMapping = true; } - // Parse query string and build the search query + /** + * Parse query string and build the search query + * @param string $queryString + * @param array $opts + * @return array + */ protected function parseQueryString( $queryString, array $opts ) { - $fields = $highlights = array(); + $fields = $highlights = []; $terms = preg_split( '/\s+/', $queryString ); $match = $opts['match']; $case = $opts['case']; @@ -560,9 +643,9 @@ GROOVY; } // Fields for highlighting - $highlights[$analyzer] = array( + $highlights[$analyzer] = [ 'number_of_fragments' => 0 - ); + ]; // Allow searching by exact message title (page name with // language subpage). @@ -582,27 +665,33 @@ GROOVY; } } - return array( $searchQuery, $highlights ); + return [ $searchQuery, $highlights ]; } - // Search interface - public function search( $queryString, $opts, $highlight ) { + /** + * Search interface + * @param string $queryString + * @param array $opts + * @param array $highlight + * @return \Elastica\Search + */ + public function createSearch( $queryString, $opts, $highlight ) { $query = new \Elastica\Query(); list( $searchQuery, $highlights ) = $this->parseQueryString( $queryString, $opts ); $query->setQuery( $searchQuery ); - $language = new \Elastica\Facet\Terms( 'language' ); + $language = new \Elastica\Aggregation\Terms( 'language' ); $language->setField( 'language' ); $language->setSize( 500 ); - $query->addFacet( $language ); + $query->addAggregation( $language ); - $group = new \Elastica\Facet\Terms( 'group' ); + $group = new \Elastica\Aggregation\Terms( 'group' ); $group->setField( 'group' ); // Would like to prioritize the top level groups and not show subgroups // if the top group has only few hits, but that doesn't seem to be possile. $group->setSize( 500 ); - $query->addFacet( $group ); + $query->addAggregation( $group ); $query->setSize( $opts['limit'] ); $query->setFrom( $opts['offset'] ); @@ -611,53 +700,70 @@ GROOVY; // multiple must clauses are executed by converting each filter into a bit // field then anding them together. The latter is normally faster if either // of the subfilters are reused. May not make a difference in this context. - $filters = new \Elastica\Filter\BoolFilter(); + $filters = new \Elastica\Query\BoolQuery(); $language = $opts['language']; if ( $language !== '' ) { - $languageFilter = new \Elastica\Filter\Term(); + $languageFilter = new \Elastica\Query\Term(); $languageFilter->setTerm( 'language', $language ); - $filters->addMust( $languageFilter ); + $filters->addFilter( $languageFilter ); } $group = $opts['group']; if ( $group !== '' ) { - $groupFilter = new \Elastica\Filter\Term(); + $groupFilter = new \Elastica\Query\Term(); $groupFilter->setTerm( 'group', $group ); - $filters->addMust( $groupFilter ); + $filters->addFilter( $groupFilter ); } // Check that we have at least one filter to avoid invalid query errors. if ( $language !== '' || $group !== '' ) { + // TODO: This seems wrong, but perhaps for aggregation purposes? + // should make $search a must clause and use the bool query + // as main. $query->setPostFilter( $filters ); } list( $pre, $post ) = $highlight; - $query->setHighlight( array( + $query->setHighlight( [ // The value must be an object - 'pre_tags' => array( $pre ), - 'post_tags' => array( $post ), + 'pre_tags' => [ $pre ], + 'post_tags' => [ $post ], 'fields' => $highlights, - ) ); + ] ); + + return $this->getType()->getIndex()->createSearch( $query ); + } + + /** + * Search interface + * @param string $queryString + * @param array $opts + * @param array $highlight + * @throws TTMServerException + * @return \Elastica\ResultSet + */ + public function search( $queryString, $opts, $highlight ) { + $search = $this->createSearch( $queryString, $opts, $highlight ); try { - return $this->getType()->getIndex()->search( $query ); + return $search->search(); } catch ( \Elastica\Exception\ExceptionInterface $e ) { throw new TTMServerException( $e->getMessage() ); } } public function getFacets( $resultset ) { - $facets = $resultset->getFacets(); + $aggs = $resultset->getAggregations(); - $ret = array( - 'language' => array(), - 'group' => array() - ); + $ret = [ + 'language' => [], + 'group' => [] + ]; - foreach ( $facets as $type => $facetInfo ) { - foreach ( $facetInfo['terms'] as $facetRow ) { - $ret[$type][$facetRow['term']] = $facetRow['count']; + foreach ( $aggs as $type => $info ) { + foreach ( $info['buckets'] as $row ) { + $ret[$type][$row['key']] = $row['doc_count']; } } @@ -669,7 +775,7 @@ GROOVY; } public function getDocuments( $resultset ) { - $ret = array(); + $ret = []; foreach ( $resultset->getResults() as $document ) { $data = $document->getData(); $hl = $document->getHighlights(); @@ -685,4 +791,100 @@ GROOVY; return $ret; } + + /** + * Delete docs by query by using the scroll API. + * TODO: Elastica\Index::deleteByQuery() ? was removed + * in 2.x and returned in 5.x. + * + * @param \Elastica\Type $type the source index + * @param \Elastica\Query $query the query + * @throws \RuntimeException + */ + private function deleteByQuery( \Elastica\Type $type, \Elastica\Query $query ) { + if ( method_exists( 'MWElasticUtils', 'deleteByQuery' ) ) { + try { + MWElasticUtils::deleteByQuery( $type->getIndex(), $query, /* $allowConflicts = */ true ); + } catch ( \Exception $e ) { + LoggerFactory::getInstance( 'ElasticSearchTTMServer' )->error( + 'Problem encountered during deletion.', + [ 'exception' => $e ] + ); + + throw new \RuntimeException( "Problem encountered during deletion.\n" . $e ); + } + return; + } + // TODO: This code can be removed in the future as we drop support for + // older versions of the Elastica extension. + $retryAttempts = self::BULK_INDEX_RETRY_ATTEMPTS; + $search = new \Elastica\Search( $this->getClient() ); + $search->setQuery( $query ); + $search->addType( $type ); + $search->addIndex( $type->getIndex() ); + $scroll = new \Elastica\Scroll( $search, '15m' ); + + foreach ( $scroll as $results ) { + $ids = []; + foreach ( $results as $result ) { + $ids[] = $result->getId(); + } + + if ( $ids === [] ) { + continue; + } + + MWElasticUtils::withRetry( $retryAttempts, + function () use ( $ids, $type ) { + $type->deleteIds( $ids ); + } + ); + } + } + + /** + * @return bool + */ + public function isFrozen() { + if ( method_exists( 'MWElasticUtils', 'isFrozen' ) ) { + try { + return MWElasticUtils::isFrozen( $this->getClient() ); + } catch ( \Exception $e ) { + LoggerFactory::getInstance( 'ElasticSearchTTMServer' )->warning( + 'Problem encountered while checking the frozen index.', + [ 'exception' => $e ] + ); + return false; + } + } + + // TODO: This code can be removed in the future as we drop support for + // older versions of the Elastica extension. + if ( !isset( $this->config['frozen_index'] ) ) { + return false; + } + $frozenIndex = $this->config['frozen_index']; + $indices = [ static::ALL_INDEXES_FROZEN_NAME, $this->getIndexName() ]; + $ids = ( new \Elastica\Query\Ids() ) + ->setIds( $indices ); + + try { + $resp = $this->getClient() + ->getIndex( $frozenIndex ) + ->getType( static::FROZEN_TYPE ) + ->search( \Elastica\Query::create( $ids ) ); + + if ( $resp->count() === 0 ) { + return false; + } else { + return true; + } + } catch ( \Exception $e ) { + LoggerFactory::getInstance( 'ElasticSearchTTMServer' )->warning( + 'Problem encountered while checking the frozen index.', + [ 'exception' => $e ] + ); + return false; + } + } } |