summaryrefslogtreecommitdiff
path: root/www/wiki/includes/libs/objectcache/WANObjectCacheReaper.php
diff options
context:
space:
mode:
Diffstat (limited to 'www/wiki/includes/libs/objectcache/WANObjectCacheReaper.php')
-rw-r--r--www/wiki/includes/libs/objectcache/WANObjectCacheReaper.php204
1 files changed, 204 insertions, 0 deletions
diff --git a/www/wiki/includes/libs/objectcache/WANObjectCacheReaper.php b/www/wiki/includes/libs/objectcache/WANObjectCacheReaper.php
new file mode 100644
index 00000000..14737b14
--- /dev/null
+++ b/www/wiki/includes/libs/objectcache/WANObjectCacheReaper.php
@@ -0,0 +1,204 @@
+<?php
+/**
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License along
+ * with this program; if not, write to the Free Software Foundation, Inc.,
+ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
+ * http://www.gnu.org/copyleft/gpl.html
+ *
+ * @file
+ * @ingroup Cache
+ */
+
+use Psr\Log\LoggerAwareInterface;
+use Psr\Log\LoggerInterface;
+use Psr\Log\NullLogger;
+use Wikimedia\ScopedCallback;
+
+/**
+ * Class for scanning through chronological, log-structured data or change logs
+ * and locally purging cache keys related to entities that appear in this data.
+ *
+ * This is useful for repairing cache when purges are missed by using a reliable
+ * stream, such as Kafka or a replicated MySQL table. Purge loss between datacenters
+ * is expected to be more common than within them.
+ *
+ * @since 1.28
+ */
+class WANObjectCacheReaper implements LoggerAwareInterface {
+	/** @var WANObjectCache Cache whose stale keys get reaped */
+	protected $cache;
+	/** @var BagOStuff Store holding the reap position and the "busy" lock */
+	protected $store;
+	/** @var callable Returns a chunk of events; see the constructor for its contract */
+	protected $logChunkCallback;
+	/** @var callable Maps an event item to a list of WAN cache keys */
+	protected $keyListCallback;
+	/** @var LoggerInterface */
+	protected $logger;
+
+	/** @var string Name of the update event stream; part of the position key */
+	protected $channel;
+	/** @var int Seconds to look back in time when no position is stored */
+	protected $initialStartWindow;
+
+	/**
+	 * @param WANObjectCache $cache Cache to reap bad keys from
+	 * @param BagOStuff $store Cache used to store positions and for locking
+	 * @param callable $logCallback Callback taking arguments:
+	 *   - The starting position as a UNIX timestamp
+	 *   - The starting unique ID used for breaking timestamp collisions or null
+	 *   - The ending position as a UNIX timestamp
+	 *   - The maximum number of results to return
+	 *   It returns a list of maps of (item: object from the event log,
+	 *   pos: UNIX timestamp, id: unique ID), one per event, with the
+	 *   corresponding event timestamp/ID information.
+	 *   The events should be in ascending order, by (timestamp,id).
+	 * @param callable $keyCallback Callback taking arguments:
+	 *   - The WANObjectCache instance
+	 *   - An object from the event log (the "item" field of an event)
+	 *   It should return a list of WAN cache keys.
+	 *   The callback must fully duck-type test the object, since it can be any model class.
+	 * @param array $params Additional options:
+	 *   - channel: the name of the update event stream [required]
+	 *   - initialStartWindow: seconds back in time to start if the position is lost.
+	 *     Default: 1 hour.
+	 *   - logger: a PSR-3 logger instance [optional]
+	 * @throws UnexpectedValueException If no channel is specified
+	 */
+	public function __construct(
+		WANObjectCache $cache,
+		BagOStuff $store,
+		callable $logCallback,
+		callable $keyCallback,
+		array $params
+	) {
+		if ( !isset( $params['channel'] ) ) {
+			throw new UnexpectedValueException( "No channel specified." );
+		}
+
+		$this->cache = $cache;
+		$this->store = $store;
+		$this->logChunkCallback = $logCallback;
+		$this->keyListCallback = $keyCallback;
+		$this->channel = $params['channel'];
+		$this->initialStartWindow = isset( $params['initialStartWindow'] )
+			? $params['initialStartWindow']
+			: 3600;
+		$this->logger = isset( $params['logger'] )
+			? $params['logger']
+			: new NullLogger();
+	}
+
+	public function setLogger( LoggerInterface $logger ) {
+		$this->logger = $logger;
+	}
+
+	/**
+	 * Check and reap stale keys based on a chunk of events
+	 *
+	 * The stored position only moves past events whose keys were all reaped,
+	 * so a failed reap is retried by a later call.
+	 *
+	 * @param int $n Maximum number of events to fetch
+	 * @return int Number of keys successfully reaped
+	 */
+	final public function invoke( $n = 100 ) {
+		$posKey = $this->getPositionKey();
+		// Bail out if the "busy" lock for this channel cannot be obtained
+		$scopeLock = $this->store->getScopedLock( "$posKey:busy", 0 );
+		if ( !$scopeLock ) {
+			return 0;
+		}
+
+		$now = time();
+		$status = $this->store->get( $posKey );
+		if ( !$status ) {
+			// No stored position; start from the configured look-back window
+			$status = [ 'pos' => $now - $this->initialStartWindow, 'id' => null ];
+		}
+
+		// Get events for entities whose keys' tombstones/hold-off should have expired by now
+		$events = call_user_func(
+			$this->logChunkCallback,
+			$status['pos'],
+			$status['id'],
+			$now - WANObjectCache::HOLDOFF_TTL - 1,
+			$n
+		);
+
+		$lastEvent = null; // coordinates of the final event in the chunk, if any
+		$keyEvents = []; // map of (cache key => coordinates of the latest event touching it)
+		foreach ( $events as $event ) {
+			$lastEvent = [ 'pos' => $event['pos'], 'id' => $event['id'] ];
+			$keys = call_user_func( $this->keyListCallback, $this->cache, $event['item'] );
+			foreach ( $keys as $key ) {
+				// Unset first so the key is re-appended; this keeps the map
+				// ordered by the latest event of each key
+				unset( $keyEvents[$key] );
+				$keyEvents[$key] = $lastEvent;
+			}
+		}
+
+		$purgeCount = 0;
+		$lastOkEvent = null;
+		$allReaped = true;
+		foreach ( $keyEvents as $key => $keyEvent ) {
+			if ( !$this->cache->reap( $key, $keyEvent['pos'] ) ) {
+				$allReaped = false;
+				break;
+			}
+			++$purgeCount;
+			// Track the event of the key that was actually reaped, not the
+			// final event of the chunk, so that failed keys are not skipped
+			$lastOkEvent = $keyEvent;
+		}
+		if ( $allReaped ) {
+			// The whole chunk was handled (possibly with no keys to reap),
+			// so it is safe to move past its final event
+			$lastOkEvent = $lastEvent;
+		}
+
+		if ( $lastOkEvent ) {
+			$this->advancePosition( $posKey, $lastOkEvent );
+		}
+
+		ScopedCallback::consume( $scopeLock );
+
+		return $purgeCount;
+	}
+
+	/**
+	 * @return array|bool Returns (pos, id, ctime) map or false if not set
+	 */
+	public function getState() {
+		return $this->store->get( $this->getPositionKey() );
+	}
+
+	/**
+	 * @return string Key in the position store under which this channel's reap position is kept
+	 */
+	private function getPositionKey() {
+		return $this->store->makeGlobalKey( 'WANCache', 'reaper', $this->channel );
+	}
+
+	/**
+	 * Store a new reap position, unless the stored one is already further ahead
+	 *
+	 * @param string $posKey Position key
+	 * @param array $lastOkEvent Map of (pos: UNIX timestamp, id: unique ID)
+	 */
+	private function advancePosition( $posKey, array $lastOkEvent ) {
+		$pos = $lastOkEvent['pos'];
+		$id = $lastOkEvent['id'];
+
+		$ok = $this->store->merge(
+			$posKey,
+			function ( $bag, $key, $curValue ) use ( $pos, $id ) {
+				if ( $curValue ) {
+					$curCoord = [ $curValue['pos'], $curValue['id'] ];
+					$newCoord = [ $pos, $id ];
+					if ( $newCoord < $curCoord ) {
+						// Keep prior position instead of rolling it back
+						return $curValue;
+					}
+				}
+
+				return [
+					'pos' => $pos,
+					'id' => $id,
+					// Preserve the creation time of an existing position entry
+					'ctime' => ( $curValue && isset( $curValue['ctime'] ) )
+						? $curValue['ctime']
+						: date( 'c' )
+				];
+			},
+			IExpiringStore::TTL_INDEFINITE
+		);
+
+		if ( $ok ) {
+			$this->logger->info( "Updated cache reap position ($pos, $id)." );
+		} else {
+			$this->logger->error( "Could not update cache reap position ($pos, $id)." );
+		}
+	}
+}