summaryrefslogtreecommitdiff
path: root/www/wiki/extensions/SemanticMediaWiki/src/Elastic/Indexer/FileIndexer.php
diff options
context:
space:
mode:
Diffstat (limited to 'www/wiki/extensions/SemanticMediaWiki/src/Elastic/Indexer/FileIndexer.php')
-rw-r--r--www/wiki/extensions/SemanticMediaWiki/src/Elastic/Indexer/FileIndexer.php479
1 files changed, 479 insertions, 0 deletions
diff --git a/www/wiki/extensions/SemanticMediaWiki/src/Elastic/Indexer/FileIndexer.php b/www/wiki/extensions/SemanticMediaWiki/src/Elastic/Indexer/FileIndexer.php
new file mode 100644
index 00000000..e9408dfc
--- /dev/null
+++ b/www/wiki/extensions/SemanticMediaWiki/src/Elastic/Indexer/FileIndexer.php
@@ -0,0 +1,479 @@
+<?php
+
+namespace SMW\Elastic\Indexer;
+
+use File;
+use Onoi\MessageReporter\MessageReporterAwareTrait;
+use Psr\Log\LoggerAwareTrait;
+use RuntimeException;
+use SMW\ApplicationFactory;
+use SMW\DIWikiPage;
+use SMW\Elastic\Connection\Client as ElasticClient;
+use SMW\Elastic\QueryEngine\FieldMapper;
+use SMW\Store;
+use SMWContainerSemanticData as ContainerSemanticData;
+use Title;
+
+/**
+ * Experimental file indexer that uses the ES ingest pipeline to ingest and retrieve
+ * data from an attachment and make file content searchable outside of a normal
+ * wiki content.
+ *
+ * @license GNU GPL v2+
+ * @since 3.0
+ *
+ * @author mwjames
+ */
+class FileIndexer {
+
+ use MessageReporterAwareTrait;
+ use LoggerAwareTrait;
+
+ /**
+ * @var Indexer
+ */
+ private $indexer;
+
+ /**
+ * @var string
+ */
+ private $origin = '';
+
+ /**
+ * @var boolean
+ */
+ private $sha1Check = true;
+
+ /**
+ * @since 3.0
+ *
+ * @param Indexer $indexer
+ */
+ public function __construct( Indexer $indexer ) {
+ $this->indexer = $indexer;
+ }
+
+ /**
+ * @since 3.0
+ *
+ * @param string $origin
+ */
+ public function setOrigin( $origin ) {
+ $this->origin = $origin;
+ }
+
+ /**
+ * @since 3.0
+ */
+ public function noSha1Check() {
+ $this->sha1Check = false;
+ }
+
+ /**
+ * @since 3.0
+ *
+ * @param File|null $file
+ */
+ public function planIngestJob( Title $title ) {
+
+ $fileIngestJob = new FileIngestJob(
+ $title
+ );
+
+ $fileIngestJob->lazyPush();
+ }
+
+ /**
+ * The ES ingest pipeline only does create (not update) index content which
+ * means any other content is deleted after the ingest process has finished
+ * therefore:
+ *
+ * - Read the document before, and retrieve any annotations that exists for
+ * that entity
+ * - Let ES ingest the file content and attach the earlier retrieved
+ * annotations
+ * - SMW doesn't know anything about the file attachment details ES has gather
+ * from the file hence update the SQLStore (!important not the ElasticStore)
+ * with the data
+ * - After the SQLStore update make sure that those attachment details (which
+ * are represented as subobject) are added to ES manually (means not through
+ * the standard Store::updateData to avoid an update circle) otherwise there
+ * will be invisible the any SMW user
+ *
+ * @since 3.0
+ *
+ * @param DIWikiPage $dataItem
+ * @param File|null $file
+ */
+ public function index( DIWikiPage $dataItem, File $file = null ) {
+
+ if ( $dataItem->getId() == 0 ) {
+ $dataItem->setId( $this->indexer->getId( $dataItem ) );
+ }
+
+ if ( $dataItem->getId() == 0 || $dataItem->getNamespace() !== NS_FILE || $dataItem->getSubobjectName() !== '' ) {
+ return;
+ }
+
+ $time = -microtime( true );
+
+ $params = [
+ 'id' => 'attachment',
+ 'body' => [
+ 'description' => 'Extract attachment information',
+ 'processors' => [
+ [
+ 'attachment' => [
+ 'field' => 'file_content',
+ 'indexed_chars' => -1
+ ]
+ ],
+ [
+ 'remove' => [
+ "field" => "file_content"
+ ]
+ ]
+ ]
+ ],
+ ];
+
+ $connection = $this->indexer->getConnection();
+ $connection->ingest()->putPipeline( $params );
+
+ if ( $file === null ) {
+ $file = wfFindFile( $dataItem->getTitle() );
+ }
+
+ if ( $file === false || $file === null ) {
+ return;
+ }
+
+ $url = $file->getFullURL();
+ $id = $dataItem->getId();
+
+ $sha1 = $file->getSha1();
+ $ingest = true;
+
+ $index = $this->indexer->getIndexName( ElasticClient::TYPE_DATA );
+ $doc = [ '_source' => [] ];
+
+ $params = [
+ 'index' => $index,
+ 'type' => ElasticClient::TYPE_DATA,
+ 'id' => $id,
+ ];
+
+ // Do we have any existing data? The ingest pipeline will override the
+ // entire document, so rescue any data before starting the ingest.
+ if ( $connection->exists( $params ) ) {
+ $doc = $connection->get( $params + [ '_source_include' => [ 'file_sha1', 'subject', 'text_raw', 'text_copy', 'P*' ] ] );
+ }
+
+ // Is the sha1 the same? Don't do anything since the content is expected
+ // to be the same!
+ if ( $this->sha1Check && isset( $doc['_source']['file_sha1'] ) && $doc['_source']['file_sha1'] === $sha1 ) {
+ $ingest = false;
+ }
+
+ $context = [
+ 'method' => __METHOD__,
+ 'role' => 'production',
+ 'origin' => $this->origin,
+ 'subject' => $dataItem->getHash()
+ ];
+
+ if ( $ingest === false ) {
+
+ $msg = [
+ 'File indexer',
+ 'Skipping the ingest process',
+ 'Found identical file_sha1 ({subject})'
+ ];
+
+ return $this->logger->info( $msg, $context );
+ }
+
+ $contents = '';
+
+ // Avoid a "failed to open stream: HTTP request failed! HTTP/1.1 404 Not Found"
+ $file_headers = @get_headers( $url );
+
+ if ( $file_headers !== false && $file_headers[0] !== 'HTTP/1.1 404 Not Found' && $file_headers[0] !== 'HTTP/1.0 404 Not Found' ) {
+ $contents = file_get_contents( $url );
+ } else {
+ $this->logger->info( [ 'File indexer', "HTTP/1.1 404 Not Found for $url" ], $context );
+ }
+
+ $params += [
+ 'pipeline' => 'attachment',
+ 'body' => [
+ 'file_content' => base64_encode( $contents ),
+ 'file_path' => $url,
+ 'file_sha1' => $sha1,
+ ] + $doc['_source']
+ ];
+
+ $context['response'] = $connection->index( $params );
+ $context['procTime'] = microtime( true ) + $time;
+
+ $msg = [
+ 'File indexer',
+ 'Ingest process completed ({subject})',
+ 'procTime (in sec): {procTime}',
+ 'Response: {response}'
+ ];
+
+ $this->logger->info( $msg, $context );
+
+ // Don't use the ElasticStore otherwise we index the added fields once more
+ // and hereby remove the content from the attachment! and start a circle
+ // since the annotation update can only happen after the information is
+ // retrieved from ES.
+ $this->addAnnotation(
+ ApplicationFactory::getInstance()->getStore( '\SMW\SQLStore\SQLStore' ),
+ $dataItem
+ );
+ }
+
+ /**
+ * @since 3.0
+ *
+ * @param Store $store
+ * @param DIWikiPage $dataItem
+ */
+ public function addAnnotation( Store $store, DIWikiPage $dataItem ) {
+
+ $time = -microtime( true );
+
+ if ( $dataItem->getId() == 0 ) {
+ $dataItem->setId( $this->indexer->getId( $dataItem ) );
+ }
+
+ if ( $dataItem->getId() == 0 ) {
+ throw new RuntimException( "Missing ID: " . $dataItem );
+ }
+
+ $context = [
+ 'method' => __METHOD__,
+ 'role' => 'production',
+ 'origin' => $this->origin,
+ 'subject' => $dataItem->getHash()
+ ];
+
+ $semanticData = $store->getSemanticData( $dataItem );
+ $connection = $this->indexer->getConnection();
+
+ $index = $this->indexer->getIndexName( ElasticClient::TYPE_DATA );
+ $doc = [ '_source' => [] ];
+
+ $params = [
+ 'index' => $index,
+ 'type' => ElasticClient::TYPE_DATA,
+ 'id' => $dataItem->getId(),
+ ];
+
+ if ( !$connection->exists( $params ) ) {
+
+ $msg = [
+ 'File indexer',
+ 'Abort annotation update',
+ 'Missing {id} document!'
+ ];
+
+ return $this->logger->info( $msg, $context + [ 'id' => $dataItem->getId() ] );
+ }
+
+ $params = $params + [
+ '_source_include' => [
+ 'file_sha1',
+ 'attachment.date',
+ 'attachment.content_type',
+ 'attachment.author',
+ 'attachment.language',
+ 'attachment.title',
+ 'attachment.content_length'
+ ]
+ ];
+
+ $doc = $connection->get( $params );
+
+ if ( !isset( $doc['_source']['file_sha1'] ) ) {
+
+ $msg = [
+ 'File indexer',
+ 'No annotation update',
+ 'Missing file_sha1!'
+ ];
+
+ return $this->logger->info( $msg, $context );
+ }
+
+ $containerSemanticData = $this->newContainerSemanticData(
+ $dataItem,
+ $doc
+ );
+
+ $attachmentAnnotator = new AttachmentAnnotator(
+ $containerSemanticData,
+ $doc
+ );
+
+ $attachmentAnnotator->addAnnotation();
+ $property = $attachmentAnnotator->getProperty();
+
+ // Remove any existing `_FILE_ATTCH` in case it was a reupload with a different
+ // content sha1
+ $semanticData->removeProperty( $property );
+
+ $semanticData->addPropertyObjectValue(
+ $property,
+ $attachmentAnnotator->getContainer()
+ );
+
+ $callableUpdate = ApplicationFactory::getInstance()->newDeferredTransactionalCallableUpdate( function() use( $store, $semanticData, $attachmentAnnotator ) {
+ // Update the SQLStore with the annotated information which will NOT
+ // trigger another ES index update BUT ...
+ $store->updateData( $semanticData );
+
+ // ... we need to replicate the container data (subobject) in order to
+ // make them usable via query engine therefore ...
+ $this->indexAttachmentInfo( $attachmentAnnotator );
+ } );
+
+ $callableUpdate->setOrigin( __METHOD__ );
+ $callableUpdate->waitOnTransactionIdle();
+ $callableUpdate->pushUpdate();
+
+ $context['procTime'] = microtime( true ) + $time;
+
+ $msg = [
+ 'File indexer',
+ 'Attachment annotation update completed ({subject})',
+ 'procTime (in sec): {procTime}'
+ ];
+
+ $this->logger->info( $msg, $context );
+ }
+
+ /**
+ * Meta assignments from a file ingest need to be republished in a SMW conform
+ * manner so that property path `[[File attachment.Content title::..]]` work
+ * as expected.
+ *
+ * @since 3.0
+ *
+ * @param AttachmentAnnotator $attachmentAnnotator
+ */
+ public function indexAttachmentInfo( AttachmentAnnotator $attachmentAnnotator ) {
+
+ $data = [];
+ $time = -microtime( true );
+
+ $semanticData = $attachmentAnnotator->getSemanticData();
+ $subject = $semanticData->getSubject();
+
+ // Find base document ID
+ $baseDocId = $this->indexer->getId( $subject->asBase() );
+
+ if ( $baseDocId == 0 ) {
+ throw new RuntimeException( "Missing ID: " . $subject );
+ }
+
+ $subject->setId( $this->indexer->getId( $subject ) );
+
+ if ( $subject->getId() == 0 ) {
+ throw new RuntimeException( "Missing ID: " . $subject );
+ }
+
+ $context = [
+ 'method' => __METHOD__,
+ 'role' => 'production',
+ 'origin' => $this->origin,
+ 'subject' => $subject->getHash()
+ ];
+
+ foreach ( $semanticData->getProperties() as $property ) {
+
+ $pid = $this->indexer->getId(
+ $property->getCanonicalDiWikiPage()
+ );
+
+ $pid = FieldMapper::getPID( $pid );
+ $data[$pid] = [];
+ $field = FieldMapper::getField( $property );
+
+ $data[$pid][$field] = [];
+
+ foreach ( $semanticData->getPropertyValues( $property ) as $dataItem ) {
+ $data[$pid][$field][] = $dataItem->getSortKey();
+ }
+ }
+
+ $this->indexer->create( $subject, $data );
+
+ // Attach the subobject to the base subject
+ $response = $this->upsertDoc(
+ $baseDocId,
+ $subject,
+ $attachmentAnnotator->getProperty()
+ );
+
+ $context['time'] = microtime( true ) + $time;
+ $context['response'] = $response;
+
+ $msg = [
+ 'File indexer',
+ 'Pushed attachment information to ES ({subject})',
+ 'procTime (in sec): {procTime}',
+ 'Response: {response}'
+ ];
+
+ $this->logger->info( $msg, $context );
+ }
+
+ private function upsertDoc( $baseDocId, $subject, $property ) {
+
+ $params = [
+ '_index' => $this->indexer->getIndexName( ElasticClient::TYPE_DATA ),
+ '_type' => ElasticClient::TYPE_DATA
+ ];
+
+ $bulk = $this->indexer->newBulk( $params );
+ $data = [];
+
+ $pid = $this->indexer->getId(
+ $property->getCanonicalDiWikiPage()
+ );
+
+ $pid = FieldMapper::getPID( $pid );
+ $data[$pid] = [];
+
+ // It is the ID field we want not any type related field!
+ $field = 'wpgID';
+
+ $data[$pid][$field] = [];
+ $data[$pid][$field][] = $subject->getId();
+
+ // Upsert of the base document to link subject -> subobject otherwise
+ // a property path like `File attachment.Content length`) is not going
+ // to work
+ $bulk->upsert( [ '_id' => $baseDocId ], $data );
+
+ return $bulk->execute();
+ }
+
+ private function newContainerSemanticData( $dataItem, $doc ) {
+
+ $subobjectName = '_FILE' . md5( $doc['_source']['file_sha1'] );
+
+ $subject = new DIWikiPage(
+ $dataItem->getDBkey(),
+ $dataItem->getNamespace(),
+ $dataItem->getInterwiki(),
+ $subobjectName
+ );
+
+ return new ContainerSemanticData( $subject );
+ }
+
+}