Implemented redis pub/sub listeners

This commit is contained in:
Alejandro Celaya 2022-07-26 10:17:50 +02:00
parent eff50ca202
commit db8a816524
6 changed files with 135 additions and 10 deletions

View file

@ -36,7 +36,7 @@
"mezzio/mezzio": "^3.7",
"mezzio/mezzio-fastroute": "^3.3",
"mezzio/mezzio-problem-details": "^1.5",
"mezzio/mezzio-swoole": "^4.0",
"mezzio/mezzio-swoole": "^4.3",
"mlocati/ip-lib": "^1.17",
"ocramius/proxy-manager": "^2.11",
"pagerfanta/core": "^3.5",

View file

@ -6,17 +6,22 @@ use Shlinkio\Shlink\Core\Config\EnvVars;
return (static function (): array {
$redisServers = EnvVars::REDIS_SERVERS->loadFromEnv();
$pubSub = [
'redis' => [
'pub_sub_enabled' => $redisServers !== null && EnvVars::REDIS_PUB_SUB_ENABLED->loadFromEnv(false),
],
];
return match ($redisServers) {
null => [],
null => $pubSub,
default => [
'cache' => [
'redis' => [
'servers' => $redisServers,
'sentinel_service' => EnvVars::REDIS_SENTINEL_SERVICE->loadFromEnv(),
'pub_sub_enabled' => (bool) EnvVars::REDIS_PUB_SUB_ENABLED->loadFromEnv(false),
],
],
...$pubSub,
],
};
})();

View file

@ -7,6 +7,7 @@ namespace Shlinkio\Shlink\Core;
use Laminas\ServiceManager\AbstractFactory\ConfigAbstractFactory;
use Psr\EventDispatcher\EventDispatcherInterface;
use Shlinkio\Shlink\CLI\Util\GeolocationDbUpdater;
use Shlinkio\Shlink\Common\Cache\RedisPublishingHelper;
use Shlinkio\Shlink\Common\RabbitMq\RabbitMqPublishingHelper;
use Shlinkio\Shlink\IpGeolocation\GeoLite2\DbUpdater;
use Shlinkio\Shlink\IpGeolocation\Resolver\IpLocationResolverInterface;
@ -117,8 +118,21 @@ return [
ShortUrl\Transformer\ShortUrlDataTransformer::class,
Options\RabbitMqOptions::class,
],
EventDispatcher\RedisPubSub\NotifyVisitToRedis::class => [],
EventDispatcher\RedisPubSub\NotifyNewShortUrlToRedis::class => [],
EventDispatcher\RedisPubSub\NotifyVisitToRedis::class => [
RedisPublishingHelper::class,
'em',
'Logger_Shlink',
Visit\Transformer\OrphanVisitDataTransformer::class,
ShortUrl\Transformer\ShortUrlDataTransformer::class,
'config.redis.pub_sub_enabled',
],
EventDispatcher\RedisPubSub\NotifyNewShortUrlToRedis::class => [
RedisPublishingHelper::class,
'em',
'Logger_Shlink',
ShortUrl\Transformer\ShortUrlDataTransformer::class,
'config.redis.pub_sub_enabled',
],
EventDispatcher\UpdateGeoLiteDb::class => [GeolocationDbUpdater::class, 'Logger_Shlink'],
],

View file

@ -14,6 +14,8 @@ use Shlinkio\Shlink\Core\EventDispatcher\Topic;
use Shlinkio\Shlink\Core\Options\RabbitMqOptions;
use Throwable;
use function Functional\each;
class NotifyVisitToRabbitMq
{
public function __construct(
@ -46,9 +48,7 @@ class NotifyVisitToRabbitMq
$payload = $this->visitToPayload($visit);
try {
foreach ($queues as $queue) {
$this->rabbitMqHelper->publishPayloadInQueue($payload, $queue);
}
each($queues, fn (string $queue) => $this->rabbitMqHelper->publishPayloadInQueue($payload, $queue));
} catch (Throwable $e) {
$this->logger->debug('Error while trying to notify RabbitMQ with new visit. {e}', ['e' => $e]);
}

View file

@ -4,12 +4,50 @@ declare(strict_types=1);
namespace Shlinkio\Shlink\Core\EventDispatcher\RedisPubSub;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
use Shlinkio\Shlink\Common\Cache\RedisPublishingHelperInterface;
use Shlinkio\Shlink\Common\Rest\DataTransformerInterface;
use Shlinkio\Shlink\Core\Entity\ShortUrl;
use Shlinkio\Shlink\Core\EventDispatcher\Event\ShortUrlCreated;
use Shlinkio\Shlink\Core\EventDispatcher\Topic;
use Throwable;
class NotifyNewShortUrlToRedis
{
public function __construct(
private readonly RedisPublishingHelperInterface $redisHelper,
private readonly EntityManagerInterface $em,
private readonly LoggerInterface $logger,
private readonly DataTransformerInterface $shortUrlTransformer,
private readonly bool $enabled,
) {
}
public function __invoke(ShortUrlCreated $shortUrlCreated): void
{
// TODO: Implement __invoke() method.
if (! $this->enabled) {
return;
}
$shortUrlId = $shortUrlCreated->shortUrlId;
$shortUrl = $this->em->find(ShortUrl::class, $shortUrlId);
if ($shortUrl === null) {
$this->logger->warning(
'Tried to notify Redis pub/sub for new short URL with id "{shortUrlId}", but it does not exist.',
['shortUrlId' => $shortUrlId],
);
return;
}
try {
$this->redisHelper->publishPayloadInQueue(
['shortUrl' => $this->shortUrlTransformer->transform($shortUrl)],
Topic::NEW_SHORT_URL->value,
);
} catch (Throwable $e) {
$this->logger->debug('Error while trying to notify Redis pub/sub with new short URL. {e}', ['e' => $e]);
}
}
}

View file

@ -4,12 +4,80 @@ declare(strict_types=1);
namespace Shlinkio\Shlink\Core\EventDispatcher\RedisPubSub;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
use Shlinkio\Shlink\Common\Cache\RedisPublishingHelperInterface;
use Shlinkio\Shlink\Common\Rest\DataTransformerInterface;
use Shlinkio\Shlink\Core\Entity\Visit;
use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated;
use Shlinkio\Shlink\Core\EventDispatcher\Topic;
use Throwable;
use function Functional\each;
class NotifyVisitToRedis
{
public function __construct(
private readonly RedisPublishingHelperInterface $redisHelper,
private readonly EntityManagerInterface $em,
private readonly LoggerInterface $logger,
private readonly DataTransformerInterface $orphanVisitTransformer,
private readonly DataTransformerInterface $shortUrlTransformer,
private readonly bool $enabled,
) {
}
public function __invoke(VisitLocated $visitLocated): void
{
// TODO: Implement __invoke() method.
if (! $this->enabled) {
return;
}
$visitId = $visitLocated->visitId;
$visit = $this->em->find(Visit::class, $visitId);
if ($visit === null) {
$this->logger->warning(
'Tried to notify Redis pub/sub for visit with id "{visitId}", but it does not exist.',
['visitId' => $visitId],
);
return;
}
$queues = $this->determineQueuesToPublishTo($visit);
$payload = $this->visitToPayload($visit);
try {
each($queues, fn (string $queue) => $this->redisHelper->publishPayloadInQueue($payload, $queue));
} catch (Throwable $e) {
$this->logger->debug('Error while trying to notify Redis pub/sub with new visit. {e}', ['e' => $e]);
}
}
/**
* @return string[]
*/
private function determineQueuesToPublishTo(Visit $visit): array
{
if ($visit->isOrphan()) {
return [Topic::NEW_ORPHAN_VISIT->value];
}
return [
Topic::NEW_VISIT->value,
Topic::newShortUrlVisit($visit->getShortUrl()?->getShortCode()),
];
}
private function visitToPayload(Visit $visit): array
{
if ($visit->isOrphan()) {
return ['visit' => $this->orphanVisitTransformer->transform($visit)];
}
return [
'visit' => $visit->jsonSerialize(),
'shortUrl' => $this->shortUrlTransformer->transform($visit->getShortUrl()),
];
}
}