diff --git a/composer.json b/composer.json index a93106c5..ba72716d 100644 --- a/composer.json +++ b/composer.json @@ -30,7 +30,6 @@ "mikehaertl/phpwkhtmltopdf": "^2.2", "monolog/monolog": "^1.21", "phly/phly-event-dispatcher": "^1.0", - "phly/phly-swoole-taskworker": "^1.1", "shlinkio/shlink-installer": "^1.1", "symfony/console": "^4.2", "symfony/filesystem": "^4.2", diff --git a/config/autoload/event_dispatcher.global.php b/config/autoload/event_dispatcher.global.php new file mode 100644 index 00000000..53964152 --- /dev/null +++ b/config/autoload/event_dispatcher.global.php @@ -0,0 +1,20 @@ + [ + 'factories' => [ + Psr\ListenerProviderInterface::class => Common\EventDispatcher\ListenerProviderFactory::class, + ], + 'aliases' => [ + Psr\EventDispatcherInterface::class => Phly\EventDispatcher::class, + ], + ], + +]; diff --git a/config/autoload/events.global.php b/config/autoload/events.global.php deleted file mode 100644 index e2d3561e..00000000 --- a/config/autoload/events.global.php +++ /dev/null @@ -1,26 +0,0 @@ - [ - 'factories' => [ - Common\EventDispatcher\SwooleEventDispatcher::class => ConfigAbstractFactory::class, - Psr\ListenerProviderInterface::class => Common\EventDispatcher\ListenerProviderFactory::class, - ], - 'aliases' => [ - Psr\EventDispatcherInterface::class => Common\EventDispatcher\SwooleEventDispatcher::class, - ], - ], - - ConfigAbstractFactory::class => [ - Common\EventDispatcher\SwooleEventDispatcher::class => [Phly\EventDispatcher::class], - ], - -]; diff --git a/config/config.php b/config/config.php index 48f638af..e10d3c1e 100644 --- a/config/config.php +++ b/config/config.php @@ -5,10 +5,8 @@ namespace Shlinkio\Shlink; use Acelaya\ExpressiveErrorHandler; use Phly\EventDispatcher; -use Phly\Swoole\TaskWorker; use Zend\ConfigAggregator; use Zend\Expressive; - use function Shlinkio\Shlink\Common\env; return (new ConfigAggregator\ConfigAggregator([ @@ -19,7 +17,6 @@ return (new ConfigAggregator\ConfigAggregator([ Expressive\Swoole\ConfigProvider::class, ExpressiveErrorHandler\ConfigProvider::class, EventDispatcher\ConfigProvider::class, - TaskWorker\ConfigProvider::class, Common\ConfigProvider::class, Core\ConfigProvider::class, CLI\ConfigProvider::class, diff --git a/module/Common/config/task_runner.config.php b/module/Common/config/task_runner.config.php new file mode 100644 index 00000000..8f0e688e --- /dev/null +++ b/module/Common/config/task_runner.config.php @@ -0,0 +1,21 @@ + [ + 'factories' => [ + EventDispatcher\TaskRunner::class => EventDispatcher\TaskRunnerFactory::class, + ], + 'delegators' => [ + HttpServer::class => [ + EventDispatcher\TaskRunnerDelegator::class, + ], + ], + ], + +]; diff --git a/module/Common/functions/functions.php b/module/Common/functions/functions.php index 485df607..17856af2 100644 --- a/module/Common/functions/functions.php +++ b/module/Common/functions/functions.php @@ -3,6 +3,8 @@ declare(strict_types=1); namespace Shlinkio\Shlink\Common; +use Swoole\Http\Server as HttpServer; + use const JSON_ERROR_NONE; use function getenv; @@ -59,3 +61,8 @@ function json_decode(string $json, int $depth = 512, int $options = 0): array return $data; } + +function asyncListener(HttpServer $server, string $regularListenerName): EventDispatcher\AsyncEventListener +{ + return new EventDispatcher\AsyncEventListener($server, $regularListenerName); +} diff --git a/module/Common/src/EventDispatcher/AsyncEventListener.php b/module/Common/src/EventDispatcher/AsyncEventListener.php new file mode 100644 index 00000000..0b65d083 --- /dev/null +++ b/module/Common/src/EventDispatcher/AsyncEventListener.php @@ -0,0 +1,25 @@ +regularListenerName = $regularListenerName; + $this->server = $server; + } + + public function __invoke(object $event): void + { + $this->server->task(new Task($this->regularListenerName, $event)); + } +} diff --git a/module/Common/src/EventDispatcher/ListenerProviderFactory.php b/module/Common/src/EventDispatcher/ListenerProviderFactory.php index a69f019a..3d9de084 100644 --- a/module/Common/src/EventDispatcher/ListenerProviderFactory.php +++ b/module/Common/src/EventDispatcher/ListenerProviderFactory.php @@ -5,9 +5,11 @@ namespace Shlinkio\Shlink\Common\EventDispatcher; use Interop\Container\ContainerInterface; use Phly\EventDispatcher\ListenerProvider\AttachableListenerProvider; +use Swoole\Http\Server as HttpServer; use Zend\ServiceManager\Factory\FactoryInterface; use function Phly\EventDispatcher\lazyListener; +use function Shlinkio\Shlink\Common\asyncListener; class ListenerProviderFactory implements FactoryInterface { @@ -17,12 +19,31 @@ class ListenerProviderFactory implements FactoryInterface $events = $config['events'] ?? []; $provider = new AttachableListenerProvider(); - foreach ($events as $eventName => $listeners) { - foreach ($listeners as $listenerName) { - $provider->listen($eventName, lazyListener($container, $listenerName)); - } - } + $this->registerListeners($events['regular'] ?? [], $container, $provider); + $this->registerListeners($events['async'] ?? [], $container, $provider, true); return $provider; } + + private function registerListeners( + array $events, + ContainerInterface $container, + AttachableListenerProvider $provider, + bool $isAsync = false + ): void { + // Avoid registering async event listeners when the swoole server is not registered + if ($isAsync && ! $container->has(HttpServer::class)) { + return; + } + + foreach ($events as $eventName => $listeners) { + foreach ($listeners as $listenerName) { + $eventListener = $isAsync + ? asyncListener($container->get(HttpServer::class), $listenerName) + : lazyListener($container, $listenerName); + + $provider->listen($eventName, $eventListener); + } + } + } } diff --git a/module/Common/src/EventDispatcher/SwooleEventDispatcher.php b/module/Common/src/EventDispatcher/SwooleEventDispatcher.php deleted file mode 100644 index 9fada852..00000000 --- a/module/Common/src/EventDispatcher/SwooleEventDispatcher.php +++ /dev/null @@ -1,43 +0,0 @@ -innerDispatcher = $innerDispatcher; - $this->isSwoole = $isSwoole ?? PHP_SAPI === 'cli' && extension_loaded('swoole'); - } - - /** - * Provide all relevant listeners with an event to process. - * - * @param object $event - * The object to process. - * - * @return object - * The Event that was passed, now modified by listeners. - */ - public function dispatch(object $event) - { - // Do not really dispatch the event if the app is not being run with swoole - if (! $this->isSwoole) { - return $event; - } - - return $this->innerDispatcher->dispatch($event); - } -} diff --git a/module/Common/src/EventDispatcher/Task.php b/module/Common/src/EventDispatcher/Task.php new file mode 100644 index 00000000..f12abb55 --- /dev/null +++ b/module/Common/src/EventDispatcher/Task.php @@ -0,0 +1,33 @@ +regularListenerName = $regularListenerName; + $this->event = $event; + } + + public function __invoke(ContainerInterface $container) + { + ($container->get($this->regularListenerName))($this->event); + } + + public function toString(): string + { + return sprintf('Listener -> "%s", Event -> "%s"', $this->regularListenerName, get_class($this->event)); + } +} diff --git a/module/Common/src/EventDispatcher/TaskRunner.php b/module/Common/src/EventDispatcher/TaskRunner.php new file mode 100644 index 00000000..7329307e --- /dev/null +++ b/module/Common/src/EventDispatcher/TaskRunner.php @@ -0,0 +1,55 @@ +logger = $logger; + $this->container = $container; + } + + public function __invoke(HttpServer $server, int $taskId, int $fromId, $task): void + { + if (! $task instanceof Task) { + $this->logger->error('Invalid task provided to task worker: {type}', [ + 'type' => is_object($task) ? get_class($task) : gettype($task), + ]); + $server->finish(''); + return; + } + + $this->logger->notice('Starting work on task {taskId}: {task}', [ + 'taskId' => $taskId, + 'task' => $task->toString(), + ]); + + try { + $task($this->container); + } catch (Throwable $e) { + $this->logger->error('Error processing task {taskId}: {e}', [ + 'taskId' => $taskId, + 'e' => $e, + ]); + } finally { + // Notify the server that processing of the task has finished: + $server->finish(''); + } + } +} diff --git a/module/Common/src/EventDispatcher/TaskRunnerDelegator.php b/module/Common/src/EventDispatcher/TaskRunnerDelegator.php new file mode 100644 index 00000000..bb7f138c --- /dev/null +++ b/module/Common/src/EventDispatcher/TaskRunnerDelegator.php @@ -0,0 +1,29 @@ +get(LoggerInterface::class); + + $server->on('task', $container->get(TaskRunner::class)); + $server->on('finish', function (HttpServer $server, int $taskId) use ($logger) { + $logger->notice('Task #{taskId} has finished processing', ['taskId' => $taskId]); + }); + + return $server; + } +} diff --git a/module/Common/src/EventDispatcher/TaskRunnerFactory.php b/module/Common/src/EventDispatcher/TaskRunnerFactory.php new file mode 100644 index 00000000..b24eebdb --- /dev/null +++ b/module/Common/src/EventDispatcher/TaskRunnerFactory.php @@ -0,0 +1,17 @@ +get(LoggerInterface::class); + return new TaskRunner($logger, $container); + } +} diff --git a/module/Common/test/EventDispatcher/SwooleEventDispatcherTest.php b/module/Common/test/EventDispatcher/SwooleEventDispatcherTest.php deleted file mode 100644 index 50bc8a26..00000000 --- a/module/Common/test/EventDispatcher/SwooleEventDispatcherTest.php +++ /dev/null @@ -1,41 +0,0 @@ -innerDispatcher = $this->prophesize(EventDispatcherInterface::class); - } - - /** - * @test - * @dataProvider provideIsSwoole - */ - public function callsInnerDispatcherOnlyWhenInSwooleContext(bool $isSwoole, int $expectedCalls): void - { - $dispatcher = new SwooleEventDispatcher($this->innerDispatcher->reveal(), $isSwoole); - $event = new stdClass(); - - $dispatcher->dispatch($event); - - $this->innerDispatcher->dispatch($event)->shouldHaveBeenCalledTimes($expectedCalls); - } - - public function provideIsSwoole(): iterable - { - yield 'with swoole' => [true, 1]; - yield 'without swoole' => [false, 0]; - } -} diff --git a/module/Core/config/event_dispatcher.config.php b/module/Core/config/event_dispatcher.config.php index 406e574f..ab34fe99 100644 --- a/module/Core/config/event_dispatcher.config.php +++ b/module/Core/config/event_dispatcher.config.php @@ -3,15 +3,17 @@ declare(strict_types=1); namespace Shlinkio\Shlink\Core; -use Phly\Swoole\TaskWorker\DeferredListenerDelegator; use Shlinkio\Shlink\Common\IpGeolocation\IpLocationResolverInterface; use Zend\ServiceManager\AbstractFactory\ConfigAbstractFactory; return [ 'events' => [ - EventDispatcher\ShortUrlVisited::class => [ - EventDispatcher\LocateShortUrlVisit::class, + 'regular' => [], + 'async' => [ + EventDispatcher\ShortUrlVisited::class => [ + EventDispatcher\LocateShortUrlVisit::class, + ], ], ], @@ -19,11 +21,6 @@ return [ 'factories' => [ EventDispatcher\LocateShortUrlVisit::class => ConfigAbstractFactory::class, ], - 'delegators' => [ - EventDispatcher\LocateShortUrlVisit::class => [ - DeferredListenerDelegator::class, - ], - ], ], ConfigAbstractFactory::class => [