diff options
Diffstat (limited to 'www/wiki/includes/libs/objectcache/WANObjectCacheReaper.php')
-rw-r--r-- | www/wiki/includes/libs/objectcache/WANObjectCacheReaper.php | 204 |
1 files changed, 204 insertions, 0 deletions
diff --git a/www/wiki/includes/libs/objectcache/WANObjectCacheReaper.php b/www/wiki/includes/libs/objectcache/WANObjectCacheReaper.php new file mode 100644 index 00000000..14737b14 --- /dev/null +++ b/www/wiki/includes/libs/objectcache/WANObjectCacheReaper.php @@ -0,0 +1,204 @@ +<?php +/** + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup Cache + */ + +use Psr\Log\LoggerAwareInterface; +use Psr\Log\LoggerInterface; +use Psr\Log\NullLogger; +use Wikimedia\ScopedCallback; + +/** + * Class for scanning through chronological, log-structured data or change logs + * and locally purging cache keys related to entities that appear in this data. + * + * This is useful for repairing cache when purges are missed by using a reliable + * stream, such as Kafka or a replicated MySQL table. Purge loss between datacenters + * is expected to be more common than within them. + * + * @since 1.28 + */ +class WANObjectCacheReaper implements LoggerAwareInterface { + /** @var WANObjectCache */ + protected $cache; + /** @var BagOStuff */ + protected $store; + /** @var callable */ + protected $logChunkCallback; + /** @var callable */ + protected $keyListCallback; + /** @var LoggerInterface */ + protected $logger; + + /** @var string */ + protected $channel; + /** @var int */ + protected $initialStartWindow; + + /** + * @param WANObjectCache $cache Cache to reap bad keys from + * @param BagOStuff $store Cache to store positions use for locking + * @param callable $logCallback Callback taking arguments: + * - The starting position as a UNIX timestamp + * - The starting unique ID used for breaking timestamp collisions or null + * - The ending position as a UNIX timestamp + * - The maximum number of results to return + * It returns a list of maps of (key: cache key, pos: UNIX timestamp, id: unique ID) + * for each key affected, with the corrosponding event timestamp/ID information. + * The events should be in ascending order, by (timestamp,id). + * @param callable $keyCallback Callback taking arguments: + * - The WANObjectCache instance + * - An object from the event log + * It should return a list of WAN cache keys. + * The callback must fully duck-type test the object, since can be any model class. + * @param array $params Additional options: + * - channel: the name of the update event stream. + * Default: WANObjectCache::DEFAULT_PURGE_CHANNEL. + * - initialStartWindow: seconds back in time to start if the position is lost. + * Default: 1 hour. + * - logger: an SPL monolog instance [optional] + */ + public function __construct( + WANObjectCache $cache, + BagOStuff $store, + callable $logCallback, + callable $keyCallback, + array $params + ) { + $this->cache = $cache; + $this->store = $store; + + $this->logChunkCallback = $logCallback; + $this->keyListCallback = $keyCallback; + if ( isset( $params['channel'] ) ) { + $this->channel = $params['channel']; + } else { + throw new UnexpectedValueException( "No channel specified." ); + } + + $this->initialStartWindow = isset( $params['initialStartWindow'] ) + ? $params['initialStartWindow'] + : 3600; + $this->logger = isset( $params['logger'] ) + ? $params['logger'] + : new NullLogger(); + } + + public function setLogger( LoggerInterface $logger ) { + $this->logger = $logger; + } + + /** + * Check and reap stale keys based on a chunk of events + * + * @param int $n Number of events + * @return int Number of keys checked + */ + final public function invoke( $n = 100 ) { + $posKey = $this->store->makeGlobalKey( 'WANCache', 'reaper', $this->channel ); + $scopeLock = $this->store->getScopedLock( "$posKey:busy", 0 ); + if ( !$scopeLock ) { + return 0; + } + + $now = time(); + $status = $this->store->get( $posKey ); + if ( !$status ) { + $status = [ 'pos' => $now - $this->initialStartWindow, 'id' => null ]; + } + + // Get events for entities who's keys tombstones/hold-off should have expired by now + $events = call_user_func_array( + $this->logChunkCallback, + [ $status['pos'], $status['id'], $now - WANObjectCache::HOLDOFF_TTL - 1, $n ] + ); + + $event = null; + $keyEvents = []; + foreach ( $events as $event ) { + $keys = call_user_func_array( + $this->keyListCallback, + [ $this->cache, $event['item'] ] + ); + foreach ( $keys as $key ) { + unset( $keyEvents[$key] ); // use only the latest per key + $keyEvents[$key] = [ + 'pos' => $event['pos'], + 'id' => $event['id'] + ]; + } + } + + $purgeCount = 0; + $lastOkEvent = null; + foreach ( $keyEvents as $key => $keyEvent ) { + if ( !$this->cache->reap( $key, $keyEvent['pos'] ) ) { + break; + } + ++$purgeCount; + $lastOkEvent = $event; + } + + if ( $lastOkEvent ) { + $ok = $this->store->merge( + $posKey, + function ( $bag, $key, $curValue ) use ( $lastOkEvent ) { + if ( !$curValue ) { + // Use new position + } else { + $curCoord = [ $curValue['pos'], $curValue['id'] ]; + $newCoord = [ $lastOkEvent['pos'], $lastOkEvent['id'] ]; + if ( $newCoord < $curCoord ) { + // Keep prior position instead of rolling it back + return $curValue; + } + } + + return [ + 'pos' => $lastOkEvent['pos'], + 'id' => $lastOkEvent['id'], + 'ctime' => $curValue ? $curValue['ctime'] : date( 'c' ) + ]; + }, + IExpiringStore::TTL_INDEFINITE + ); + + $pos = $lastOkEvent['pos']; + $id = $lastOkEvent['id']; + if ( $ok ) { + $this->logger->info( "Updated cache reap position ($pos, $id)." ); + } else { + $this->logger->error( "Could not update cache reap position ($pos, $id)." ); + } + } + + ScopedCallback::consume( $scopeLock ); + + return $purgeCount; + } + + /** + * @return array|bool Returns (pos, id) map or false if not set + */ + public function getState() { + $posKey = $this->store->makeGlobalKey( 'WANCache', 'reaper', $this->channel ); + + return $this->store->get( $posKey ); + } +} |