summaryrefslogtreecommitdiff
path: root/www/wiki/includes/libs/eventrelayer/EventRelayerKafka.php
diff options
context:
space:
mode:
Diffstat (limited to 'www/wiki/includes/libs/eventrelayer/EventRelayerKafka.php')
-rw-r--r--www/wiki/includes/libs/eventrelayer/EventRelayerKafka.php62
1 files changed, 62 insertions, 0 deletions
diff --git a/www/wiki/includes/libs/eventrelayer/EventRelayerKafka.php b/www/wiki/includes/libs/eventrelayer/EventRelayerKafka.php
new file mode 100644
index 00000000..999eb439
--- /dev/null
+++ b/www/wiki/includes/libs/eventrelayer/EventRelayerKafka.php
@@ -0,0 +1,62 @@
+<?php
+use Kafka\Produce;
+
+/**
+ * Event relayer for Apache Kafka.
+ * Configuring for WANCache:
+ * 'relayerConfig' => [ 'class' => 'EventRelayerKafka', 'KafkaEventHost' => 'localhost:9092' ],
+ */
+class EventRelayerKafka extends EventRelayer {
+ /**
+ * Configuration.
+ *
+ * @var Config
+ */
+ protected $config;
+
+ /**
+ * Kafka producer.
+ *
+ * @var Produce
+ */
+ protected $producer;
+
+ /**
+ * Create Kafka producer.
+ *
+ * @param array $params
+ */
+ public function __construct( array $params ) {
+ parent::__construct( $params );
+
+ $this->config = new HashConfig( $params );
+ if ( !$this->config->has( 'KafkaEventHost' ) ) {
+ throw new InvalidArgumentException( "KafkaEventHost must be configured" );
+ }
+ }
+
+ /**
+ * Get the producer object from kafka-php.
+ * @return Produce
+ */
+ protected function getKafkaProducer() {
+ if ( !$this->producer ) {
+ $this->producer = Produce::getInstance(
+ null, null, $this->config->get( 'KafkaEventHost' ) );
+ }
+ return $this->producer;
+ }
+
+ protected function doNotify( $channel, array $events ) {
+ $jsonEvents = array_map( 'json_encode', $events );
+ try {
+ $producer = $this->getKafkaProducer();
+ $producer->setMessages( $channel, 0, $jsonEvents );
+ $producer->send();
+ } catch ( \Kafka\Exception $e ) {
+ $this->logger->warning( "Sending events failed: $e" );
+ return false;
+ }
+ return true;
+ }
+}