summaryrefslogtreecommitdiff
path: root/www/wiki/extensions/SemanticMediaWiki/src/Elastic/Indexer/Indexer.php
diff options
context:
space:
mode:
Diffstat (limited to 'www/wiki/extensions/SemanticMediaWiki/src/Elastic/Indexer/Indexer.php')
-rw-r--r--www/wiki/extensions/SemanticMediaWiki/src/Elastic/Indexer/Indexer.php777
1 files changed, 777 insertions, 0 deletions
diff --git a/www/wiki/extensions/SemanticMediaWiki/src/Elastic/Indexer/Indexer.php b/www/wiki/extensions/SemanticMediaWiki/src/Elastic/Indexer/Indexer.php
new file mode 100644
index 00000000..98e177a9
--- /dev/null
+++ b/www/wiki/extensions/SemanticMediaWiki/src/Elastic/Indexer/Indexer.php
@@ -0,0 +1,777 @@
+<?php
+
+namespace SMW\Elastic\Indexer;
+
+use Onoi\MessageReporter\MessageReporterAwareTrait;
+use Psr\Log\LoggerAwareTrait;
+use SMW\Services\ServicesContainer;
+use RuntimeException;
+use SMW\DIWikiPage;
+use SMW\Elastic\Connection\Client as ElasticClient;
+use SMW\SQLStore\ChangeOp\ChangeDiff;
+use SMW\SQLStore\ChangeOp\ChangeOp;
+use SMW\Store;
+use SMW\Utils\CharArmor;
+use SMWDIBlob as DIBlob;
+use Title;
+use Revision;
+
+/**
+ * @license GNU GPL v2+
+ * @since 3.0
+ *
+ * @author mwjames
+ */
+class Indexer {
+
+ use MessageReporterAwareTrait;
+ use LoggerAwareTrait;
+
+ /**
+ * @var Store
+ */
+ private $store;
+
+ /**
+ * @var ServicesContainer
+ */
+ private $servicesContainer;
+
+ /**
+ * @var FileIndexer
+ */
+ private $fileIndexer;
+
+ /**
+ * @var string
+ */
+ private $origin = '';
+
+ /**
+ * @var boolean
+ */
+ private $isRebuild = false;
+
+ /**
+ * @var []
+ */
+ private $versions = [];
+
+ /**
+ * @since 3.0
+ *
+ * @param Store $store
+ * @param ServicesContainer $servicesContainer
+ */
+ public function __construct( Store $store, ServicesContainer $servicesContainer ) {
+ $this->store = $store;
+ $this->servicesContainer = $servicesContainer;
+ }
+
+ /**
+ * @since 3.0
+ *
+ * @param [] $versions
+ */
+ public function setVersions( array $versions ) {
+ $this->versions = $versions;
+ }
+
+ /**
+ * @since 3.0
+ *
+ * @param string $origin
+ */
+ public function setOrigin( $origin ) {
+ $this->origin = $origin;
+ }
+
+ /**
+ * @since 3.0
+ *
+ * @return FileIndexer
+ */
+ public function getFileIndexer() {
+
+ if ( $this->fileIndexer === null ) {
+ $this->fileIndexer = $this->servicesContainer->get( 'FileIndexer', $this );
+ }
+
+ $this->fileIndexer->setLogger(
+ $this->logger
+ );
+
+ return $this->fileIndexer;
+ }
+
+ /**
+ * @since 3.0
+ *
+ * @param string $type
+ *
+ * @return string
+ */
+ public function getId( DIWikiPage $dataItem ) {
+ return $this->store->getObjectIds()->getId( $dataItem );
+ }
+
+ /**
+ * @since 3.0
+ *
+ * @return boolean
+ */
+ public function isAccessible() {
+ return $this->isSafe();
+ }
+
+ /**
+ * @since 3.0
+ *
+ * @param boolean $isRebuild
+ */
+ public function isRebuild( $isRebuild = true ) {
+ $this->isRebuild = $isRebuild;
+ }
+
+ /**
+ * @since 3.0
+ *
+ * @return Client
+ */
+ public function getConnection() {
+ return $this->store->getConnection( 'elastic' );
+ }
+
+ /**
+ * @since 3.0
+ */
+ public function setup() {
+
+ $rollover = $this->servicesContainer->get(
+ 'Rollover',
+ $this->store->getConnection( 'elastic' )
+ );
+
+ $rollover->update(
+ ElasticClient::TYPE_DATA
+ );
+
+ $rollover->update(
+ ElasticClient::TYPE_LOOKUP
+ );
+ }
+
+ /**
+ * @since 3.0
+ */
+ public function drop() {
+
+ $rollover = $this->servicesContainer->get(
+ 'Rollover',
+ $this->store->getConnection( 'elastic' )
+ );
+
+ $rollover->delete(
+ ElasticClient::TYPE_DATA
+ );
+
+ $rollover->delete(
+ ElasticClient::TYPE_LOOKUP
+ );
+ }
+
+ /**
+ * @since 3.0
+ *
+ * @param string $type
+ *
+ * @return string
+ */
+ public function getIndexName( $type ) {
+
+ $index = $this->store->getConnection( 'elastic' )->getIndexNameByType(
+ $type
+ );
+
+ // If the rebuilder has set a specific version, use it to avoid writing to
+ // the alias of the index when running a rebuild.
+ if ( isset( $this->versions[$type] ) ) {
+ $index = "$index-" . $this->versions[$type];
+ }
+
+ return $index;
+ }
+
+ /**
+ * @since 3.0
+ *
+ * @param array $params
+ *
+ * @return Bulk
+ */
+ public function newBulk( array $params ) {
+
+ $bulk = $this->servicesContainer->get(
+ 'Bulk',
+ $this->store->getConnection( 'elastic' )
+ );
+
+ $bulk->head( $params );
+
+ return $bulk;
+ }
+
+ /**
+ * @since 3.0
+ *
+ * @param array $idList
+ */
+ public function delete( array $idList, $isConcept = false ) {
+
+ if ( $idList === [] ) {
+ return;
+ }
+
+ $title = Title::newFromText( $this->origin . ':' . md5( json_encode( $idList ) ) );
+
+ $params = [
+ 'delete' => $idList
+ ];
+
+ if ( $this->isSafe( $title, $params ) === false ) {
+ return $this->planRecoveryJob( $title, $params );
+ }
+
+ $index = $this->getIndexName(
+ ElasticClient::TYPE_DATA
+ );
+
+ $params = [
+ '_index' => $index,
+ '_type' => ElasticClient::TYPE_DATA
+ ];
+
+ $bulk = $this->newBulk( $params );
+ $time = -microtime( true );
+
+ foreach ( $idList as $id ) {
+
+ $bulk->delete( [ '_id' => $id ] );
+
+ if ( $isConcept ) {
+ $bulk->delete(
+ [
+ '_index' => $this->getIndexName( ElasticClient::TYPE_LOOKUP ),
+ '_type' => ElasticClient::TYPE_LOOKUP,
+ '_id' => md5( $id )
+ ]
+ );
+ }
+ }
+
+ $response = $bulk->execute();
+
+ $this->logger->info(
+ [
+ 'Indexer',
+ 'Deleted list',
+ 'procTime (in sec): {procTime}',
+ 'Response: {response}'
+ ],
+ [
+ 'method' => __METHOD__,
+ 'role' => 'developer',
+ 'origin' => $this->origin,
+ 'procTime' => $time + microtime( true ),
+ 'response' => $response
+ ]
+ );
+ }
+
+ /**
+ * @since 3.0
+ *
+ * @param DIWikiPage $dataItem
+ * @param array $data
+ */
+ public function create( DIWikiPage $dataItem, array $data = [] ) {
+
+ $title = $dataItem->getTitle();
+
+ $params = [
+ 'create' => $dataItem->getHash()
+ ];
+
+ if ( $this->isSafe() === false ) {
+ return $this->planRecoveryJob( $title, $params );
+ }
+
+ if ( $dataItem->getId() == 0 ) {
+ throw new RuntimeException( "Missing ID: " . $dataItem );
+ }
+
+ $connection = $this->store->getConnection( 'elastic' );
+
+ $params = [
+ 'index' => $this->getIndexName( ElasticClient::TYPE_DATA ),
+ 'type' => ElasticClient::TYPE_DATA,
+ 'id' => $dataItem->getId()
+ ];
+
+ $data['subject'] = [
+ 'title' => str_replace( '_', ' ', $dataItem->getDBKey() ),
+ 'subobject' => $dataItem->getSubobjectName(),
+ 'namespace' => $dataItem->getNamespace(),
+ 'interwiki' => $dataItem->getInterwiki(),
+ 'sortkey' => mb_convert_encoding( $dataItem->getSortKey(), 'UTF-8', 'UTF-8' )
+ ];
+
+ $response = $connection->index( $params + [ 'body' => $data ] );
+
+ $this->logger->info(
+ [
+ 'Indexer',
+ 'Create ({subject}, {id})',
+ 'Response: {response}'
+ ],
+ [
+ 'method' => __METHOD__,
+ 'role' => 'developer',
+ 'origin' => $this->origin,
+ 'subject' => $dataItem->getHash(),
+ 'id' => $dataItem->getId(),
+ 'response' => $response
+ ]
+ );
+ }
+
+ /**
+ * @since 3.0
+ *
+ * @param ChangeDiff $changeDiff
+ * @param string $text
+ */
+ public function safeReplicate( ChangeDiff $changeDiff, $text = '' ) {
+
+ $subject = $changeDiff->getSubject();
+
+ $params = [
+ 'index' => $subject->getHash()
+ ];
+
+ if ( $this->isSafe() === false ) {
+ return $this->planRecoveryJob( $subject->getTitle(), $params ) ;
+ }
+
+ $this->index( $changeDiff, $text );
+ }
+
+ /**
+ * @since 3.0
+ *
+ * @param DIWikiPage|Title|integer $id
+ *
+ * @return string
+ */
+ public function fetchNativeData( $id ) {
+
+ if ( $id instanceof DIWikiPage ) {
+ $id = $id->getTitle();
+ }
+
+ if ( $id instanceof Title ) {
+ $id = $id->getLatestRevID( \Title::GAID_FOR_UPDATE );
+ }
+
+ if ( $id == 0 ) {
+ return '';
+ };
+
+ $revision = Revision::newFromId( $id );
+
+ if ( $revision == null ) {
+ return '';
+ };
+
+ $content = $revision->getContent( Revision::RAW );
+
+ return $content->getNativeData();
+ }
+
+ /**
+ * @since 3.0
+ *
+ * @param ChangeDiff $changeDiff
+ * @param string $text
+ */
+ public function index( ChangeDiff $changeDiff, $text = '' ) {
+
+ $time = -microtime( true );
+ $subject = $changeDiff->getSubject();
+
+ $params = [
+ '_index' => $this->getIndexName( ElasticClient::TYPE_DATA ),
+ '_type' => ElasticClient::TYPE_DATA
+ ];
+
+ $bulk = $this->newBulk( $params );
+
+ $this->map_data( $bulk, $changeDiff );
+ $this->map_text( $bulk, $subject, $text );
+
+ $response = $bulk->execute();
+
+ // We always index (not upsert) since we want to have a complete state of
+ // an entity (and ES would delete and insert any document) so trying
+ // to filter and diff the data update has no real merit besides that it
+ // would require us to read each ID in the update from ES and wire the data
+ // back and forth which has shown to be ineffective especially when a
+ // subject has many subobjects.
+ //
+ // The disadvantage is that we loose any auxiliary data that were attached
+ // while not being part of the on-wiki information such as attachment
+ // information from a file ingest.
+ //
+ // In order to reapply those information we could read them in the same
+ // transaction before the actual update but since we expect the
+ // `attachment.content` to contain a large chunk of text, we push that
+ // into the job-queue so that the background process can take of it.
+ //
+ // Of course, this will cause a delay for the file content being searchable
+ // but that should be acceptable to avoid blocking any online transaction.
+ if ( !$this->isRebuild && $subject->getNamespace() === NS_FILE ) {
+ $this->getFileIndexer()->planIngestJob( $subject->getTitle() );
+ }
+
+ $this->logger->info(
+ [
+ 'Indexer',
+ 'Data index completed ({subject})',
+ 'procTime (in sec): {procTime}',
+ 'Response: {response}'
+ ],
+ [
+ 'method' => __METHOD__,
+ 'role' => 'developer',
+ 'origin' => $this->origin,
+ 'subject' => $subject->getHash(),
+ 'procTime' => $time + microtime( true ),
+ 'response' => $response
+ ]
+ );
+ }
+
+ /**
+ * Remove anything that resembles [[:...|foo]] to avoid distracting the indexer
+ * with internal links annotation that are not relevant.
+ *
+ * @param string $text
+ *
+ * @return string
+ */
+ public function removeLinks( $text ) {
+
+ // {{DEFAULTSORT: ... }}
+ $text = preg_replace( "/\\{\\{([^|]+?)\\}\\}/", "", $text );
+ $text = preg_replace( '/\\[\\[[\s\S]+?::/', '[[', $text );
+
+ // [[:foo|bar]]
+ $text = preg_replace( '/\\[\\[:[^|]+?\\|/', '[[', $text );
+ $text = preg_replace( "/\\{\\{([^|]+\\|)(.*?)\\}\\}/", "\\2", $text );
+ $text = preg_replace( "/\\[\\[([^|]+?)\\]\\]/", "\\1", $text );
+
+ // [[Has foo::Bar]]
+ // $text = \SMW\Parser\LinksEncoder::removeAnnotation( $text );
+
+ return $text;
+ }
+
+ private function isSafe() {
+
+ $connection = $this->store->getConnection( 'elastic' );
+
+ // Make sure a node is available and is not locked by the rebuilder
+ if ( !$connection->hasLock( ElasticClient::TYPE_DATA ) && $connection->ping() ) {
+ return true;
+ }
+
+ return false;
+ }
+
+ private function planRecoveryJob( $title, array $params ) {
+
+ $indexerRecoveryJob = new IndexerRecoveryJob(
+ $title,
+ $params
+ );
+
+ $indexerRecoveryJob->insert();
+
+ $this->logger->info(
+ [
+ 'Indexer',
+ 'Insert IndexerRecoveryJob: {subject}',
+ ],
+ [
+ 'method' => __METHOD__,
+ 'role' => 'user',
+ 'origin' => $this->origin,
+ 'subject' => $title->getPrefixedDBKey()
+ ]
+ );
+ }
+
+ private function map_text( $bulk, $subject, $text ) {
+
+ if ( $text === '' ) {
+ return;
+ }
+
+ $id = $subject->getId();
+
+ if ( $id == 0 ) {
+ $id = $this->store->getObjectIds()->getSMWPageID(
+ $subject->getDBkey(),
+ $subject->getNamespace(),
+ $subject->getInterwiki(),
+ $subject->getSubobjectName(),
+ true
+ );
+ }
+
+ $bulk->upsert(
+ [
+ '_index' => $this->getIndexName( ElasticClient::TYPE_DATA ),
+ '_type' => ElasticClient::TYPE_DATA,
+ '_id' => $id
+ ],
+ [
+ 'text_raw' => $this->removeLinks( $text )
+ ]
+ );
+ }
+
+ private function map_data( $bulk, $changeDiff ) {
+
+ $dbType = $this->store->getInfo( 'db' );
+ $unescape_bytea = isset( $dbType['postgres'] );
+
+ $inserts = [];
+ $inverted = [];
+
+ // In the event that a _SOBJ (or hereafter any inherited object)
+ // is deleted, remove the reference directly from the index since
+ // the object is embedded and is therefore handled outside of the
+ // normal wikiPage delete action
+ foreach ( $changeDiff->getTableChangeOps() as $tableChangeOp ) {
+ foreach ( $tableChangeOp->getFieldChangeOps( ChangeOp::OP_DELETE ) as $fieldChangeOp ) {
+
+ if ( !$fieldChangeOp->has( 'o_id' ) ) {
+ continue;
+ }
+
+ $bulk->delete( [ '_id' => $fieldChangeOp->get( 'o_id' ) ] );
+ }
+ }
+
+ $propertyList = $changeDiff->getPropertyList( 'id' );
+
+ foreach ( $changeDiff->getDataOps() as $tableChangeOp ) {
+ foreach ( $tableChangeOp->getFieldChangeOps() as $fieldChangeOp ) {
+
+ if ( !$fieldChangeOp->has( 's_id' ) ) {
+ continue;
+ }
+
+ $this->mapRows( $fieldChangeOp, $propertyList, $inserts, $inverted, $unescape_bytea );
+ }
+ }
+
+ foreach ( $inverted as $id => $update ) {
+ $bulk->upsert( [ '_id' => $id ], $update );
+ }
+
+ foreach ( $inserts as $id => $value ) {
+ $bulk->index( [ '_id' => $id ], $value );
+ }
+ }
+
+ private function mapRows( $fieldChangeOp, $propertyList, &$insertRows, &$invertedRows, $unescape_bytea ) {
+
+ // The structure to be expected in ES:
+ //
+ // "subject": {
+ // "title": "Foaf:knows",
+ // "subobject": "",
+ // "namespace": 102,
+ // "interwiki": "",
+ // "sortkey": "Foaf:knows"
+ // },
+ // "P:8": {
+ // "txtField": [
+ // "foaf knows http://xmlns.com/foaf/0.1/ Type:Page"
+ // ]
+ // },
+ // "P:29": {
+ // "datField": [
+ // 2458150.6958333
+ // ]
+ // },
+ // "P:1": {
+ // "uriField": [
+ // "http://semantic-mediawiki.org/swivt/1.0#_wpg"
+ // ]
+ // }
+
+ // - datField (time value) is a numeric field (JD number) to allow using
+ // ranges on dates with values being representable from January 1, 4713 BC
+ // (proleptic Julian calendar)
+
+ $sid = $fieldChangeOp->get( 's_id' );
+
+ if ( !isset( $insertRows[$sid] ) ) {
+ $insertRows[$sid] = [];
+ }
+
+ if ( !isset( $insertRows[$sid]['subject'] ) ) {
+ $dataItem = $this->store->getObjectIds()->getDataItemById( $sid );
+ $sort = $dataItem->getSortKey();
+
+ // Use collated sort field if available
+ if ( $dataItem->getOption( 'sort', '' ) !== '' ) {
+ $sort = $dataItem->getOption( 'sort' );
+ }
+
+ // Avoid issue with the Ealstic serializer
+ $sort = CharArmor::removeSpecialChars(
+ CharArmor::removeControlChars( $sort )
+ );
+
+ $insertRows[$sid]['subject'] = [
+ 'title' => str_replace( '_', ' ', $dataItem->getDBKey() ),
+ 'subobject' => $dataItem->getSubobjectName(),
+ 'namespace' => $dataItem->getNamespace(),
+ 'interwiki' => $dataItem->getInterwiki(),
+ 'sortkey' => $sort
+ ];
+ }
+
+ // Avoid issues where the p_id is unknown as in case of an empty
+ // concept (red linked) as reference
+ if ( !$fieldChangeOp->has( 'p_id' ) ) {
+ return;
+ }
+
+ $ins = $fieldChangeOp->getChangeOp();
+ $pid = $fieldChangeOp->get( 'p_id' );
+
+ $prop = isset( $propertyList[$pid] ) ? $propertyList[$pid] : [];
+
+ $pid = 'P:' . $pid;
+ unset( $ins['s_id'] );
+
+ $val = 'n/a';
+ $type = 'wpgField';
+
+ if ( $fieldChangeOp->has( 'o_blob' ) && $fieldChangeOp->has( 'o_hash' ) ) {
+ $type = 'txtField';
+ $val = $ins['o_blob'] === null ? $ins['o_hash'] : $ins['o_blob'];
+
+ // Postgres requires special handling of blobs otherwise escaped
+ // text elements are used as index input
+ // Tests: P9010, Q0704, Q1206, and Q0103
+ if ( $unescape_bytea && $ins['o_blob'] !== null ) {
+ $val = pg_unescape_bytea( $val );
+ }
+
+ // #3020, 3035
+ if ( isset( $prop['_type'] ) && $prop['_type'] === '_keyw' ) {
+ $val = DIBlob::normalize( $ins['o_hash'] );
+ }
+
+ // Remove control chars and avoid Elasticsearch to throw a
+ // "SmartSerializer.php: Failed to JSON encode: 5" since JSON requires
+ // valid UTF-8
+ $val = $this->removeLinks( mb_convert_encoding( $val, 'UTF-8', 'UTF-8' ) );
+ } elseif ( $fieldChangeOp->has( 'o_serialized' ) && $fieldChangeOp->has( 'o_blob' ) ) {
+ $type = 'uriField';
+ $val = $ins['o_blob'] === null ? $ins['o_serialized'] : $ins['o_blob'];
+
+ if ( $unescape_bytea && $ins['o_blob'] !== null ) {
+ $val = pg_unescape_bytea( $val );
+ }
+
+ } elseif ( $fieldChangeOp->has( 'o_serialized' ) && $fieldChangeOp->has( 'o_sortkey' ) ) {
+ $type = strpos( $ins['o_serialized'], '/' ) !== false ? 'datField' : 'numField';
+ $val = (float)$ins['o_sortkey'];
+ } elseif ( $fieldChangeOp->has( 'o_value' ) ) {
+ $type = 'booField';
+ // Avoid a "Current token (VALUE_NUMBER_INT) not of boolean type ..."
+ $val = $ins['o_value'] ? true : false;
+ } elseif ( $fieldChangeOp->has( 'o_lat' ) ) {
+ // https://www.elastic.co/guide/en/elasticsearch/reference/6.1/geo-point.html
+ // Geo-point expressed as an array with the format: [ lon, lat ]
+ // Geo-point expressed as a string with the format: "lat,lon".
+ $type = 'geoField';
+ $val = $ins['o_serialized'];
+ } elseif ( $fieldChangeOp->has( 'o_id' ) ) {
+ $type = 'wpgField';
+ $dataItem = $this->store->getObjectIds()->getDataItemById( $ins['o_id'] );
+
+ $val = $dataItem->getSortKey();
+ $val = mb_convert_encoding( $val, 'UTF-8', 'UTF-8' );
+
+ if ( !isset( $insertRows[$sid][$pid][$type] ) ) {
+ $insertRows[$sid][$pid][$type] = [];
+ }
+
+ $insertRows[$sid][$pid][$type] = array_merge( $insertRows[$sid][$pid][$type], [ $val ] );
+ $type = 'wpgID';
+ $val = (int)$ins['o_id'];
+
+ // Create a minimal body for an inverted relation
+ //
+ // When a query `[[-Has mother::Michael]]` inquiries that relationship
+ // on the fact of `Michael` -> `[[Has mother::Carol]] with `Carol`
+ // being redlinked (not exists as page) the query can match the object
+ if ( !isset( $invertedRows[$val] ) ) {
+
+ // Ensure we have something to sort on
+ // See also Q0105#8
+ $subject = [
+ 'title' => str_replace( '_', ' ', $dataItem->getDBKey() ),
+ 'subobject' => $dataItem->getSubobjectName(),
+ 'namespace' => $dataItem->getNamespace(),
+ 'interwiki' => $dataItem->getInterwiki(),
+ 'sortkey' => mb_convert_encoding( $dataItem->getSortKey(), 'UTF-8', 'UTF-8' )
+ ];
+
+ $invertedRows[$val] = [ 'subject' => $subject ];
+ }
+
+ // A null, [] (an empty array), and [null] are all equivalent, they
+ // simply don't exists in an inverted index
+ }
+
+ if ( !isset( $insertRows[$sid][$pid][$type] ) ) {
+ $insertRows[$sid][$pid][$type] = [];
+ }
+
+ $insertRows[$sid][$pid][$type] = array_merge(
+ $insertRows[$sid][$pid][$type],
+ [ $val ]
+ );
+
+ // Replicate dates in the serialized raw_format to give aggregations a chance
+ // to filter dates by term
+ if ( $type === 'datField' && isset( $ins['o_serialized'] ) ) {
+
+ if ( !isset( $insertRows[$sid][$pid]["dat_raw"] ) ) {
+ $insertRows[$sid][$pid]["dat_raw"] = [];
+ }
+
+ $insertRows[$sid][$pid]["dat_raw"][] = $ins['o_serialized'];
+ }
+ }
+
+}