Simplified NotifyNewShortUrlToRabbitMq

This commit is contained in:
Alejandro Celaya 2022-07-24 10:18:19 +02:00
parent 67d91d5fc5
commit 47bfa5fcc0
2 changed files with 17 additions and 36 deletions

View file

@ -40,6 +40,7 @@ return [
EventDispatcher\NotifyVisitToWebHooks::class => ConfigAbstractFactory::class,
EventDispatcher\Mercure\NotifyVisitToMercure::class => ConfigAbstractFactory::class,
EventDispatcher\RabbitMq\NotifyVisitToRabbitMq::class => ConfigAbstractFactory::class,
EventDispatcher\RabbitMq\NotifyNewShortUrlToRabbitMq::class => ConfigAbstractFactory::class,
EventDispatcher\UpdateGeoLiteDb::class => ConfigAbstractFactory::class,
],
@ -50,6 +51,9 @@ return [
EventDispatcher\RabbitMq\NotifyVisitToRabbitMq::class => [
EventDispatcher\CloseDbConnectionEventListenerDelegator::class,
],
EventDispatcher\RabbitMq\NotifyNewShortUrlToRabbitMq::class => [
EventDispatcher\CloseDbConnectionEventListenerDelegator::class,
],
EventDispatcher\NotifyVisitToWebHooks::class => [
EventDispatcher\CloseDbConnectionEventListenerDelegator::class,
],
@ -85,6 +89,13 @@ return [
Visit\Transformer\OrphanVisitDataTransformer::class,
'config.rabbitmq.enabled',
],
EventDispatcher\RabbitMq\NotifyNewShortUrlToRabbitMq::class => [
RabbitMqPublishingHelper::class,
'em',
'Logger_Shlink',
ShortUrl\Transformer\ShortUrlDataTransformer::class,
'config.rabbitmq.enabled',
],
EventDispatcher\UpdateGeoLiteDb::class => [GeolocationDbUpdater::class, 'Logger_Shlink'],
],

View file

@ -5,23 +5,19 @@ declare(strict_types=1);
namespace Shlinkio\Shlink\Core\EventDispatcher\RabbitMq;
use Doctrine\ORM\EntityManagerInterface;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;
use Shlinkio\Shlink\Common\RabbitMq\RabbitMqPublishingHelperInterface;
use Shlinkio\Shlink\Common\Rest\DataTransformerInterface;
use Shlinkio\Shlink\Core\Entity\ShortUrl;
use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated;
use Throwable;
use function Shlinkio\Shlink\Common\json_encode;
class NotifyNewShortUrlToRabbitMq
{
private const NEW_SHORT_URL_QUEUE = 'https://shlink.io/new-short-url';
public function __construct(
private readonly AMQPStreamConnection $connection,
private readonly RabbitMqPublishingHelperInterface $rabbitMqHelper,
private readonly EntityManagerInterface $em,
private readonly LoggerInterface $logger,
private readonly DataTransformerInterface $shortUrlTransformer,
@ -46,39 +42,13 @@ class NotifyNewShortUrlToRabbitMq
return;
}
if (! $this->connection->isConnected()) {
$this->connection->reconnect();
}
$queue = self::NEW_SHORT_URL_QUEUE;
$message = $this->shortUrlToMessage($shortUrl);
try {
$channel = $this->connection->channel();
// Declare an exchange and a queue that will persist server restarts
$exchange = $queue; // We use the same name for the exchange and the queue
$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false);
$channel->queue_declare($queue, false, true, false, false);
// Bind the exchange and the queue together, and publish the message
$channel->queue_bind($queue, $exchange);
$channel->basic_publish($message, $exchange);
$channel->close();
$this->rabbitMqHelper->publishPayloadInQueue(
$this->shortUrlTransformer->transform($shortUrl),
self::NEW_SHORT_URL_QUEUE,
);
} catch (Throwable $e) {
$this->logger->debug('Error while trying to notify RabbitMQ with new short URL. {e}', ['e' => $e]);
} finally {
$this->connection->close();
}
}
private function shortUrlToMessage(ShortUrl $shortUrl): AMQPMessage
{
$messageBody = json_encode($this->shortUrlTransformer->transform($shortUrl));
return new AMQPMessage($messageBody, [
'content_type' => 'application/json',
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
]);
}
}