diff --git a/composer.json b/composer.json index 2cfe4e14..7a0a8542 100644 --- a/composer.json +++ b/composer.json @@ -48,7 +48,7 @@ "pugx/shortid-php": "^1.0", "ramsey/uuid": "^4.2", "rlanvin/php-ip": "dev-master#6b3a785 as 3.0", - "shlinkio/shlink-common": "dev-main#c2e3442 as 4.2", + "shlinkio/shlink-common": "dev-main#e7fdff3 as 4.2", "shlinkio/shlink-config": "^1.4", "shlinkio/shlink-event-dispatcher": "dev-main#3925299 as 2.3", "shlinkio/shlink-importer": "dev-main#d099072 as 2.5", diff --git a/config/autoload/rabbit.global.php b/config/autoload/rabbit.global.php index 113a0048..a17e9887 100644 --- a/config/autoload/rabbit.global.php +++ b/config/autoload/rabbit.global.php @@ -11,13 +11,12 @@ use function Shlinkio\Shlink\Common\env; return [ 'rabbit' => [ + 'enabled' => (bool) env('RABBITMQ_ENABLED', false), 'host' => env('RABBITMQ_HOST'), 'port' => env('RABBITMQ_PORT', '5672'), 'user' => env('RABBITMQ_USER'), 'password' => env('RABBITMQ_PASSWORD'), 'vhost' => env('RABBITMQ_VHOST', '/'), - 'exchange' => env('RABBITMQ_EXCHANGE', 'shlink-exchange'), - 'queue' => env('RABBITMQ_QUEUE', 'shlink-queue'), ], 'dependencies' => [ diff --git a/config/autoload/rabbit.local.php.dist b/config/autoload/rabbit.local.php.dist index 2425a2c5..141b4b8b 100644 --- a/config/autoload/rabbit.local.php.dist +++ b/config/autoload/rabbit.local.php.dist @@ -5,6 +5,7 @@ declare(strict_types=1); return [ 'rabbit' => [ + 'enabled' => true, 'host' => 'shlink_rabbitmq', 'user' => 'rabbit', 'password' => 'rabbit', diff --git a/docs/async-api/async-api.json b/docs/async-api/async-api.json index 0b546377..82da91c5 100644 --- a/docs/async-api/async-api.json +++ b/docs/async-api/async-api.json @@ -11,7 +11,7 @@ }, "defaultContentType": "application/json", "channels": { - "http://shlink.io/new-visit": { + "https://shlink.io/new-visit": { "subscribe": { "summary": "Receive information about any new visit occurring on any short URL.", "operationId": "newVisit", @@ -31,7 +31,7 @@ } } }, - "http://shlink.io/new-visit/{shortCode}": { + "https://shlink.io/new-visit/{shortCode}": { "parameters": { "shortCode": { "description": "The short code of the short URL", @@ -59,7 +59,7 @@ } } }, - "http://shlink.io/new-orphan-visit": { + "https://shlink.io/new-orphan-visit": { "subscribe": { "summary": "Receive information about any new orphan visit.", "operationId": "newOrphanVisit", diff --git a/module/Core/config/event_dispatcher.config.php b/module/Core/config/event_dispatcher.config.php index 5256bc92..a0f09beb 100644 --- a/module/Core/config/event_dispatcher.config.php +++ b/module/Core/config/event_dispatcher.config.php @@ -5,6 +5,7 @@ declare(strict_types=1); namespace Shlinkio\Shlink\Core; use Laminas\ServiceManager\AbstractFactory\ConfigAbstractFactory; +use PhpAmqpLib\Connection\AMQPStreamConnection; use Psr\EventDispatcher\EventDispatcherInterface; use Shlinkio\Shlink\CLI\Util\GeolocationDbUpdater; use Shlinkio\Shlink\IpGeolocation\GeoLite2\DbUpdater; @@ -22,6 +23,7 @@ return [ 'async' => [ EventDispatcher\Event\VisitLocated::class => [ EventDispatcher\NotifyVisitToMercure::class, + EventDispatcher\NotifyVisitToRabbit::class, EventDispatcher\NotifyVisitToWebHooks::class, EventDispatcher\UpdateGeoLiteDb::class, ], @@ -33,6 +35,7 @@ return [ EventDispatcher\LocateVisit::class => ConfigAbstractFactory::class, EventDispatcher\NotifyVisitToWebHooks::class => ConfigAbstractFactory::class, EventDispatcher\NotifyVisitToMercure::class => ConfigAbstractFactory::class, + EventDispatcher\NotifyVisitToRabbit::class => ConfigAbstractFactory::class, EventDispatcher\UpdateGeoLiteDb::class => ConfigAbstractFactory::class, ], @@ -40,6 +43,9 @@ return [ EventDispatcher\NotifyVisitToMercure::class => [ EventDispatcher\CloseDbConnectionEventListenerDelegator::class, ], + EventDispatcher\NotifyVisitToRabbit::class => [ + EventDispatcher\CloseDbConnectionEventListenerDelegator::class, + ], EventDispatcher\NotifyVisitToWebHooks::class => [ EventDispatcher\CloseDbConnectionEventListenerDelegator::class, ], @@ -68,6 +74,13 @@ return [ 'em', 'Logger_Shlink', ], + EventDispatcher\NotifyVisitToRabbit::class => [ + AMQPStreamConnection::class, + 'em', + 'Logger_Shlink', + Visit\Transformer\OrphanVisitDataTransformer::class, + 'config.rabbit.enabled', + ], EventDispatcher\UpdateGeoLiteDb::class => [GeolocationDbUpdater::class, 'Logger_Shlink'], ], diff --git a/module/Core/src/EventDispatcher/NotifyVisitToRabbit.php b/module/Core/src/EventDispatcher/NotifyVisitToRabbit.php new file mode 100644 index 00000000..426b02bb --- /dev/null +++ b/module/Core/src/EventDispatcher/NotifyVisitToRabbit.php @@ -0,0 +1,103 @@ +isEnabled) { + return; + } + + $visitId = $shortUrlLocated->visitId(); + + /** @var Visit|null $visit */ + $visit = $this->em->find(Visit::class, $visitId); + if ($visit === null) { + $this->logger->warning('Tried to notify RabbitMQ for visit with id "{visitId}", but it does not exist.', [ + 'visitId' => $visitId, + ]); + return; + } + + if (! $this->connection->isConnected()) { + $this->connection->reconnect(); + } + + $queues = $this->determineQueuesToPublishTo($visit); + $message = $this->visitToMessage($visit); + + try { + $channel = $this->connection->channel(); + + foreach ($queues as $queue) { + // Declare an exchange and a queue that will persist server restarts + $exchange = $queue; // We use the same name for the exchange and the queue + $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false); + $channel->queue_declare($queue, false, true, false, false); + + // Bind the exchange and the queue together, and publish the message + $channel->queue_bind($queue, $exchange); + $channel->basic_publish($message, $exchange); + } + + $channel->close(); + } catch (Throwable $e) { + $this->logger->debug('Error while trying to notify RabbitMQ with new visit. {e}', ['e' => $e]); + } finally { + $this->connection->close(); + } + } + + /** + * @return string[] + */ + private function determineQueuesToPublishTo(Visit $visit): array + { + if ($visit->isOrphan()) { + return [self::NEW_ORPHAN_VISIT_QUEUE]; + } + + return [ + self::NEW_VISIT_QUEUE, + sprintf('%s/%s', self::NEW_VISIT_QUEUE, $visit->getShortUrl()?->getShortCode()), + ]; + } + + private function visitToMessage(Visit $visit): AMQPMessage + { + $messageBody = json_encode(! $visit->isOrphan() ? $visit : $this->orphanVisitTransformer->transform($visit)); + return new AMQPMessage($messageBody, [ + 'content_type' => 'application/json', + 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, + ]); + } +} diff --git a/module/Core/src/Mercure/MercureUpdatesGenerator.php b/module/Core/src/Mercure/MercureUpdatesGenerator.php index cc0f785a..74b85388 100644 --- a/module/Core/src/Mercure/MercureUpdatesGenerator.php +++ b/module/Core/src/Mercure/MercureUpdatesGenerator.php @@ -8,11 +8,9 @@ use Shlinkio\Shlink\Common\Rest\DataTransformerInterface; use Shlinkio\Shlink\Core\Entity\Visit; use Symfony\Component\Mercure\Update; -use function json_encode; +use function Shlinkio\Shlink\Common\json_encode; use function sprintf; -use const JSON_THROW_ON_ERROR; - final class MercureUpdatesGenerator implements MercureUpdatesGeneratorInterface { private const NEW_VISIT_TOPIC = 'https://shlink.io/new-visit'; @@ -26,7 +24,7 @@ final class MercureUpdatesGenerator implements MercureUpdatesGeneratorInterface public function newVisitUpdate(Visit $visit): Update { - return new Update(self::NEW_VISIT_TOPIC, $this->serialize([ + return new Update(self::NEW_VISIT_TOPIC, json_encode([ 'shortUrl' => $this->shortUrlTransformer->transform($visit->getShortUrl()), 'visit' => $visit, ])); @@ -34,7 +32,7 @@ final class MercureUpdatesGenerator implements MercureUpdatesGeneratorInterface public function newOrphanVisitUpdate(Visit $visit): Update { - return new Update(self::NEW_ORPHAN_VISIT_TOPIC, $this->serialize([ + return new Update(self::NEW_ORPHAN_VISIT_TOPIC, json_encode([ 'visit' => $this->orphanVisitTransformer->transform($visit), ])); } @@ -44,14 +42,9 @@ final class MercureUpdatesGenerator implements MercureUpdatesGeneratorInterface $shortUrl = $visit->getShortUrl(); $topic = sprintf('%s/%s', self::NEW_VISIT_TOPIC, $shortUrl?->getShortCode()); - return new Update($topic, $this->serialize([ + return new Update($topic, json_encode([ 'shortUrl' => $this->shortUrlTransformer->transform($shortUrl), 'visit' => $visit, ])); } - - private function serialize(array $data): string - { - return json_encode($data, JSON_THROW_ON_ERROR); - } }