summaryrefslogtreecommitdiff
path: root/www/wiki/extensions/SemanticMediaWiki/src/Elastic/Connection/Client.php
diff options
context:
space:
mode:
Diffstat (limited to 'www/wiki/extensions/SemanticMediaWiki/src/Elastic/Connection/Client.php')
-rw-r--r--www/wiki/extensions/SemanticMediaWiki/src/Elastic/Connection/Client.php889
1 files changed, 889 insertions, 0 deletions
diff --git a/www/wiki/extensions/SemanticMediaWiki/src/Elastic/Connection/Client.php b/www/wiki/extensions/SemanticMediaWiki/src/Elastic/Connection/Client.php
new file mode 100644
index 00000000..a252c2cb
--- /dev/null
+++ b/www/wiki/extensions/SemanticMediaWiki/src/Elastic/Connection/Client.php
@@ -0,0 +1,889 @@
+<?php
+
+namespace SMW\Elastic\Connection;
+
+use Elasticsearch\Client as ElasticClient;
+use Elasticsearch\Common\Exceptions\NoNodesAvailableException;
+use Exception;
+use Onoi\Cache\Cache;
+use Onoi\Cache\NullCache;
+use Psr\Log\LoggerAwareTrait;
+use Psr\Log\NullLogger;
+use SMW\Elastic\Exception\InvalidJSONException;
+use SMW\Elastic\Exception\ReplicationException;
+use SMW\Options;
+
+/**
+ * Reduced interface to the Elasticsearch client class.
+ *
+ * @license GNU GPL v2+
+ * @since 3.0
+ *
+ * @author mwjames
+ */
+class Client {
+
+ use LoggerAwareTrait;
+
+ /**
+ * Identifies the cache namespace
+ */
+ const CACHE_NAMESPACE = 'smw:elastic';
+
+ const CACHE_CHECK_TTL = 3600;
+
+ /**
+ * @see https://www.elastic.co/blog/index-vs-type
+ * @see https://www.elastic.co/guide/en/elasticsearch/reference/master/removal-of-types.html
+ *
+ * " ... Indices created in Elasticsearch 6.0.0 or later may only contain a
+ * single mapping type ..."
+ */
+ const TYPE_DATA = 'data';
+
+ /**
+ * Index type used to temporarily store index lookups during the
+ * execution of subqueries.
+ */
+ const TYPE_LOOKUP = 'lookup';
+
+ /**
+ * @var ElasticClient
+ */
+ private $client;
+
+ /**
+ * @var boolean
+ */
+ private static $ping;
+
+ /**
+ * @var Cache
+ */
+ private $cache;
+
+ /**
+ * @var Options
+ */
+ private $options;
+
+ /**
+ * @var boolean
+ */
+ private $inTest = false;
+
+ /**
+ * @var boolean[]
+ */
+ private static $hasIndex = [];
+
+ /**
+  * Wraps the given Elasticsearch client. A missing cache is replaced by a
+  * `NullCache`, missing options by an empty `Options` instance, and logging
+  * is a no-op (`NullLogger`) until a logger is injected.
+  *
+  * @since 3.0
+  *
+  * @param ElasticClient $client
+  * @param Cache|null $cache
+  * @param Options|null $options
+  */
+ public function __construct( ElasticClient $client, Cache $cache = null, Options $options = null ) {
+     $this->client = $client;
+     $this->cache = $cache !== null ? $cache : new NullCache();
+     $this->options = $options !== null ? $options : new Options();
+     $this->logger = new NullLogger();
+
+     // Set while running under the MediaWiki PHPUnit test runner
+     $this->inTest = defined( 'MW_PHPUNIT_TEST' );
+ }
+
+ /**
+ * Returns the options held by this client (an empty `Options` instance
+ * when none were passed to the constructor).
+ *
+ * @since 3.0
+ *
+ * @return Options
+ */
+ public function getConfig() {
+ return $this->options;
+ }
+
+ /**
+ * Resets the statically cached ping result so that the next `ping()` call
+ * checks the endpoint again.
+ *
+ * @since 3.0
+ */
+ public function clear() {
+ self::$ping = null;
+ }
+
+ /**
+ * Alias of `getIndexName()`.
+ *
+ * @since 3.0
+ *
+ * @param string $type
+ *
+ * @return string
+ */
+ public function getIndexNameByType( $type ) {
+ return $this->getIndexName( $type );
+ }
+
+ /**
+  * Builds the index name `smw-<type>-<wiki id>` for the given type. The
+  * name is computed once per type and request, then served from a static
+  * map.
+  *
+  * @since 3.0
+  *
+  * @param string $type
+  *
+  * @return string
+  */
+ public function getIndexName( $type ) {
+     static $names = [];
+
+     if ( isset( $names[$type] ) ) {
+         return $names[$type];
+     }
+
+     return $names[$type] = "smw-$type-" . wfWikiID();
+ }
+
+ /**
+  * Returns the index definition (JSON) for the given type as read from the
+  * file configured under `index_def.<type>`. When `settings.<type>` holds
+  * overrides, they take precedence over the `settings` section of the file.
+  *
+  * The result is cached per type for the duration of the request; it is
+  * only cached after it has been validated, so an invalid definition file
+  * raises the exception on every call instead of being served from the
+  * cache on the second one.
+  *
+  * @since 3.0
+  *
+  * @param string $type
+  *
+  * @return string
+  * @throws InvalidJSONException when overrides exist and the file is not valid JSON
+  */
+ public function getIndexDefByType( $type ) {
+     static $indexDef = [];
+
+     if ( isset( $indexDef[$type] ) ) {
+         return $indexDef[$type];
+     }
+
+     $file = $this->options->dotGet( "index_def.$type" );
+     $content = file_get_contents( $file );
+     $settings = $this->options->dotGet( "settings.$type", [] );
+
+     // Modify settings on-the-fly
+     if ( $settings !== [] ) {
+         $definition = json_decode( $content, true );
+
+         if ( ( $error = json_last_error() ) !== JSON_ERROR_NONE ) {
+             throw new InvalidJSONException( $error, $file );
+         }
+
+         // A definition file is not required to carry a `settings` section
+         $base = isset( $definition['settings'] ) ? $definition['settings'] : [];
+
+         // Array union: keys from the configured overrides win
+         $definition['settings'] = $settings + $base;
+         $content = json_encode( $definition );
+     }
+
+     return $indexDef[$type] = $content;
+ }
+
+ /**
+  * Returns the modification time of the definition file configured under
+  * `index_def.<type>`; the value is looked up once per type and request.
+  *
+  * @since 3.0
+  *
+  * @param string $type
+  *
+  * @return integer|false result of `filemtime()`
+  */
+ public function getIndexDefFileModificationTimeByType( $type ) {
+     static $mtimes = [];
+
+     if ( isset( $mtimes[$type] ) ) {
+         return $mtimes[$type];
+     }
+
+     return $mtimes[$type] = filemtime( $this->options->dotGet( "index_def.$type" ) );
+ }
+
+ /**
+  * Returns the `version.number` entry reported by `info()`, or null when
+  * `elastic.enabled` is not set or no version information is available.
+  *
+  * @since 3.0
+  *
+  * @return string|null
+  */
+ public function getVersion() {
+     $info = $this->info();
+     $enabled = $this->options->safeGet( 'elastic.enabled' );
+
+     if ( !$enabled || !isset( $info['version']['number'] ) ) {
+         return null;
+     }
+
+     return $info['version']['number'];
+ }
+
+ /**
+  * Returns a component description (wikitext link) together with the
+  * version reported by `getVersion()`.
+  *
+  * @since 3.0
+  *
+  * @return array with the keys `component` and `version`
+  */
+ public function getSoftwareInfo() {
+     $version = $this->getVersion();
+
+     return [
+         'component' => "[https://www.elastic.co/products/elasticsearch Elasticsearch]",
+         'version' => $version
+     ];
+ }
+
+ /**
+  * Returns the result of the client's `info` request, or an empty array
+  * when the ping fails or no node is available.
+  *
+  * @since 3.0
+  *
+  * @return array
+  */
+ public function info() {
+
+     if ( !$this->ping() ) {
+         return [];
+     }
+
+     try {
+         return $this->client->info( [] );
+     } catch( NoNodesAvailableException $e ) {
+         return [];
+     }
+ }
+
+ /**
+  * Returns statistics for either the data and lookup indices (`indices`)
+  * or the nodes (`nodes`); any other type yields an empty array.
+  *
+  * For `indices` the `_all` summary is dropped and the indices are sorted
+  * by name. For `nodes` the address details (`transport_address`, `host`,
+  * `ip`) are removed from each node entry.
+  *
+  * @since 3.0
+  *
+  * @param string $type
+  * @param array $params
+  *
+  * @return array
+  */
+ public function stats( $type = 'indices', $params = [] ) {
+
+     $dataIndex = $this->getIndexNameByType( self::TYPE_DATA );
+     $lookupIndex = $this->getIndexNameByType( self::TYPE_LOOKUP );
+
+     switch ( $type ) {
+         case 'indices':
+             $res = $this->client->indices()->stats(
+                 [ 'index' => [ $dataIndex, $lookupIndex ] ] + $params
+             );
+             break;
+         case 'nodes':
+             $res = $this->client->nodes()->stats( $params );
+             break;
+         default:
+             $res = [];
+     }
+
+     if ( $type === 'indices' && isset( $res['indices'] ) ) {
+         unset( $res['_all'] );
+         ksort( $res['indices'] );
+     }
+
+     if ( $type === 'nodes' && isset( $res['nodes'] ) ) {
+         foreach ( array_keys( $res['nodes'] ) as $node ) {
+             // Remove privacy info
+             unset(
+                 $res['nodes'][$node]['transport_address'],
+                 $res['nodes'][$node]['host'],
+                 $res['nodes'][$node]['ip']
+             );
+         }
+     }
+
+     return $res;
+ }
+
+ /**
+  * Returns the `cat` output for the given type. Only `indices` is
+  * supported: its rows are re-keyed by index name, with the `index` column
+  * removed from each row. Any other type yields an empty array.
+  *
+  * @since 3.0
+  *
+  * @param string $type
+  * @param array $params
+  *
+  * @return array
+  */
+ public function cat( $type, $params = [] ) {
+
+     $res = [];
+
+     if ( $type !== 'indices' ) {
+         return $res;
+     }
+
+     foreach ( $this->client->cat()->indices( $params ) as $row ) {
+         $name = $row['index'];
+         unset( $row['index'] );
+         $res[$name] = $row;
+     }
+
+     return $res;
+ }
+
+ /**
+ * Exposes the indices namespace of the wrapped Elasticsearch client.
+ *
+ * @since 3.0
+ *
+ * @return \Elasticsearch\Namespaces\IndicesNamespace (per the elasticsearch-php API; TODO confirm for the bundled version)
+ */
+ public function indices() {
+ return $this->client->indices();
+ }
+
+ /**
+ * Exposes the ingest namespace of the wrapped Elasticsearch client.
+ *
+ * @since 3.0
+ *
+ * @return \Elasticsearch\Namespaces\IngestNamespace (per the elasticsearch-php API; TODO confirm for the bundled version)
+ */
+ public function ingest() {
+ return $this->client->ingest();
+ }
+
+ /**
+  * Checks whether the index for the given type exists. A positive answer
+  * is remembered in a static map, so later calls for the same type skip
+  * the request; a negative answer is re-checked on every call.
+  *
+  * @since 3.0
+  *
+  * @param string $type
+  *
+  * @return boolean
+  */
+ public function hasIndex( $type ) {
+
+     if ( !empty( self::$hasIndex[$type] ) ) {
+         return true;
+     }
+
+     $exists = $this->client->indices()->exists(
+         [ 'index' => $this->getIndexNameByType( $type ) ]
+     );
+
+     self::$hasIndex[$type] = $exists;
+
+     return $exists;
+ }
+
+ /**
+  * Creates a versioned index for the given type using its index
+  * definition. The index is named `<index>-v1`, or `<index>-v2` when v1
+  * already exists; an existing v2 is deleted before it is re-created.
+  *
+  * @since 3.0
+  *
+  * @param string $type
+  *
+  * @return string the version suffix that was used (`v1` or `v2`)
+  */
+ public function createIndex( $type ) {
+
+     $index = $this->getIndexNameByType( $type );
+     $indices = $this->client->indices();
+
+     $version = 'v1';
+     $name = "$index-v1";
+
+     if ( $indices->exists( [ 'index' => $name ] ) ) {
+         $version = 'v2';
+         $name = "$index-v2";
+
+         if ( $indices->exists( [ 'index' => $name ] ) ) {
+             $indices->delete( [ 'index' => $name ] );
+         }
+     }
+
+     $response = $indices->create(
+         [
+             'index' => $name,
+             'body' => $this->getIndexDefByType( $type )
+         ]
+     );
+
+     $this->logger->info(
+         'Created index {index} with: {reponse}',
+         [
+             'method' => __METHOD__,
+             'role' => 'user',
+             'index' => $index,
+             'reponse' => json_encode( $response )
+         ]
+     );
+
+     return $version;
+ }
+
+ /**
+  * Deletes the index for the given type. A failing delete request is not
+  * propagated; its message is logged instead. The cache entry keyed by the
+  * index name and the definition file's modification time is removed as
+  * well, and the memoized `hasIndex()` answer for the type is dropped so
+  * that the next `hasIndex()` call asks Elasticsearch again.
+  *
+  * @since 3.0
+  *
+  * @param string $type
+  */
+ public function deleteIndex( $type ) {
+
+     $index = $this->getIndexNameByType( $type );
+
+     $params = [
+         'index' => $index,
+     ];
+
+     try {
+         $response = $this->client->indices()->delete( $params );
+     } catch ( Exception $e ) {
+         $response = $e->getMessage();
+     }
+
+     // Forget the remembered "exists" state, otherwise hasIndex() would keep
+     // reporting the deleted index as present for the rest of the process
+     unset( self::$hasIndex[$type] );
+
+     $key = smwfCacheKey(
+         self::CACHE_NAMESPACE,
+         [
+             $index,
+             // A modified file causes a new cache key!
+             $this->getIndexDefFileModificationTimeByType( $type )
+         ]
+     );
+
+     $this->cache->delete( $key );
+
+     $context = [
+         'method' => __METHOD__,
+         'role' => 'user',
+         'index' => $index,
+         'reponse' => json_encode( $response )
+     ];
+
+     $this->logger->info( 'Deleted index {index} with: {reponse}', $context );
+ }
+
+ /**
+ * Forwards the parameters to the `putSettings` request of the client's
+ * indices namespace; the response is discarded.
+ *
+ * @since 3.0
+ *
+ * @param array $params
+ */
+ public function putSettings( array $params ) {
+ $this->client->indices()->putSettings( $params );
+ }
+
+ /**
+ * Forwards the parameters to the `putMapping` request of the client's
+ * indices namespace; the response is discarded.
+ *
+ * @since 3.0
+ *
+ * @param array $params
+ */
+ public function putMapping( array $params ) {
+ $this->client->indices()->putMapping( $params );
+ }
+
+ /**
+ * Returns the response of the `getMapping` request of the client's
+ * indices namespace.
+ *
+ * @since 3.0
+ *
+ * @param array $params
+ *
+ * @return mixed
+ */
+ public function getMapping( array $params ) {
+ return $this->client->indices()->getMapping( $params );
+ }
+
+ /**
+ * Returns the response of the `getSettings` request of the client's
+ * indices namespace.
+ *
+ * @since 3.0
+ *
+ * @param array $params
+ *
+ * @return mixed
+ */
+ public function getSettings( array $params ) {
+ return $this->client->indices()->getSettings( $params );
+ }
+
+ /**
+ * Issues a refresh request for the index named in `$params['index']`.
+ * Only the `index` entry is forwarded; other entries are ignored.
+ *
+ * @since 3.0
+ *
+ * @param array $params requires the key `index`
+ */
+ public function refresh( array $params ) {
+ $this->client->indices()->refresh( [ 'index' => $params['index'] ] );
+ }
+
+ /**
+  * Runs the query through the `validateQuery` request. The body entries
+  * `sort`, `_source`, `profile`, `from` and `size` are removed beforehand.
+  * A failing request is logged and results in an empty array.
+  *
+  * @since 3.0
+  *
+  * @param array $params
+  *
+  * @return array
+  */
+ public function validate( array $params ) {
+
+     if ( $params === [] ) {
+         return [];
+     }
+
+     $context = [
+         'method' => __METHOD__,
+         'role' => 'production',
+         'index' => $params['index']
+     ];
+
+     foreach ( [ 'sort', '_source', 'profile', 'from', 'size' ] as $key ) {
+         unset( $params['body'][$key] );
+     }
+
+     try {
+         return $this->client->indices()->validateQuery( $params );
+     } catch ( Exception $e ) {
+         $context['exception'] = $e->getMessage();
+         $this->logger->info( 'Failed the validate with: {exception}', $context );
+     }
+
+     return [];
+ }
+
+ /**
+  * Reports whether the endpoint is reachable. The answer is determined
+  * once (via `quick_ping()` when `connection.quick_ping` is set, otherwise
+  * via the client's own ping) and kept in a static field until `clear()`
+  * is called.
+  *
+  * @see Client::ping
+  * @since 3.0
+  *
+  * @return boolean
+  */
+ public function ping() {
+
+     if ( self::$ping === null ) {
+         self::$ping = $this->options->dotGet( 'connection.quick_ping' )
+             ? $this->quick_ping()
+             : $this->client->ping( [] );
+     }
+
+     return self::$ping;
+ }
+
+ /**
+  * Check is faster than the standard Client::ping: it only tries to open a
+  * socket to each configured endpoint and reports success for the first
+  * one that accepts the connection.
+  *
+  * Endpoints may be given as URL strings or as arrays with `host` and
+  * `port` entries. Entries without a usable host are skipped; a missing
+  * port falls back to 9200 (the Elasticsearch default HTTP port).
+  *
+  * @since 3.0
+  *
+  * @param integer $timeout connection timeout in seconds
+  *
+  * @return boolean
+  */
+ public function quick_ping( $timeout = 2 ) {
+
+     $hosts = $this->options->get( 'endpoints' );
+
+     foreach ( (array)$hosts as $host ) {
+
+         if ( is_string( $host ) ) {
+             $host = parse_url( $host );
+         }
+
+         // parse_url() returns false for malformed URLs and omits `host`
+         // when the string carries none
+         if ( !is_array( $host ) || !isset( $host['host'] ) ) {
+             continue;
+         }
+
+         $port = isset( $host['port'] ) ? $host['port'] : 9200;
+
+         // A refused connection is an expected outcome here, hence the
+         // suppressed warning
+         $fsock = @fsockopen( $host['host'], $port, $errno, $errstr, $timeout );
+
+         if ( $fsock ) {
+             // The socket was only opened as a probe, release it right away
+             fclose( $fsock );
+             return true;
+         }
+     }
+
+     return false;
+ }
+
+ /**
+ * Returns the result of the client's `exists` request for the given
+ * parameters.
+ *
+ * @see Client::exists
+ * @since 3.0
+ *
+ * @param array $params
+ *
+ * @return boolean
+ */
+ public function exists( array $params ) {
+ return $this->client->exists( $params );
+ }
+
+ /**
+ * Returns the result of the client's `get` request for the given
+ * parameters.
+ *
+ * @see Client::get
+ * @since 3.0
+ *
+ * @param array $params
+ *
+ * @return mixed
+ */
+ public function get( array $params ) {
+ return $this->client->get( $params );
+ }
+
+ /**
+ * Returns the result of the client's `delete` request for the given
+ * parameters.
+ *
+ * @see Client::delete
+ * @since 3.0
+ *
+ * @param array $params
+ *
+ * @return mixed
+ */
+ public function delete( array $params ) {
+ return $this->client->delete( $params );
+ }
+
+ /**
+  * Sends an update request for a single document. A failing request is
+  * not propagated; it is logged together with the submitted body.
+  *
+  * @see Client::update
+  * @since 3.0
+  *
+  * @param array $params requires the keys `index` and `id`
+  *
+  * @return string JSON encoded response (an encoded empty string on failure)
+  */
+ public function update( array $params ) {
+
+     $context = [
+         'method' => __METHOD__,
+         'role' => 'production',
+         'index' => $params['index'],
+         'id' => $params['id'],
+         'response' => ''
+     ];
+
+     try {
+         $context['response'] = $this->client->update( $params );
+     } catch( Exception $e ) {
+         $context['exception'] = $e->getMessage();
+
+         // The message refers to {doc}; without a matching context entry the
+         // placeholder would never be resolved
+         $context['doc'] = json_encode( isset( $params['body'] ) ? $params['body'] : [] );
+
+         $this->logger->info( 'Updated failed for document {id} with: {exception}, DOC: {doc}', $context );
+     }
+
+     return json_encode( $context['response'] );
+ }
+
+ /**
+  * Sends an index request for a single document. A failing request is not
+  * propagated; it is logged instead.
+  *
+  * @see Client::index
+  * @since 3.0
+  *
+  * @param array $params requires the keys `index` and `id`
+  *
+  * @return string JSON encoded response (an encoded empty string on failure)
+  */
+ public function index( array $params ) {
+
+     $index = $params['index'];
+     $id = $params['id'];
+     $response = '';
+
+     try {
+         $response = $this->client->index( $params );
+     } catch( Exception $e ) {
+         $this->logger->info(
+             'Index failed for document {id} with: {exception}',
+             [
+                 'method' => __METHOD__,
+                 'role' => 'production',
+                 'index' => $index,
+                 'id' => $id,
+                 'response' => '',
+                 'exception' => $e->getMessage()
+             ]
+         );
+     }
+
+     return json_encode( $response );
+ }
+
+ /**
+  * Sends a bulk request. During PHPUnit runs `refresh` is added to the
+  * request unless the caller already set it.
+  *
+  * When the response reports errors and the option
+  * `replication.throw.exception.on.illegal.argument.error` is set, the
+  * first `index` item that failed with an `illegal_argument_exception`
+  * raises a `ReplicationException`. Any other failure of the request is
+  * logged and not propagated.
+  *
+  * @see Client::bulk
+  * @since 3.0
+  *
+  * @param array $params
+  *
+  * @return string|null JSON encoded response; null when `$params` is empty
+  * @throws ReplicationException
+  */
+ public function bulk( array $params ) {
+
+     if ( $params === [] ) {
+         return;
+     }
+
+     $context = [
+         'method' => __METHOD__,
+         'role' => 'production',
+         'response' => ''
+     ];
+
+     if ( $this->inTest ) {
+         $params = $params + [ 'refresh' => true ];
+     }
+
+     try {
+         $response = $this->client->bulk( $params );
+
+         // No errors, just log the head otherwise show the entire
+         // response
+         if ( $response['errors'] === false ) {
+             unset( $response['items'] );
+         } else {
+
+             $throw = $this->options->dotGet(
+                 'replication.throw.exception.on.illegal.argument.error'
+             );
+
+             foreach ( $response['items'] as $value ) {
+
+                 // Only failed items carry an `error` member; items that
+                 // succeeded within the same request have none
+                 if ( !isset( $value['index']['error']['type'] ) ) {
+                     continue;
+                 }
+
+                 if ( $throw && $value['index']['error']['type'] === 'illegal_argument_exception' ) {
+                     throw new ReplicationException( $value['index']['error']['reason'] );
+                 }
+             }
+         }
+
+         $context['response'] = $response;
+     } catch( ReplicationException $e ) {
+         throw new ReplicationException( $e->getMessage() );
+     } catch( Exception $e ) {
+         // Pass the text as context value (as the other methods do) rather
+         // than gluing it onto the message without a separator
+         $context['exception'] = $e->getMessage();
+         $this->logger->info( 'Bulk update failed with: {exception}', $context );
+     }
+
+     return json_encode( $context['response'] );
+ }
+
+ /**
+  * Runs a count request. The body entries `sort`, `_source`, `profile`,
+  * `from` and `size` are removed beforehand. A failing request is logged
+  * and results in an empty array. During PHPUnit runs the index is
+  * refreshed first.
+  *
+  * @see https://www.elastic.co/guide/en/elasticsearch/reference/current/search-count.html
+  * @see Client::count
+  * @since 3.0
+  *
+  * @param array $params requires the key `index`
+  *
+  * @return mixed
+  */
+ public function count( array $params ) {
+
+     if ( $params === [] ) {
+         return [];
+     }
+
+     $results = [];
+     $time = -microtime( true );
+
+     // https://discuss.elastic.co/t/es-5-2-refresh-interval-doesnt-work-if-set-to-0/79248/2
+     // Make sure the replication/index lag doesn't hinder the search
+     if ( $this->inTest ) {
+         $this->client->indices()->refresh( [ 'index' => $params['index'] ] );
+     }
+
+     // ... "_source", "from", "profile", "query", "size", "sort" are not valid parameters.
+     unset( $params['body']['sort'] );
+     unset( $params['body']['_source'] );
+     unset( $params['body']['profile'] );
+     unset( $params['body']['from'] );
+     unset( $params['body']['size'] );
+
+     // Defined ahead of the request so that the failure entry carries the
+     // same method/role/index information as the regular entry
+     $context = [
+         'method' => __METHOD__,
+         'role' => 'production',
+         'index' => $params['index']
+     ];
+
+     try {
+         $results = $this->client->count( $params );
+     } catch ( Exception $e ) {
+         $this->logger->info(
+             'Failed the count with: {exception}',
+             $context + [ 'exception' => $e->getMessage() ]
+         );
+     }
+
+     $context['query'] = json_encode( $params );
+     $context['procTime'] = microtime( true ) + $time;
+
+     $this->logger->info( 'COUNT: {query}, queryTime: {procTime}', $context );
+
+     return $results;
+ }
+
+ /**
+  * Runs a search request and returns the results together with a list of
+  * error messages. An unavailable endpoint is reported through the error
+  * list; any other failure is logged only. During PHPUnit runs the index
+  * is refreshed first.
+  *
+  * Note that empty `$params` yield a plain empty array and not the
+  * `[ results, errors ]` pair.
+  *
+  * @see Client::search
+  * @since 3.0
+  *
+  * @param array $params requires the key `index`
+  *
+  * @return array `[ $results, $errors ]`, or `[]` when `$params` is empty
+  */
+ public function search( array $params ) {
+
+     if ( $params === [] ) {
+         return [];
+     }
+
+     $results = [];
+     $errors = [];
+
+     $time = -microtime( true );
+
+     // https://discuss.elastic.co/t/es-5-2-refresh-interval-doesnt-work-if-set-to-0/79248/2
+     // Make sure the replication/index lag doesn't hinder the search
+     if ( $this->inTest ) {
+         $this->client->indices()->refresh( [ 'index' => $params['index'] ] );
+     }
+
+     // Defined ahead of the request so that the failure entry carries the
+     // same method/role/index information as the regular entry
+     $context = [
+         'method' => __METHOD__,
+         'role' => 'user',
+         'index' => $params['index']
+     ];
+
+     try {
+         $results = $this->client->search( $params );
+     } catch ( NoNodesAvailableException $e ) {
+         $errors[] = 'Elasticsearch endpoint returned with "' . $e->getMessage() . '" .';
+     } catch ( Exception $e ) {
+         $this->logger->info(
+             'Failed the search with: {exception}',
+             $context + [ 'exception' => $e->getMessage() ]
+         );
+     }
+
+     $this->logger->info(
+         [
+             'Search',
+             '{query}, queryTime: {procTime}'
+         ],
+         $context + [
+             'query' => $params,
+             'procTime' => microtime( true ) + $time
+         ]
+     );
+
+     return [ $results, $errors ];
+ }
+
+ /**
+  * Returns the result of the client's `explain` request, or an empty
+  * array for empty `$params`. During PHPUnit runs the index is refreshed
+  * first.
+  *
+  * @see Client::explain
+  * @since 3.0
+  *
+  * @param array $params
+  *
+  * @return mixed
+  */
+ public function explain( array $params ) {
+
+     if ( $params === [] ) {
+         return [];
+     }
+
+     // https://discuss.elastic.co/t/es-5-2-refresh-interval-doesnt-work-if-set-to-0/79248/2
+     // Make sure the replication/index lag doesn't hinder the search
+     if ( $this->inTest ) {
+         $indices = $this->client->indices();
+         $indices->refresh( [ 'index' => $params['index'] ] );
+     }
+
+     $explanation = $this->client->explain( $params );
+
+     return $explanation;
+ }
+
+ /**
+  * Stores the given version in the cache under the lock key of the type.
+  *
+  * @since 3.0
+  *
+  * @param string $type
+  * @param string $version
+  */
+ public function setLock( $type, $version ) {
+     $this->cache->save(
+         smwfCacheKey( self::CACHE_NAMESPACE, [ 'lock', $type ] ),
+         $version
+     );
+ }
+
+ /**
+  * Reports whether the cache holds an entry under the lock key of the
+  * type, i.e. whether the cache lookup returns something other than false.
+  *
+  * @since 3.0
+  *
+  * @param string $type
+  *
+  * @return boolean
+  */
+ public function hasLock( $type ) {
+     $lock = $this->cache->fetch(
+         smwfCacheKey( self::CACHE_NAMESPACE, [ 'lock', $type ] )
+     );
+
+     return $lock !== false;
+ }
+
+ /**
+  * Returns whatever the cache holds under the lock key of the type (the
+  * value stored by `setLock()`).
+  *
+  * @since 3.0
+  *
+  * @param string $type
+  *
+  * @return mixed
+  */
+ public function getLock( $type ) {
+     $lockKey = smwfCacheKey( self::CACHE_NAMESPACE, [ 'lock', $type ] );
+     $lock = $this->cache->fetch( $lockKey );
+
+     return $lock;
+ }
+
+ /**
+  * Removes the cache entry stored under the lock key of the type.
+  *
+  * @since 3.0
+  *
+  * @param string $type
+  */
+ public function releaseLock( $type ) {
+     $this->cache->delete(
+         smwfCacheKey( self::CACHE_NAMESPACE, [ 'lock', $type ] )
+     );
+ }
+
+}