Integrated PublishUpdatesGenerator in NotifyVisitToRedis listener

This commit is contained in:
Alejandro Celaya 2022-07-27 16:55:19 +02:00
parent fa5ebb1677
commit dada6aa3d1
2 changed files with 10 additions and 28 deletions

View file

@ -120,10 +120,9 @@ return [
],
EventDispatcher\RedisPubSub\NotifyVisitToRedis::class => [
RedisPublishingHelper::class,
EventDispatcher\PublishingUpdatesGenerator::class,
'em',
'Logger_Shlink',
Visit\Transformer\OrphanVisitDataTransformer::class,
ShortUrl\Transformer\ShortUrlDataTransformer::class,
'config.redis.pub_sub_enabled',
],
EventDispatcher\RedisPubSub\NotifyNewShortUrlToRedis::class => [

View file

@ -6,12 +6,11 @@ namespace Shlinkio\Shlink\Core\EventDispatcher\RedisPubSub;
use Doctrine\ORM\EntityManagerInterface;
use Psr\Log\LoggerInterface;
use Shlinkio\Shlink\Common\Rest\DataTransformerInterface;
use Shlinkio\Shlink\Common\UpdatePublishing\PublishingHelperInterface;
use Shlinkio\Shlink\Common\UpdatePublishing\Update;
use Shlinkio\Shlink\Core\Entity\Visit;
use Shlinkio\Shlink\Core\EventDispatcher\Event\VisitLocated;
use Shlinkio\Shlink\Core\EventDispatcher\Topic;
use Shlinkio\Shlink\Core\EventDispatcher\PublishingUpdatesGeneratorInterface;
use Throwable;
use function Functional\each;
@ -20,10 +19,9 @@ class NotifyVisitToRedis
{
public function __construct(
private readonly PublishingHelperInterface $redisHelper,
private readonly PublishingUpdatesGeneratorInterface $updatesGenerator,
private readonly EntityManagerInterface $em,
private readonly LoggerInterface $logger,
private readonly DataTransformerInterface $orphanVisitTransformer,
private readonly DataTransformerInterface $shortUrlTransformer,
private readonly bool $enabled,
) {
}
@ -45,42 +43,27 @@ class NotifyVisitToRedis
return;
}
$queues = $this->determineQueuesToPublishTo($visit);
$payload = $this->visitToPayload($visit);
$updates = $this->determineUpdatesForVisit($visit);
try {
each($queues, fn (string $queue) => $this->redisHelper->publishUpdate(
Update::forTopicAndPayload($queue, $payload),
));
each($updates, fn (Update $update) => $this->redisHelper->publishUpdate($update));
} catch (Throwable $e) {
$this->logger->debug('Error while trying to notify Redis pub/sub with new visit. {e}', ['e' => $e]);
}
}
/**
* @return string[]
* @return Update[]
*/
private function determineQueuesToPublishTo(Visit $visit): array
private function determineUpdatesForVisit(Visit $visit): array
{
if ($visit->isOrphan()) {
return [Topic::NEW_ORPHAN_VISIT->value];
return [$this->updatesGenerator->newOrphanVisitUpdate($visit)];
}
return [
Topic::NEW_VISIT->value,
Topic::newShortUrlVisit($visit->getShortUrl()?->getShortCode()),
];
}
private function visitToPayload(Visit $visit): array
{
if ($visit->isOrphan()) {
return ['visit' => $this->orphanVisitTransformer->transform($visit)];
}
return [
'visit' => $visit->jsonSerialize(),
'shortUrl' => $this->shortUrlTransformer->transform($visit->getShortUrl()),
$this->updatesGenerator->newShortUrlVisitUpdate($visit),
$this->updatesGenerator->newVisitUpdate($visit),
];
}
}