From 47bfa5fcc02439ce17334e36214cbd0767835176 Mon Sep 17 00:00:00 2001 From: Alejandro Celaya Date: Sun, 24 Jul 2022 10:18:19 +0200 Subject: [PATCH] Simplified NotifyNewShortUrlToRabbitMq --- .../Core/config/event_dispatcher.config.php | 11 +++++ .../RabbitMq/NotifyNewShortUrlToRabbitMq.php | 42 +++---------------- 2 files changed, 17 insertions(+), 36 deletions(-) diff --git a/module/Core/config/event_dispatcher.config.php b/module/Core/config/event_dispatcher.config.php index be4872e2..806d3104 100644 --- a/module/Core/config/event_dispatcher.config.php +++ b/module/Core/config/event_dispatcher.config.php @@ -40,6 +40,7 @@ return [ EventDispatcher\NotifyVisitToWebHooks::class => ConfigAbstractFactory::class, EventDispatcher\Mercure\NotifyVisitToMercure::class => ConfigAbstractFactory::class, EventDispatcher\RabbitMq\NotifyVisitToRabbitMq::class => ConfigAbstractFactory::class, + EventDispatcher\RabbitMq\NotifyNewShortUrlToRabbitMq::class => ConfigAbstractFactory::class, EventDispatcher\UpdateGeoLiteDb::class => ConfigAbstractFactory::class, ], @@ -50,6 +51,9 @@ return [ EventDispatcher\RabbitMq\NotifyVisitToRabbitMq::class => [ EventDispatcher\CloseDbConnectionEventListenerDelegator::class, ], + EventDispatcher\RabbitMq\NotifyNewShortUrlToRabbitMq::class => [ + EventDispatcher\CloseDbConnectionEventListenerDelegator::class, + ], EventDispatcher\NotifyVisitToWebHooks::class => [ EventDispatcher\CloseDbConnectionEventListenerDelegator::class, ], @@ -85,6 +89,13 @@ return [ Visit\Transformer\OrphanVisitDataTransformer::class, 'config.rabbitmq.enabled', ], + EventDispatcher\RabbitMq\NotifyNewShortUrlToRabbitMq::class => [ + RabbitMqPublishingHelper::class, + 'em', + 'Logger_Shlink', + ShortUrl\Transformer\ShortUrlDataTransformer::class, + 'config.rabbitmq.enabled', + ], EventDispatcher\UpdateGeoLiteDb::class => [GeolocationDbUpdater::class, 'Logger_Shlink'], ], diff --git a/module/Core/src/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMq.php b/module/Core/src/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMq.php index ef38c356..8e66dedb 100644 --- a/module/Core/src/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMq.php +++ b/module/Core/src/EventDispatcher/RabbitMq/NotifyNewShortUrlToRabbitMq.php @@ -5,23 +5,19 @@ declare(strict_types=1); namespace Shlinkio\Shlink\Core\EventDispatcher\RabbitMq; use Doctrine\ORM\EntityManagerInterface; -use PhpAmqpLib\Connection\AMQPStreamConnection; -use PhpAmqpLib\Exchange\AMQPExchangeType; -use PhpAmqpLib\Message\AMQPMessage; use Psr\Log\LoggerInterface; +use Shlinkio\Shlink\Common\RabbitMq\RabbitMqPublishingHelperInterface; use Shlinkio\Shlink\Common\Rest\DataTransformerInterface; use Shlinkio\Shlink\Core\Entity\ShortUrl; use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated; use Throwable; -use function Shlinkio\Shlink\Common\json_encode; - class NotifyNewShortUrlToRabbitMq { private const NEW_SHORT_URL_QUEUE = 'https://shlink.io/new-short-url'; public function __construct( - private readonly AMQPStreamConnection $connection, + private readonly RabbitMqPublishingHelperInterface $rabbitMqHelper, private readonly EntityManagerInterface $em, private readonly LoggerInterface $logger, private readonly DataTransformerInterface $shortUrlTransformer, @@ -46,39 +42,13 @@ class NotifyNewShortUrlToRabbitMq return; } - if (! $this->connection->isConnected()) { - $this->connection->reconnect(); - } - - $queue = self::NEW_SHORT_URL_QUEUE; - $message = $this->shortUrlToMessage($shortUrl); - try { - $channel = $this->connection->channel(); - - // Declare an exchange and a queue that will persist server restarts - $exchange = $queue; // We use the same name for the exchange and the queue - $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false); - $channel->queue_declare($queue, false, true, false, false); - - // Bind the exchange and the queue together, and publish the message - $channel->queue_bind($queue, $exchange); - $channel->basic_publish($message, $exchange); - - $channel->close(); + $this->rabbitMqHelper->publishPayloadInQueue( + $this->shortUrlTransformer->transform($shortUrl), + self::NEW_SHORT_URL_QUEUE, + ); } catch (Throwable $e) { $this->logger->debug('Error while trying to notify RabbitMQ with new short URL. {e}', ['e' => $e]); - } finally { - $this->connection->close(); } } - - private function shortUrlToMessage(ShortUrl $shortUrl): AMQPMessage - { - $messageBody = json_encode($this->shortUrlTransformer->transform($shortUrl)); - return new AMQPMessage($messageBody, [ - 'content_type' => 'application/json', - 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, - ]); - } }