diff options
author | Yaco <franco@reevo.org> | 2020-06-04 11:01:00 -0300 |
---|---|---|
committer | Yaco <franco@reevo.org> | 2020-06-04 11:01:00 -0300 |
commit | fc7369835258467bf97eb64f184b93691f9a9fd5 (patch) | |
tree | daabd60089d2dd76d9f5fb416b005fbe159c799d /www/wiki/includes/objectcache |
first commit
Diffstat (limited to 'www/wiki/includes/objectcache')
-rw-r--r-- | www/wiki/includes/objectcache/ObjectCache.php | 414 | ||||
-rw-r--r-- | www/wiki/includes/objectcache/SqlBagOStuff.php | 825 |
2 files changed, 1239 insertions, 0 deletions
<?php
/**
 * Functions to get cache objects.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License along
 * with this program; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 * http://www.gnu.org/copyleft/gpl.html
 *
 * @file
 * @ingroup Cache
 */

use MediaWiki\Logger\LoggerFactory;
use MediaWiki\MediaWikiServices;

/**
 * Factory for the cache objects used by MediaWiki.
 *
 * "Cache" here covers both of its dictionary meanings:
 *
 * - a) Cache (the computer science definition): a place to store copies or
 *      computations on existing data for higher access speeds. These should
 *      use strongly consistent stores, so callers need not deal with stale reads.
 * - b) Storage: a place to store lightweight data that is not canonically
 *      stored anywhere else. These may be eventually consistent; callers can
 *      use BagOStuff:READ_LATEST to see the latest available data.
 *
 * Primary entry points:
 *
 * - ObjectCache::getMainWANInstance()
 *   Memory cache, stored in the local data-center's main cache (keyspace
 *   different from the local-cluster cache). Delete events are broadcast to
 *   the other DCs' main caches. See WANObjectCache for details.
 *
 * - ObjectCache::getLocalServerInstance( $fallbackType )
 *   Memory cache for very hot keys, stored only on the individual web server
 *   (typically APC or APCu for web requests, EmptyBagOStuff in CLI mode).
 *   Not replicated to the other servers.
 *
 * - ObjectCache::getLocalClusterInstance()
 *   Memory storage for per-cluster coordination and tracking, e.g. rate limit
 *   counters or cache regeneration mutexes. Stored centrally within the local
 *   data-center, not replicated to other DCs. Configured by $wgMainCacheType.
 *
 * - ObjectCache::getMainStashInstance()
 *   Ephemeral global storage, written centrally in the primary data-center and
 *   replicated to the other DCs (best-effort). Use BagOStuff::READ_LATEST for
 *   the latest value. This store may be subject to LRU-style evictions.
 *
 * - ObjectCache::getInstance( $cacheType )
 *   Special cases (like tiered memory/disk caches): get a specific cache type
 *   by key in $wgObjectCaches.
 *
 * All of the instances above (BagOStuff and WANObjectCache) have makeKey()
 * scoped to the *current* wiki ID. Use makeGlobalKey() to avoid this scoping
 * for keys that need to be shared amongst wikis.
 *
 * @ingroup Cache
 */
class ObjectCache {
	/** @var BagOStuff[] Map of (id => BagOStuff) */
	public static $instances = [];
	/** @var WANObjectCache[] Map of (id => WANObjectCache) */
	public static $wanInstances = [];

	/**
	 * Get a cached instance of the specified type of cache object.
	 *
	 * @param string $id A key in $wgObjectCaches.
	 * @return BagOStuff
	 */
	public static function getInstance( $id ) {
		if ( !isset( self::$instances[$id] ) ) {
			self::$instances[$id] = self::newFromId( $id );
		}

		return self::$instances[$id];
	}

	/**
	 * Get a cached instance of the specified type of WAN cache object.
	 *
	 * @since 1.26
	 * @param string $id A key in $wgWANObjectCaches.
	 * @return WANObjectCache
	 */
	public static function getWANInstance( $id ) {
		if ( !isset( self::$wanInstances[$id] ) ) {
			self::$wanInstances[$id] = self::newWANCacheFromId( $id );
		}

		return self::$wanInstances[$id];
	}

	/**
	 * Create a new cache object of the specified type.
	 *
	 * @param string $id A key in $wgObjectCaches.
	 * @return BagOStuff
	 * @throws InvalidArgumentException When $id is not a configured cache type.
	 */
	public static function newFromId( $id ) {
		global $wgObjectCaches;

		if ( isset( $wgObjectCaches[$id] ) ) {
			return self::newFromParams( $wgObjectCaches[$id] );
		}

		// These two are always recognized, even when absent from $wgObjectCaches
		if ( $id === CACHE_NONE ) {
			return new EmptyBagOStuff();
		}
		if ( $id === 'hash' ) {
			return new HashBagOStuff();
		}

		throw new InvalidArgumentException( "Invalid object cache type \"$id\" requested. " .
			"It is not present in \$wgObjectCaches." );
	}

	/**
	 * Get the default keyspace for this wiki.
	 *
	 * This is the `CachePrefix` configuration variable when set to a non-empty
	 * string, and otherwise the wiki ID (`DBname`, with `DBprefix` if defined).
	 *
	 * @return string
	 */
	public static function getDefaultKeyspace() {
		global $wgCachePrefix;

		$prefix = $wgCachePrefix;
		if ( is_string( $prefix ) && $prefix !== '' ) {
			return $prefix;
		}

		return wfWikiID();
	}

	/**
	 * Create a new cache object from parameters.
	 *
	 * @param array $params Must have a 'factory' or 'class' key:
	 *  - factory: Callback passed $params that returns a BagOStuff.
	 *  - class: BagOStuff subclass constructed with $params.
	 *  - loggroup: Alias to set the 'logger' key with a LoggerFactory group.
	 *  - ...: Other parameters passed through to the factory or class.
	 * @return BagOStuff
	 * @throws InvalidArgumentException When neither 'factory' nor 'class' is given.
	 */
	public static function newFromParams( $params ) {
		$params['logger'] = LoggerFactory::getInstance(
			isset( $params['loggroup'] ) ? $params['loggroup'] : 'objectcache'
		);
		if ( !isset( $params['keyspace'] ) ) {
			$params['keyspace'] = self::getDefaultKeyspace();
		}

		if ( isset( $params['factory'] ) ) {
			return call_user_func( $params['factory'], $params );
		}

		if ( !isset( $params['class'] ) ) {
			throw new InvalidArgumentException( "The definition of cache type \""
				. print_r( $params, true ) . "\" lacks both "
				. "factory and class parameters." );
		}

		$class = $params['class'];
		// Default the 'async' update handler
		if ( !isset( $params['asyncHandler'] ) ) {
			$params['asyncHandler'] = 'DeferredUpdates::addCallableUpdate';
		}
		// Enable reportDupes by default
		if ( !isset( $params['reportDupes'] ) ) {
			$params['reportDupes'] = true;
		}

		// Backwards-compatibility logic for SqlBagOStuff
		if ( is_a( $class, SqlBagOStuff::class, true ) ) {
			if ( isset( $params['server'] ) && !isset( $params['servers'] ) ) {
				$params['servers'] = [ $params['server'] ];
				unset( $params['server'] );
			}
			// In the past it was not required to set 'dbDirectory' in $wgObjectCaches
			if ( isset( $params['servers'] ) ) {
				foreach ( $params['servers'] as &$server ) {
					if ( $server['type'] === 'sqlite' && !isset( $server['dbDirectory'] ) ) {
						$server['dbDirectory'] = MediaWikiServices::getInstance()
							->getMainConfig()->get( 'SQLiteDataDir' );
					}
				}
			}
		}

		// Backwards-compatibility logic for MemcachedBagOStuff:
		// fill unset parameters from the legacy global variables
		if ( is_subclass_of( $class, MemcachedBagOStuff::class ) ) {
			$legacyGlobals = [
				'servers' => 'wgMemCachedServers',
				'debug' => 'wgMemCachedDebug',
				'persistent' => 'wgMemCachedPersistent',
				'timeout' => 'wgMemCachedTimeout',
			];
			foreach ( $legacyGlobals as $key => $globalName ) {
				if ( !isset( $params[$key] ) ) {
					$params[$key] = $GLOBALS[$globalName];
				}
			}
		}

		return new $class( $params );
	}

	/**
	 * Factory function for CACHE_ANYTHING (referenced from DefaultSettings.php)
	 *
	 * CACHE_ANYTHING means that stuff has to be cached; not caching is not an
	 * option. If a caching method is configured for any of the main caches
	 * ($wgMainCacheType, $wgMessageCacheType, $wgParserCacheType), then
	 * CACHE_ANYTHING is effectively an alias for that choice. If no cache is
	 * configured (by default $wgMainCacheType is CACHE_NONE), CACHE_ANYTHING
	 * forwards to CACHE_DB.
	 *
	 * @param array $params
	 * @return BagOStuff
	 */
	public static function newAnything( $params ) {
		global $wgMainCacheType, $wgMessageCacheType, $wgParserCacheType;

		foreach ( [ $wgMainCacheType, $wgMessageCacheType, $wgParserCacheType ] as $candidate ) {
			if ( $candidate === CACHE_NONE || $candidate === CACHE_ANYTHING ) {
				continue;
			}
			$cache = self::getInstance( $candidate );
			// CACHE_ACCEL might default to nothing if no APCu
			// See includes/ServiceWiring.php
			if ( !( $cache instanceof EmptyBagOStuff ) ) {
				return $cache;
			}
		}

		// The LoadBalancer may be disabled, e.g. because
		// MediaWikiServices::disableStorageBackend was called.
		$fallback = MediaWikiServices::getInstance()->isServiceDisabled( 'DBLoadBalancer' )
			? CACHE_NONE
			: CACHE_DB;

		return self::getInstance( $fallback );
	}

	/**
	 * Factory function for CACHE_ACCEL (referenced from DefaultSettings.php)
	 *
	 * Looks for any APC or APCu style server-local cache; a fallback cache can
	 * be specified for when none is found:
	 *
	 *   // Direct calls
	 *   ObjectCache::getLocalServerInstance( $fallbackType );
	 *
	 *   // From $wgObjectCaches via newFromParams()
	 *   ObjectCache::getLocalServerInstance( [ 'fallback' => $fallbackType ] );
	 *
	 * @param int|string|array $fallback Fallback cache or parameter map with 'fallback'
	 * @return BagOStuff
	 * @throws InvalidArgumentException
	 * @since 1.27
	 */
	public static function getLocalServerInstance( $fallback = CACHE_NONE ) {
		$cache = MediaWikiServices::getInstance()->getLocalServerObjectCache();
		if ( !( $cache instanceof EmptyBagOStuff ) ) {
			return $cache;
		}

		if ( is_array( $fallback ) ) {
			$fallback = isset( $fallback['fallback'] ) ? $fallback['fallback'] : CACHE_NONE;
		}

		return self::getInstance( $fallback );
	}

	/**
	 * Create a new cache object of the specified type.
	 *
	 * @since 1.26
	 * @param string $id A key in $wgWANObjectCaches.
	 * @return WANObjectCache
	 * @throws UnexpectedValueException
	 */
	public static function newWANCacheFromId( $id ) {
		global $wgWANObjectCaches, $wgObjectCaches;

		if ( !isset( $wgWANObjectCaches[$id] ) ) {
			throw new UnexpectedValueException(
				"Cache type \"$id\" requested is not present in \$wgWANObjectCaches." );
		}

		$params = $wgWANObjectCaches[$id];
		if ( !isset( $wgObjectCaches[$params['cacheId']] ) ) {
			throw new UnexpectedValueException(
				"Cache type \"{$params['cacheId']}\" is not present in \$wgObjectCaches." );
		}
		$params['store'] = $wgObjectCaches[$params['cacheId']];

		return self::newWANCacheFromParams( $params );
	}

	/**
	 * Create a new cache object of the specified type.
	 *
	 * @since 1.28
	 * @param array $params
	 * @return WANObjectCache
	 * @throws UnexpectedValueException
	 */
	public static function newWANCacheFromParams( array $params ) {
		global $wgCommandLineMode;

		$services = MediaWikiServices::getInstance();

		$relayerGroup = $services->getEventRelayerGroup();
		foreach ( $params['channels'] as $action => $channel ) {
			$params['relayers'][$action] = $relayerGroup->getRelayer( $channel );
			$params['channels'][$action] = $channel;
		}
		$params['cache'] = self::newFromParams( $params['store'] );
		$params['logger'] = LoggerFactory::getInstance(
			isset( $params['loggroup'] ) ? $params['loggroup'] : 'objectcache'
		);
		if ( !$wgCommandLineMode ) {
			// Send the statsd data post-send on HTTP requests; avoid in CLI mode (T181385)
			$params['stats'] = $services->getStatsdDataFactory();
			// Let pre-emptive refreshes happen post-send on HTTP requests
			$params['asyncHandler'] = [ DeferredUpdates::class, 'addCallableUpdate' ];
		}

		$class = $params['class'];

		return new $class( $params );
	}

	/**
	 * Get the main cluster-local cache object.
	 *
	 * @since 1.27
	 * @return BagOStuff
	 */
	public static function getLocalClusterInstance() {
		global $wgMainCacheType;

		return self::getInstance( $wgMainCacheType );
	}

	/**
	 * Get the main WAN cache object.
	 *
	 * @since 1.26
	 * @return WANObjectCache
	 * @deprecated Since 1.28 Use MediaWikiServices::getMainWANObjectCache()
	 */
	public static function getMainWANInstance() {
		return MediaWikiServices::getInstance()->getMainWANObjectCache();
	}

	/**
	 * Get the cache object for the main stash.
	 *
	 * Stash objects are BagOStuff instances suitable for storing lightweight
	 * data that is not canonically stored elsewhere (such as an RDBMS).
	 * Stashes should be configured to propagate changes to all data-centers.
	 *
	 * Callers should be prepared for:
	 * - a) Writes to be slower in non-"primary" (e.g. HTTP GET/HEAD only) DCs
	 * - b) Reads to be eventually consistent, e.g. for get()/getMulti()
	 * In general, this means avoiding updates on idempotent HTTP requests and
	 * avoiding an assumption of perfect serializability (or accepting anomalies).
	 * Reads may be eventually consistent or data might roll back as nodes flap.
	 * Callers can use BagOStuff:READ_LATEST to see the latest available data.
	 *
	 * @return BagOStuff
	 * @since 1.26
	 * @deprecated Since 1.28 Use MediaWikiServices::getMainObjectStash
	 */
	public static function getMainStashInstance() {
		return MediaWikiServices::getInstance()->getMainObjectStash();
	}

	/**
	 * Clear all the cached instances.
	 */
	public static function clear() {
		self::$instances = [];
		self::$wanInstances = [];
	}
}
+ * http://www.gnu.org/copyleft/gpl.html + * + * @file + * @ingroup Cache + */ + +use MediaWiki\MediaWikiServices; +use Wikimedia\Rdbms\Database; +use Wikimedia\Rdbms\IDatabase; +use Wikimedia\Rdbms\DBError; +use Wikimedia\Rdbms\DBQueryError; +use Wikimedia\Rdbms\DBConnectionError; +use Wikimedia\Rdbms\LoadBalancer; +use Wikimedia\Rdbms\TransactionProfiler; +use Wikimedia\WaitConditionLoop; + +/** + * Class to store objects in the database + * + * @ingroup Cache + */ +class SqlBagOStuff extends BagOStuff { + /** @var array[] (server index => server config) */ + protected $serverInfos; + /** @var string[] (server index => tag/host name) */ + protected $serverTags; + /** @var int */ + protected $numServers; + /** @var int */ + protected $lastExpireAll = 0; + /** @var int */ + protected $purgePeriod = 100; + /** @var int */ + protected $shards = 1; + /** @var string */ + protected $tableName = 'objectcache'; + /** @var bool */ + protected $replicaOnly = false; + /** @var int */ + protected $syncTimeout = 3; + + /** @var LoadBalancer|null */ + protected $separateMainLB; + /** @var array */ + protected $conns; + /** @var array UNIX timestamps */ + protected $connFailureTimes = []; + /** @var array Exceptions */ + protected $connFailureErrors = []; + + /** + * Constructor. Parameters are: + * - server: A server info structure in the format required by each + * element in $wgDBServers. + * + * - servers: An array of server info structures describing a set of database servers + * to distribute keys to. If this is specified, the "server" option will be + * ignored. If string keys are used, then they will be used for consistent + * hashing *instead* of the host name (from the server config). This is useful + * when a cluster is replicated to another site (with different host names) + * but each server has a corresponding replica in the other cluster. 
+ * + * - purgePeriod: The average number of object cache requests in between + * garbage collection operations, where expired entries + * are removed from the database. Or in other words, the + * reciprocal of the probability of purging on any given + * request. If this is set to zero, purging will never be + * done. + * + * - tableName: The table name to use, default is "objectcache". + * + * - shards: The number of tables to use for data storage on each server. + * If this is more than 1, table names will be formed in the style + * objectcacheNNN where NNN is the shard index, between 0 and + * shards-1. The number of digits will be the minimum number + * required to hold the largest shard index. Data will be + * distributed across all tables by key hash. This is for + * MySQL bugs 61735 and 61736. + * - slaveOnly: Whether to only use replica DBs and avoid triggering + * garbage collection logic of expired items. This only + * makes sense if the primary DB is used and only if get() + * calls will be used. This is used by ReplicatedBagOStuff. + * - syncTimeout: Max seconds to wait for replica DBs to catch up for WRITE_SYNC. + * + * @param array $params + */ + public function __construct( $params ) { + parent::__construct( $params ); + + $this->attrMap[self::ATTR_EMULATION] = self::QOS_EMULATION_SQL; + $this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_NONE; + + if ( isset( $params['servers'] ) ) { + $this->serverInfos = []; + $this->serverTags = []; + $this->numServers = count( $params['servers'] ); + $index = 0; + foreach ( $params['servers'] as $tag => $info ) { + $this->serverInfos[$index] = $info; + if ( is_string( $tag ) ) { + $this->serverTags[$index] = $tag; + } else { + $this->serverTags[$index] = isset( $info['host'] ) ? 
$info['host'] : "#$index"; + } + ++$index; + } + } elseif ( isset( $params['server'] ) ) { + $this->serverInfos = [ $params['server'] ]; + $this->numServers = count( $this->serverInfos ); + } else { + // Default to using the main wiki's database servers + $this->serverInfos = false; + $this->numServers = 1; + $this->attrMap[self::ATTR_SYNCWRITES] = self::QOS_SYNCWRITES_BE; + } + if ( isset( $params['purgePeriod'] ) ) { + $this->purgePeriod = intval( $params['purgePeriod'] ); + } + if ( isset( $params['tableName'] ) ) { + $this->tableName = $params['tableName']; + } + if ( isset( $params['shards'] ) ) { + $this->shards = intval( $params['shards'] ); + } + if ( isset( $params['syncTimeout'] ) ) { + $this->syncTimeout = $params['syncTimeout']; + } + $this->replicaOnly = !empty( $params['slaveOnly'] ); + } + + /** + * Get a connection to the specified database + * + * @param int $serverIndex + * @return Database + * @throws MWException + */ + protected function getDB( $serverIndex ) { + if ( !isset( $this->conns[$serverIndex] ) ) { + if ( $serverIndex >= $this->numServers ) { + throw new MWException( __METHOD__ . ": Invalid server index \"$serverIndex\"" ); + } + + # Don't keep timing out trying to connect for each call if the DB is down + if ( isset( $this->connFailureErrors[$serverIndex] ) + && ( time() - $this->connFailureTimes[$serverIndex] ) < 60 + ) { + throw $this->connFailureErrors[$serverIndex]; + } + + if ( $this->serverInfos ) { + // Use custom database defined by server connection info + $info = $this->serverInfos[$serverIndex]; + $type = isset( $info['type'] ) ? $info['type'] : 'mysql'; + $host = isset( $info['host'] ) ? $info['host'] : '[unknown]'; + $this->logger->debug( __CLASS__ . 
": connecting to $host" ); + // Use a blank trx profiler to ignore expections as this is a cache + $info['trxProfiler'] = new TransactionProfiler(); + $db = Database::factory( $type, $info ); + $db->clearFlag( DBO_TRX ); // auto-commit mode + } else { + // Use the main LB database + $lb = MediaWikiServices::getInstance()->getDBLoadBalancer(); + $index = $this->replicaOnly ? DB_REPLICA : DB_MASTER; + if ( $lb->getServerType( $lb->getWriterIndex() ) !== 'sqlite' ) { + // Keep a separate connection to avoid contention and deadlocks + $db = $lb->getConnection( $index, [], false, $lb::CONN_TRX_AUTOCOMMIT ); + // @TODO: Use a blank trx profiler to ignore expections as this is a cache + } else { + // However, SQLite has the opposite behavior due to DB-level locking. + // Stock sqlite MediaWiki installs use a separate sqlite cache DB instead. + $db = $lb->getConnection( $index ); + } + } + + $this->logger->debug( sprintf( "Connection %s will be used for SqlBagOStuff", $db ) ); + $this->conns[$serverIndex] = $db; + } + + return $this->conns[$serverIndex]; + } + + /** + * Get the server index and table name for a given key + * @param string $key + * @return array Server index and table name + */ + protected function getTableByKey( $key ) { + if ( $this->shards > 1 ) { + $hash = hexdec( substr( md5( $key ), 0, 8 ) ) & 0x7fffffff; + $tableIndex = $hash % $this->shards; + } else { + $tableIndex = 0; + } + if ( $this->numServers > 1 ) { + $sortedServers = $this->serverTags; + ArrayUtils::consistentHashSort( $sortedServers, $key ); + reset( $sortedServers ); + $serverIndex = key( $sortedServers ); + } else { + $serverIndex = 0; + } + return [ $serverIndex, $this->getTableNameByShard( $tableIndex ) ]; + } + + /** + * Get the table name for a given shard index + * @param int $index + * @return string + */ + protected function getTableNameByShard( $index ) { + if ( $this->shards > 1 ) { + $decimals = strlen( $this->shards - 1 ); + return $this->tableName . 
+ sprintf( "%0{$decimals}d", $index ); + } else { + return $this->tableName; + } + } + + protected function doGet( $key, $flags = 0 ) { + $casToken = null; + + return $this->getWithToken( $key, $casToken, $flags ); + } + + protected function getWithToken( $key, &$casToken, $flags = 0 ) { + $values = $this->getMulti( [ $key ] ); + if ( array_key_exists( $key, $values ) ) { + $casToken = $values[$key]; + return $values[$key]; + } + return false; + } + + public function getMulti( array $keys, $flags = 0 ) { + $values = []; // array of (key => value) + + $keysByTable = []; + foreach ( $keys as $key ) { + list( $serverIndex, $tableName ) = $this->getTableByKey( $key ); + $keysByTable[$serverIndex][$tableName][] = $key; + } + + $this->garbageCollect(); // expire old entries if any + + $dataRows = []; + foreach ( $keysByTable as $serverIndex => $serverKeys ) { + try { + $db = $this->getDB( $serverIndex ); + foreach ( $serverKeys as $tableName => $tableKeys ) { + $res = $db->select( $tableName, + [ 'keyname', 'value', 'exptime' ], + [ 'keyname' => $tableKeys ], + __METHOD__, + // Approximate write-on-the-fly BagOStuff API via blocking. + // This approximation fails if a ROLLBACK happens (which is rare). + // We do not want to flush the TRX as that can break callers. + $db->trxLevel() ? [ 'LOCK IN SHARE MODE' ] : [] + ); + if ( $res === false ) { + continue; + } + foreach ( $res as $row ) { + $row->serverIndex = $serverIndex; + $row->tableName = $tableName; + $dataRows[$row->keyname] = $row; + } + } + } catch ( DBError $e ) { + $this->handleReadError( $e, $serverIndex ); + } + } + + foreach ( $keys as $key ) { + if ( isset( $dataRows[$key] ) ) { // HIT? + $row = $dataRows[$key]; + $this->debug( "get: retrieved data; expiry time is " . 
$row->exptime ); + $db = null; + try { + $db = $this->getDB( $row->serverIndex ); + if ( $this->isExpired( $db, $row->exptime ) ) { // MISS + $this->debug( "get: key has expired" ); + } else { // HIT + $values[$key] = $this->unserialize( $db->decodeBlob( $row->value ) ); + } + } catch ( DBQueryError $e ) { + $this->handleWriteError( $e, $db, $row->serverIndex ); + } + } else { // MISS + $this->debug( 'get: no matching rows' ); + } + } + + return $values; + } + + public function setMulti( array $data, $expiry = 0 ) { + $keysByTable = []; + foreach ( $data as $key => $value ) { + list( $serverIndex, $tableName ) = $this->getTableByKey( $key ); + $keysByTable[$serverIndex][$tableName][] = $key; + } + + $this->garbageCollect(); // expire old entries if any + + $result = true; + $exptime = (int)$expiry; + foreach ( $keysByTable as $serverIndex => $serverKeys ) { + $db = null; + try { + $db = $this->getDB( $serverIndex ); + } catch ( DBError $e ) { + $this->handleWriteError( $e, $db, $serverIndex ); + $result = false; + continue; + } + + if ( $exptime < 0 ) { + $exptime = 0; + } + + if ( $exptime == 0 ) { + $encExpiry = $this->getMaxDateTime( $db ); + } else { + $exptime = $this->convertExpiry( $exptime ); + $encExpiry = $db->timestamp( $exptime ); + } + foreach ( $serverKeys as $tableName => $tableKeys ) { + $rows = []; + foreach ( $tableKeys as $key ) { + $rows[] = [ + 'keyname' => $key, + 'value' => $db->encodeBlob( $this->serialize( $data[$key] ) ), + 'exptime' => $encExpiry, + ]; + } + + try { + $db->replace( + $tableName, + [ 'keyname' ], + $rows, + __METHOD__ + ); + } catch ( DBError $e ) { + $this->handleWriteError( $e, $db, $serverIndex ); + $result = false; + } + + } + + } + + return $result; + } + + public function set( $key, $value, $exptime = 0, $flags = 0 ) { + $ok = $this->setMulti( [ $key => $value ], $exptime ); + if ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC ) { + $ok = $this->waitForReplication() && $ok; + } + + return $ok; + } + + protected 
function cas( $casToken, $key, $value, $exptime = 0 ) {
		list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
		$db = null;
		try {
			$db = $this->getDB( $serverIndex );
			$exptime = intval( $exptime );

			if ( $exptime < 0 ) {
				$exptime = 0;
			}

			if ( $exptime == 0 ) {
				$encExpiry = $this->getMaxDateTime( $db );
			} else {
				$exptime = $this->convertExpiry( $exptime );
				$encExpiry = $db->timestamp( $exptime );
			}
			// (T26425) use a replace if the db supports it instead of
			// delete/insert to avoid clashes with conflicting keynames
			// Compare-and-swap: the WHERE clause matches the serialized old
			// value ($casToken), so the UPDATE only applies if nothing changed
			// the row since the token was read.
			$db->update(
				$tableName,
				[
					'keyname' => $key,
					'value' => $db->encodeBlob( $this->serialize( $value ) ),
					'exptime' => $encExpiry
				],
				[
					'keyname' => $key,
					'value' => $db->encodeBlob( $this->serialize( $casToken ) )
				],
				__METHOD__
			);
		} catch ( DBQueryError $e ) {
			$this->handleWriteError( $e, $db, $serverIndex );

			return false;
		}

		// affectedRows() == 0 means the token no longer matched (CAS failure)
		return (bool)$db->affectedRows();
	}

	/**
	 * Delete one key from its shard table.
	 *
	 * @param string $key
	 * @return bool False only on DB error (deleting a missing key is "success")
	 */
	public function delete( $key ) {
		list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
		$db = null;
		try {
			$db = $this->getDB( $serverIndex );
			$db->delete(
				$tableName,
				[ 'keyname' => $key ],
				__METHOD__ );
		} catch ( DBError $e ) {
			$this->handleWriteError( $e, $db, $serverIndex );
			return false;
		}

		return true;
	}

	/**
	 * Atomically increment an integer key, locking the row with FOR UPDATE.
	 *
	 * The row is deleted and re-inserted (with IGNORE) rather than updated;
	 * a lost race on the re-insert yields null (see T30611).
	 *
	 * @param string $key
	 * @param int $step Amount to add (may be negative)
	 * @return int|null New value, or null on miss/expiry/race/DB error
	 */
	public function incr( $key, $step = 1 ) {
		list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
		$db = null;
		try {
			$db = $this->getDB( $serverIndex );
			$step = intval( $step );
			$row = $db->selectRow(
				$tableName,
				[ 'value', 'exptime' ],
				[ 'keyname' => $key ],
				__METHOD__,
				[ 'FOR UPDATE' ] );
			if ( $row === false ) {
				// Missing

				return null;
			}
			$db->delete( $tableName, [ 'keyname' => $key ], __METHOD__ );
			if ( $this->isExpired( $db, $row->exptime ) ) {
				// Expired, do not reinsert

				return null;
			}

			$oldValue = intval( $this->unserialize( $db->decodeBlob( $row->value ) ) );
			$newValue = $oldValue + $step;
			// INSERT IGNORE: if another writer re-created the key meanwhile,
			// this insert is silently skipped and affectedRows() is 0
			$db->insert(
				$tableName,
				[
					'keyname' => $key,
					'value' => $db->encodeBlob( $this->serialize( $newValue ) ),
					'exptime' => $row->exptime
				], __METHOD__, 'IGNORE' );

			if ( $db->affectedRows() == 0 ) {
				// Race condition. See T30611
				$newValue = null;
			}
		} catch ( DBError $e ) {
			$this->handleWriteError( $e, $db, $serverIndex );
			return null;
		}

		return $newValue;
	}

	/**
	 * Merge a value via CAS retry loop; with WRITE_SYNC, wait for replicas.
	 *
	 * @param string $key
	 * @param callable $callback Given the old value, returns the new value
	 * @param int $exptime TTL or expiry timestamp (0 = never)
	 * @param int $attempts Max CAS attempts
	 * @param int $flags Bitfield of BagOStuff::WRITE_* constants
	 * @return bool Success
	 */
	public function merge( $key, callable $callback, $exptime = 0, $attempts = 10, $flags = 0 ) {
		$ok = $this->mergeViaCas( $key, $callback, $exptime, $attempts );
		if ( ( $flags & self::WRITE_SYNC ) == self::WRITE_SYNC ) {
			$ok = $this->waitForReplication() && $ok;
		}

		return $ok;
	}

	/**
	 * Update the expiry of an existing, not-yet-expired key.
	 *
	 * @param string $key
	 * @param int $expiry TTL or expiry timestamp
	 * @return bool False if the key was missing/expired or on DB error
	 */
	public function changeTTL( $key, $expiry = 0 ) {
		list( $serverIndex, $tableName ) = $this->getTableByKey( $key );
		$db = null;
		try {
			$db = $this->getDB( $serverIndex );
			// The exptime condition makes this a no-op on already-expired rows
			$db->update(
				$tableName,
				[ 'exptime' => $db->timestamp( $this->convertExpiry( $expiry ) ) ],
				[ 'keyname' => $key, 'exptime > ' . $db->addQuotes( $db->timestamp( time() ) ) ],
				__METHOD__
			);
			if ( $db->affectedRows() == 0 ) {
				return false;
			}
		} catch ( DBError $e ) {
			$this->handleWriteError( $e, $db, $serverIndex );
			return false;
		}

		return true;
	}

	/**
	 * Check whether a stored expiry timestamp is in the past.
	 *
	 * The "never expires" sentinel from getMaxDateTime() is never expired.
	 *
	 * @param IDatabase $db
	 * @param string $exptime
	 * @return bool
	 */
	protected function isExpired( $db, $exptime ) {
		return $exptime != $this->getMaxDateTime( $db ) && wfTimestamp( TS_UNIX, $exptime ) < time();
	}

	/**
	 * Get the sentinel timestamp used for "never expires" entries.
	 *
	 * Uses 2^62 once the current time itself exceeds the 32-bit epoch
	 * (post-2038 on 64-bit builds); otherwise the 32-bit maximum.
	 *
	 * @param IDatabase $db
	 * @return string
	 */
	protected function getMaxDateTime( $db ) {
		if ( time() > 0x7fffffff ) {
			return $db->timestamp( 1 << 62 );
		} else {
			return $db->timestamp( 0x7fffffff );
		}
	}

	/**
	 * Occasionally purge expired rows; rate-limited by $this->purgePeriod
	 * and skipped entirely on replica-only instances.
	 */
	protected function garbageCollect() {
		if ( !$this->purgePeriod || $this->replicaOnly ) {
			// Disabled
			return;
		}
		// Only purge on one in every $this->purgePeriod requests.
		if ( $this->purgePeriod !== 1 && mt_rand( 0, $this->purgePeriod - 1 ) ) {
			return;
		}
		$now = time();
		// Avoid repeating the delete within a few seconds
		if ( $now > ( $this->lastExpireAll + 1 ) ) {
			$this->lastExpireAll = $now;
			$this->expireAll();
		}
	}

	/**
	 * Delete all entries that have expired as of now.
	 */
	public function expireAll() {
		$this->deleteObjectsExpiringBefore( wfTimestampNow() );
	}

	/**
	 * Delete objects from the database which expire before a certain date.
	 *
	 * Rows are deleted in batches of 100 per shard, ordered by exptime, so
	 * large purges do not hold long row locks. $progressCallback, if given,
	 * receives an estimated overall percentage (derived from the exptime
	 * range) after each batch.
	 *
	 * @param string $timestamp
	 * @param bool|callable $progressCallback
	 * @return bool False if any server failed (remaining servers are skipped)
	 */
	public function deleteObjectsExpiringBefore( $timestamp, $progressCallback = false ) {
		for ( $serverIndex = 0; $serverIndex < $this->numServers; $serverIndex++ ) {
			$db = null;
			try {
				$db = $this->getDB( $serverIndex );
				$dbTimestamp = $db->timestamp( $timestamp );
				$totalSeconds = false;
				$baseConds = [ 'exptime < ' . $db->addQuotes( $dbTimestamp ) ];
				for ( $i = 0; $i < $this->shards; $i++ ) {
					$maxExpTime = false;
					while ( true ) {
						$conds = $baseConds;
						if ( $maxExpTime !== false ) {
							// Resume after the last batch's highest exptime
							$conds[] = 'exptime >= ' . $db->addQuotes( $maxExpTime );
						}
						$rows = $db->select(
							$this->getTableNameByShard( $i ),
							[ 'keyname', 'exptime' ],
							$conds,
							__METHOD__,
							[ 'LIMIT' => 100, 'ORDER BY' => 'exptime' ] );
						if ( $rows === false || !$rows->numRows() ) {
							break;
						}
						$keys = [];
						$row = $rows->current();
						$minExpTime = $row->exptime;
						if ( $totalSeconds === false ) {
							// First batch: seconds spanned by the whole purge window,
							// used below for progress estimation
							$totalSeconds = wfTimestamp( TS_UNIX, $timestamp )
								- wfTimestamp( TS_UNIX, $minExpTime );
						}
						foreach ( $rows as $row ) {
							$keys[] = $row->keyname;
							$maxExpTime = $row->exptime;
						}

						// Re-state the exptime bounds so rows touched since the
						// SELECT are not deleted by mistake
						$db->delete(
							$this->getTableNameByShard( $i ),
							[
								'exptime >= ' . $db->addQuotes( $minExpTime ),
								'exptime < ' . $db->addQuotes( $dbTimestamp ),
								'keyname' => $keys
							],
							__METHOD__ );

						if ( $progressCallback ) {
							if ( intval( $totalSeconds ) === 0 ) {
								$percent = 0;
							} else {
								$remainingSeconds = wfTimestamp( TS_UNIX, $timestamp )
									- wfTimestamp( TS_UNIX, $maxExpTime );
								if ( $remainingSeconds > $totalSeconds ) {
									$totalSeconds = $remainingSeconds;
								}
								$processedSeconds = $totalSeconds - $remainingSeconds;
								// Fraction of this server's shards completed, as a percentage
								$percent = ( $i + $processedSeconds / $totalSeconds )
									/ $this->shards * 100;
							}
							// Scale into this server's slice of the overall range
							$percent = ( $percent / $this->numServers )
								+ ( $serverIndex / $this->numServers * 100 );
							call_user_func( $progressCallback, $percent );
						}
					}
				}
			} catch ( DBError $e ) {
				$this->handleWriteError( $e, $db, $serverIndex );
				return false;
			}
		}
		return true;
	}

	/**
	 * Delete content of shard tables in every server.
	 * Return true if the operation is successful, false otherwise.
	 * @return bool
	 */
	public function deleteAll() {
		for ( $serverIndex = 0; $serverIndex < $this->numServers; $serverIndex++ ) {
			$db = null;
			try {
				$db = $this->getDB( $serverIndex );
				for ( $i = 0; $i < $this->shards; $i++ ) {
					$db->delete( $this->getTableNameByShard( $i ), '*', __METHOD__ );
				}
			} catch ( DBError $e ) {
				$this->handleWriteError( $e, $db, $serverIndex );
				return false;
			}
		}
		return true;
	}

	/**
	 * Serialize an object and, if possible, compress the representation.
	 * On typical message and page data, this can provide a 3X decrease
	 * in storage requirements.
	 *
	 * @param mixed &$data
	 * @return string
	 */
	protected function serialize( &$data ) {
		$serial = serialize( $data );

		// gzdeflate may be unavailable if PHP was built without zlib
		if ( function_exists( 'gzdeflate' ) ) {
			return gzdeflate( $serial );
		} else {
			return $serial;
		}
	}

	/**
	 * Unserialize and, if necessary, decompress an object.
	 * @param string $serial
	 * @return mixed
	 */
	protected function unserialize( $serial ) {
		if ( function_exists( 'gzinflate' ) ) {
			// Suppress the warning gzinflate emits on uncompressed input;
			// a false result means the blob was stored uncompressed
			Wikimedia\suppressWarnings();
			$decomp = gzinflate( $serial );
			Wikimedia\restoreWarnings();

			if ( false !== $decomp ) {
				$serial = $decomp;
			}
		}

		$ret = unserialize( $serial );

		return $ret;
	}

	/**
	 * Handle a DBError which occurred during a read operation.
	 *
	 * Marks the server down on connection errors, records the error via
	 * setLastError(), and never rethrows (reads degrade to misses).
	 *
	 * @param DBError $exception
	 * @param int $serverIndex
	 */
	protected function handleReadError( DBError $exception, $serverIndex ) {
		if ( $exception instanceof DBConnectionError ) {
			$this->markServerDown( $exception, $serverIndex );
		}
		$this->logger->error( "DBError: {$exception->getMessage()}" );
		if ( $exception instanceof DBConnectionError ) {
			$this->setLastError( BagOStuff::ERR_UNREACHABLE );
			$this->logger->debug( __METHOD__ . ": ignoring connection error" );
		} else {
			$this->setLastError( BagOStuff::ERR_UNEXPECTED );
			$this->logger->debug( __METHOD__ . ": ignoring query error" );
		}
	}

	/**
	 * Handle a DBQueryError which occurred during a write operation.
	 *
	 * NOTE(review): the optional $db parameter precedes the required
	 * $serverIndex, which is deprecated as of PHP 8.0 — worth reordering or
	 * making $serverIndex optional in a follow-up.
	 *
	 * @param DBError $exception
	 * @param IDatabase|null $db DB handle or null if connection failed
	 * @param int $serverIndex
	 * @throws Exception If the main DB is in a read-only transaction (rollback
	 *   must propagate to the caller for consistency)
	 */
	protected function handleWriteError( DBError $exception, IDatabase $db = null, $serverIndex ) {
		if ( !$db ) {
			$this->markServerDown( $exception, $serverIndex );
		} elseif ( $db->wasReadOnlyError() ) {
			if ( $db->trxLevel() && $this->usesMainDB() ) {
				// Errors like deadlocks and connection drops already cause rollback.
				// For consistency, we have no choice but to throw an error and trigger
				// complete rollback if the main DB is also being used as the cache DB.
				throw $exception;
			}
		}

		$this->logger->error( "DBError: {$exception->getMessage()}" );
		if ( $exception instanceof DBConnectionError ) {
			$this->setLastError( BagOStuff::ERR_UNREACHABLE );
			$this->logger->debug( __METHOD__ . ": ignoring connection error" );
		} else {
			$this->setLastError( BagOStuff::ERR_UNEXPECTED );
			$this->logger->debug( __METHOD__ . ": ignoring query error" );
		}
	}

	/**
	 * Mark a server down due to a DBConnectionError exception
	 *
	 * The server is considered down for 60 seconds; its cached connection is
	 * dropped so the next use attempts a fresh connect.
	 *
	 * @param DBError $exception
	 * @param int $serverIndex
	 */
	protected function markServerDown( DBError $exception, $serverIndex ) {
		unset( $this->conns[$serverIndex] ); // bug T103435

		if ( isset( $this->connFailureTimes[$serverIndex] ) ) {
			if ( time() - $this->connFailureTimes[$serverIndex] >= 60 ) {
				// Previous down-window elapsed; clear it and record a new one
				unset( $this->connFailureTimes[$serverIndex] );
				unset( $this->connFailureErrors[$serverIndex] );
			} else {
				$this->logger->debug( __METHOD__ . ": Server #$serverIndex already down" );
				return;
			}
		}
		$now = time();
		$this->logger->info( __METHOD__ . ": Server #$serverIndex down until " . ( $now + 60 ) );
		$this->connFailureTimes[$serverIndex] = $now;
		$this->connFailureErrors[$serverIndex] = $exception;
	}

	/**
	 * Create shard tables. For use from eval.php.
	 *
	 * Clones the schema of the base 'objectcache' table (MySQL only).
	 *
	 * @throws MWException If any server is not MySQL
	 */
	public function createTables() {
		for ( $serverIndex = 0; $serverIndex < $this->numServers; $serverIndex++ ) {
			$db = $this->getDB( $serverIndex );
			if ( $db->getType() !== 'mysql' ) {
				throw new MWException( __METHOD__ . ' is not supported on this DB server' );
			}

			for ( $i = 0; $i < $this->shards; $i++ ) {
				$db->query(
					'CREATE TABLE ' . $db->tableName( $this->getTableNameByShard( $i ) ) .
					' LIKE ' . $db->tableName( 'objectcache' ),
					__METHOD__ );
			}
		}
	}

	/**
	 * @return bool Whether the main DB is used, e.g. wfGetDB( DB_MASTER )
	 */
	protected function usesMainDB() {
		return !$this->serverInfos;
	}

	/**
	 * For WRITE_SYNC: block until all replica DBs of the main load balancer
	 * have replayed the current master position, or the sync timeout elapses.
	 *
	 * @return bool True if replication caught up (or waiting was not applicable)
	 */
	protected function waitForReplication() {
		if ( !$this->usesMainDB() ) {
			// Custom DB server list; probably doesn't use replication
			return true;
		}

		$lb = MediaWikiServices::getInstance()->getDBLoadBalancer();
		if ( $lb->getServerCount() <= 1 ) {
			return true; // no replica DBs
		}

		// Main LB is used; wait for any replica DBs to catch up
		$masterPos = $lb->getMasterPos();
		if ( !$masterPos ) {
			return true; // not applicable
		}

		// Poll waitForAll() until it succeeds or $this->syncTimeout expires
		$loop = new WaitConditionLoop(
			function () use ( $lb, $masterPos ) {
				return $lb->waitForAll( $masterPos, 1 );
			},
			$this->syncTimeout,
			$this->busyCallbacks
		);

		return ( $loop->invoke() === $loop::CONDITION_REACHED );
	}
}