Created event listener to send visits to a RabbitMQ instance

This commit is contained in:
Alejandro Celaya 2021-12-11 21:04:16 +01:00
parent bd3bb67949
commit 966620f840
7 changed files with 126 additions and 17 deletions

View file

@ -48,7 +48,7 @@
"pugx/shortid-php": "^1.0", "pugx/shortid-php": "^1.0",
"ramsey/uuid": "^4.2", "ramsey/uuid": "^4.2",
"rlanvin/php-ip": "dev-master#6b3a785 as 3.0", "rlanvin/php-ip": "dev-master#6b3a785 as 3.0",
"shlinkio/shlink-common": "dev-main#c2e3442 as 4.2", "shlinkio/shlink-common": "dev-main#e7fdff3 as 4.2",
"shlinkio/shlink-config": "^1.4", "shlinkio/shlink-config": "^1.4",
"shlinkio/shlink-event-dispatcher": "dev-main#3925299 as 2.3", "shlinkio/shlink-event-dispatcher": "dev-main#3925299 as 2.3",
"shlinkio/shlink-importer": "dev-main#d099072 as 2.5", "shlinkio/shlink-importer": "dev-main#d099072 as 2.5",

View file

@ -11,13 +11,12 @@ use function Shlinkio\Shlink\Common\env;
return [ return [
'rabbit' => [ 'rabbit' => [
'enabled' => (bool) env('RABBITMQ_ENABLED', false),
'host' => env('RABBITMQ_HOST'), 'host' => env('RABBITMQ_HOST'),
'port' => env('RABBITMQ_PORT', '5672'), 'port' => env('RABBITMQ_PORT', '5672'),
'user' => env('RABBITMQ_USER'), 'user' => env('RABBITMQ_USER'),
'password' => env('RABBITMQ_PASSWORD'), 'password' => env('RABBITMQ_PASSWORD'),
'vhost' => env('RABBITMQ_VHOST', '/'), 'vhost' => env('RABBITMQ_VHOST', '/'),
'exchange' => env('RABBITMQ_EXCHANGE', 'shlink-exchange'),
'queue' => env('RABBITMQ_QUEUE', 'shlink-queue'),
], ],
'dependencies' => [ 'dependencies' => [

View file

@ -5,6 +5,7 @@ declare(strict_types=1);
return [ return [
'rabbit' => [ 'rabbit' => [
'enabled' => true,
'host' => 'shlink_rabbitmq', 'host' => 'shlink_rabbitmq',
'user' => 'rabbit', 'user' => 'rabbit',
'password' => 'rabbit', 'password' => 'rabbit',

View file

@ -11,7 +11,7 @@
}, },
"defaultContentType": "application/json", "defaultContentType": "application/json",
"channels": { "channels": {
"http://shlink.io/new-visit": { "https://shlink.io/new-visit": {
"subscribe": { "subscribe": {
"summary": "Receive information about any new visit occurring on any short URL.", "summary": "Receive information about any new visit occurring on any short URL.",
"operationId": "newVisit", "operationId": "newVisit",
@ -31,7 +31,7 @@
} }
} }
}, },
"http://shlink.io/new-visit/{shortCode}": { "https://shlink.io/new-visit/{shortCode}": {
"parameters": { "parameters": {
"shortCode": { "shortCode": {
"description": "The short code of the short URL", "description": "The short code of the short URL",
@ -59,7 +59,7 @@
} }
} }
}, },
"http://shlink.io/new-orphan-visit": { "https://shlink.io/new-orphan-visit": {
"subscribe": { "subscribe": {
"summary": "Receive information about any new orphan visit.", "summary": "Receive information about any new orphan visit.",
"operationId": "newOrphanVisit", "operationId": "newOrphanVisit",

View file

@ -5,6 +5,7 @@ declare(strict_types=1);
namespace Shlinkio\Shlink\Core; namespace Shlinkio\Shlink\Core;
use Laminas\ServiceManager\AbstractFactory\ConfigAbstractFactory; use Laminas\ServiceManager\AbstractFactory\ConfigAbstractFactory;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use Psr\EventDispatcher\EventDispatcherInterface; use Psr\EventDispatcher\EventDispatcherInterface;
use Shlinkio\Shlink\CLI\Util\GeolocationDbUpdater; use Shlinkio\Shlink\CLI\Util\GeolocationDbUpdater;
use Shlinkio\Shlink\IpGeolocation\GeoLite2\DbUpdater; use Shlinkio\Shlink\IpGeolocation\GeoLite2\DbUpdater;
@ -22,6 +23,7 @@ return [
'async' => [ 'async' => [
EventDispatcher\Event\VisitLocated::class => [ EventDispatcher\Event\VisitLocated::class => [
EventDispatcher\NotifyVisitToMercure::class, EventDispatcher\NotifyVisitToMercure::class,
EventDispatcher\NotifyVisitToRabbit::class,
EventDispatcher\NotifyVisitToWebHooks::class, EventDispatcher\NotifyVisitToWebHooks::class,
EventDispatcher\UpdateGeoLiteDb::class, EventDispatcher\UpdateGeoLiteDb::class,
], ],
@ -33,6 +35,7 @@ return [
EventDispatcher\LocateVisit::class => ConfigAbstractFactory::class, EventDispatcher\LocateVisit::class => ConfigAbstractFactory::class,
EventDispatcher\NotifyVisitToWebHooks::class => ConfigAbstractFactory::class, EventDispatcher\NotifyVisitToWebHooks::class => ConfigAbstractFactory::class,
EventDispatcher\NotifyVisitToMercure::class => ConfigAbstractFactory::class, EventDispatcher\NotifyVisitToMercure::class => ConfigAbstractFactory::class,
EventDispatcher\NotifyVisitToRabbit::class => ConfigAbstractFactory::class,
EventDispatcher\UpdateGeoLiteDb::class => ConfigAbstractFactory::class, EventDispatcher\UpdateGeoLiteDb::class => ConfigAbstractFactory::class,
], ],
@ -40,6 +43,9 @@ return [
EventDispatcher\NotifyVisitToMercure::class => [ EventDispatcher\NotifyVisitToMercure::class => [
EventDispatcher\CloseDbConnectionEventListenerDelegator::class, EventDispatcher\CloseDbConnectionEventListenerDelegator::class,
], ],
EventDispatcher\NotifyVisitToRabbit::class => [
EventDispatcher\CloseDbConnectionEventListenerDelegator::class,
],
EventDispatcher\NotifyVisitToWebHooks::class => [ EventDispatcher\NotifyVisitToWebHooks::class => [
EventDispatcher\CloseDbConnectionEventListenerDelegator::class, EventDispatcher\CloseDbConnectionEventListenerDelegator::class,
], ],
@ -68,6 +74,13 @@ return [
'em', 'em',
'Logger_Shlink', 'Logger_Shlink',
], ],
EventDispatcher\NotifyVisitToRabbit::class => [
AMQPStreamConnection::class,
'em',
'Logger_Shlink',
Visit\Transformer\OrphanVisitDataTransformer::class,
'config.rabbit.enabled',
],
EventDispatcher\UpdateGeoLiteDb::class => [GeolocationDbUpdater::class, 'Logger_Shlink'], EventDispatcher\UpdateGeoLiteDb::class => [GeolocationDbUpdater::class, 'Logger_Shlink'],
], ],

View file

@ -0,0 +1,103 @@
<?php
declare(strict_types=1);
namespace Shlinkio\Shlink\Core\EventDispatcher;
use Doctrine\ORM\EntityManagerInterface;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;
use Shlinkio\Shlink\Common\Rest\DataTransformerInterface;
use Shlinkio\Shlink\Core\Entity\Visit;
use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated;
use Throwable;
use function Shlinkio\Shlink\Common\json_encode;
use function sprintf;
class NotifyVisitToRabbit
{
private const NEW_VISIT_QUEUE = 'https://shlink.io/new-visit';
private const NEW_ORPHAN_VISIT_QUEUE = 'https://shlink.io/new-orphan-visit';
public function __construct(
private AMQPStreamConnection $connection,
private EntityManagerInterface $em,
private LoggerInterface $logger,
private DataTransformerInterface $orphanVisitTransformer,
private bool $isEnabled,
) {
}
public function __invoke(VisitLocated $shortUrlLocated): void
{
if (! $this->isEnabled) {
return;
}
$visitId = $shortUrlLocated->visitId();
/** @var Visit|null $visit */
$visit = $this->em->find(Visit::class, $visitId);
if ($visit === null) {
$this->logger->warning('Tried to notify RabbitMQ for visit with id "{visitId}", but it does not exist.', [
'visitId' => $visitId,
]);
return;
}
if (! $this->connection->isConnected()) {
$this->connection->reconnect();
}
$queues = $this->determineQueuesToPublishTo($visit);
$message = $this->visitToMessage($visit);
try {
$channel = $this->connection->channel();
foreach ($queues as $queue) {
// Declare an exchange and a queue that will persist server restarts
$exchange = $queue; // We use the same name for the exchange and the queue
$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
$channel->queue_declare($queue, false, true, false, false);
// Bind the exchange and the queue together, and publish the message
$channel->queue_bind($queue, $exchange);
$channel->basic_publish($message, $exchange);
}
$channel->close();
} catch (Throwable $e) {
$this->logger->debug('Error while trying to notify RabbitMQ with new visit. {e}', ['e' => $e]);
} finally {
$this->connection->close();
}
}
/**
* @return string[]
*/
private function determineQueuesToPublishTo(Visit $visit): array
{
if ($visit->isOrphan()) {
return [self::NEW_ORPHAN_VISIT_QUEUE];
}
return [
self::NEW_VISIT_QUEUE,
sprintf('%s/%s', self::NEW_VISIT_QUEUE, $visit->getShortUrl()?->getShortCode()),
];
}
private function visitToMessage(Visit $visit): AMQPMessage
{
$messageBody = json_encode(! $visit->isOrphan() ? $visit : $this->orphanVisitTransformer->transform($visit));
return new AMQPMessage($messageBody, [
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]);
}
}

View file

@ -8,11 +8,9 @@ use Shlinkio\Shlink\Common\Rest\DataTransformerInterface;
use Shlinkio\Shlink\Core\Entity\Visit; use Shlinkio\Shlink\Core\Entity\Visit;
use Symfony\Component\Mercure\Update; use Symfony\Component\Mercure\Update;
use function json_encode; use function Shlinkio\Shlink\Common\json_encode;
use function sprintf; use function sprintf;
use const JSON_THROW_ON_ERROR;
final class MercureUpdatesGenerator implements MercureUpdatesGeneratorInterface final class MercureUpdatesGenerator implements MercureUpdatesGeneratorInterface
{ {
private const NEW_VISIT_TOPIC = 'https://shlink.io/new-visit'; private const NEW_VISIT_TOPIC = 'https://shlink.io/new-visit';
@ -26,7 +24,7 @@ final class MercureUpdatesGenerator implements MercureUpdatesGeneratorInterface
public function newVisitUpdate(Visit $visit): Update public function newVisitUpdate(Visit $visit): Update
{ {
return new Update(self::NEW_VISIT_TOPIC, $this->serialize([ return new Update(self::NEW_VISIT_TOPIC, json_encode([
'shortUrl' => $this->shortUrlTransformer->transform($visit->getShortUrl()), 'shortUrl' => $this->shortUrlTransformer->transform($visit->getShortUrl()),
'visit' => $visit, 'visit' => $visit,
])); ]));
@ -34,7 +32,7 @@ final class MercureUpdatesGenerator implements MercureUpdatesGeneratorInterface
public function newOrphanVisitUpdate(Visit $visit): Update public function newOrphanVisitUpdate(Visit $visit): Update
{ {
return new Update(self::NEW_ORPHAN_VISIT_TOPIC, $this->serialize([ return new Update(self::NEW_ORPHAN_VISIT_TOPIC, json_encode([
'visit' => $this->orphanVisitTransformer->transform($visit), 'visit' => $this->orphanVisitTransformer->transform($visit),
])); ]));
} }
@ -44,14 +42,9 @@ final class MercureUpdatesGenerator implements MercureUpdatesGeneratorInterface
$shortUrl = $visit->getShortUrl(); $shortUrl = $visit->getShortUrl();
$topic = sprintf('%s/%s', self::NEW_VISIT_TOPIC, $shortUrl?->getShortCode()); $topic = sprintf('%s/%s', self::NEW_VISIT_TOPIC, $shortUrl?->getShortCode());
return new Update($topic, $this->serialize([ return new Update($topic, json_encode([
'shortUrl' => $this->shortUrlTransformer->transform($shortUrl), 'shortUrl' => $this->shortUrlTransformer->transform($shortUrl),
'visit' => $visit, 'visit' => $visit,
])); ]));
} }
private function serialize(array $data): string
{
return json_encode($data, JSON_THROW_ON_ERROR);
}
} }