Created task running system based on event listener which are transparently cast into tasks

This commit is contained in:
Alejandro Celaya 2019-07-18 19:07:07 +02:00
parent 0dfadcbb4a
commit bccc177414
15 changed files with 238 additions and 127 deletions

View file

@ -30,7 +30,6 @@
"mikehaertl/phpwkhtmltopdf": "^2.2",
"monolog/monolog": "^1.21",
"phly/phly-event-dispatcher": "^1.0",
"phly/phly-swoole-taskworker": "^1.1",
"shlinkio/shlink-installer": "^1.1",
"symfony/console": "^4.2",
"symfony/filesystem": "^4.2",

View file

@ -0,0 +1,20 @@
<?php
declare(strict_types=1);
namespace Shlinkio\Shlink;
use Phly\EventDispatcher as Phly;
use Psr\EventDispatcher as Psr;
return [
'dependencies' => [
'factories' => [
Psr\ListenerProviderInterface::class => Common\EventDispatcher\ListenerProviderFactory::class,
],
'aliases' => [
Psr\EventDispatcherInterface::class => Phly\EventDispatcher::class,
],
],
];

View file

@ -1,26 +0,0 @@
<?php
declare(strict_types=1);
namespace Shlinkio\Shlink;
use Phly\EventDispatcher as Phly;
use Psr\EventDispatcher as Psr;
use Zend\ServiceManager\AbstractFactory\ConfigAbstractFactory;
return [
'dependencies' => [
'factories' => [
Common\EventDispatcher\SwooleEventDispatcher::class => ConfigAbstractFactory::class,
Psr\ListenerProviderInterface::class => Common\EventDispatcher\ListenerProviderFactory::class,
],
'aliases' => [
Psr\EventDispatcherInterface::class => Common\EventDispatcher\SwooleEventDispatcher::class,
],
],
ConfigAbstractFactory::class => [
Common\EventDispatcher\SwooleEventDispatcher::class => [Phly\EventDispatcher::class],
],
];

View file

@ -5,10 +5,8 @@ namespace Shlinkio\Shlink;
use Acelaya\ExpressiveErrorHandler;
use Phly\EventDispatcher;
use Phly\Swoole\TaskWorker;
use Zend\ConfigAggregator;
use Zend\Expressive;
use function Shlinkio\Shlink\Common\env;
return (new ConfigAggregator\ConfigAggregator([
@ -19,7 +17,6 @@ return (new ConfigAggregator\ConfigAggregator([
Expressive\Swoole\ConfigProvider::class,
ExpressiveErrorHandler\ConfigProvider::class,
EventDispatcher\ConfigProvider::class,
TaskWorker\ConfigProvider::class,
Common\ConfigProvider::class,
Core\ConfigProvider::class,
CLI\ConfigProvider::class,

View file

@ -0,0 +1,21 @@
<?php
declare(strict_types=1);
namespace Shlinkio\Shlink\Common;
use Swoole\Http\Server as HttpServer;
return [
'dependencies' => [
'factories' => [
EventDispatcher\TaskRunner::class => EventDispatcher\TaskRunnerFactory::class,
],
'delegators' => [
HttpServer::class => [
EventDispatcher\TaskRunnerDelegator::class,
],
],
],
];

View file

@ -3,6 +3,8 @@ declare(strict_types=1);
namespace Shlinkio\Shlink\Common;
use Swoole\Http\Server as HttpServer;
use const JSON_ERROR_NONE;
use function getenv;
@ -59,3 +61,8 @@ function json_decode(string $json, int $depth = 512, int $options = 0): array
return $data;
}
function asyncListener(HttpServer $server, string $regularListenerName): EventDispatcher\AsyncEventListener
{
return new EventDispatcher\AsyncEventListener($server, $regularListenerName);
}

View file

@ -0,0 +1,25 @@
<?php
declare(strict_types=1);
namespace Shlinkio\Shlink\Common\EventDispatcher;
use Swoole\Http\Server as HttpServer;
class AsyncEventListener
{
/** @var string */
private $regularListenerName;
/** @var HttpServer */
private $server;
public function __construct(HttpServer $server, string $regularListenerName)
{
$this->regularListenerName = $regularListenerName;
$this->server = $server;
}
public function __invoke(object $event): void
{
$this->server->task(new Task($this->regularListenerName, $event));
}
}

View file

@ -5,9 +5,11 @@ namespace Shlinkio\Shlink\Common\EventDispatcher;
use Interop\Container\ContainerInterface;
use Phly\EventDispatcher\ListenerProvider\AttachableListenerProvider;
use Swoole\Http\Server as HttpServer;
use Zend\ServiceManager\Factory\FactoryInterface;
use function Phly\EventDispatcher\lazyListener;
use function Shlinkio\Shlink\Common\asyncListener;
class ListenerProviderFactory implements FactoryInterface
{
@ -17,12 +19,31 @@ class ListenerProviderFactory implements FactoryInterface
$events = $config['events'] ?? [];
$provider = new AttachableListenerProvider();
foreach ($events as $eventName => $listeners) {
foreach ($listeners as $listenerName) {
$provider->listen($eventName, lazyListener($container, $listenerName));
}
}
$this->registerListeners($events['regular'] ?? [], $container, $provider);
$this->registerListeners($events['async'] ?? [], $container, $provider, true);
return $provider;
}
private function registerListeners(
array $events,
ContainerInterface $container,
AttachableListenerProvider $provider,
bool $isAsync = false
): void {
// Avoid registering async event listeners when the swoole server is not registered
if ($isAsync && ! $container->has(HttpServer::class)) {
return;
}
foreach ($events as $eventName => $listeners) {
foreach ($listeners as $listenerName) {
$eventListener = $isAsync
? asyncListener($container->get(HttpServer::class), $listenerName)
: lazyListener($container, $listenerName);
$provider->listen($eventName, $eventListener);
}
}
}
}

View file

@ -1,43 +0,0 @@
<?php
declare(strict_types=1);
namespace Shlinkio\Shlink\Common\EventDispatcher;
use Psr\EventDispatcher\EventDispatcherInterface;
use const PHP_SAPI;
use function extension_loaded;
class SwooleEventDispatcher implements EventDispatcherInterface
{
/** @var bool */
private $isSwoole;
/** @var EventDispatcherInterface */
private $innerDispatcher;
public function __construct(EventDispatcherInterface $innerDispatcher, ?bool $isSwoole = null)
{
$this->innerDispatcher = $innerDispatcher;
$this->isSwoole = $isSwoole ?? PHP_SAPI === 'cli' && extension_loaded('swoole');
}
/**
* Provide all relevant listeners with an event to process.
*
* @param object $event
* The object to process.
*
* @return object
* The Event that was passed, now modified by listeners.
*/
public function dispatch(object $event)
{
// Do not really dispatch the event if the app is not being run with swoole
if (! $this->isSwoole) {
return $event;
}
return $this->innerDispatcher->dispatch($event);
}
}

View file

@ -0,0 +1,33 @@
<?php
declare(strict_types=1);
namespace Shlinkio\Shlink\Common\EventDispatcher;
use Psr\Container\ContainerInterface;
use function get_class;
use function sprintf;
class Task
{
/** @var string */
private $regularListenerName;
/** @var object */
private $event;
public function __construct(string $regularListenerName, object $event)
{
$this->regularListenerName = $regularListenerName;
$this->event = $event;
}
public function __invoke(ContainerInterface $container)
{
($container->get($this->regularListenerName))($this->event);
}
public function toString(): string
{
return sprintf('Listener -> "%s", Event -> "%s"', $this->regularListenerName, get_class($this->event));
}
}

View file

@ -0,0 +1,55 @@
<?php
declare(strict_types=1);
namespace Shlinkio\Shlink\Common\EventDispatcher;
use Psr\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Swoole\Http\Server as HttpServer;
use Throwable;
use function get_class;
use function gettype;
use function is_object;
class TaskRunner
{
/** @var LoggerInterface */
private $logger;
/** @var ContainerInterface */
private $container;
public function __construct(LoggerInterface $logger, ContainerInterface $container)
{
$this->logger = $logger;
$this->container = $container;
}
public function __invoke(HttpServer $server, int $taskId, int $fromId, $task): void
{
if (! $task instanceof Task) {
$this->logger->error('Invalid task provided to task worker: {type}', [
'type' => is_object($task) ? get_class($task) : gettype($task),
]);
$server->finish('');
return;
}
$this->logger->notice('Starting work on task {taskId}: {task}', [
'taskId' => $taskId,
'task' => $task->toString(),
]);
try {
$task($this->container);
} catch (Throwable $e) {
$this->logger->error('Error processing task {taskId}: {e}', [
'taskId' => $taskId,
'e' => $e,
]);
} finally {
// Notify the server that processing of the task has finished:
$server->finish('');
}
}
}

View file

@ -0,0 +1,29 @@
<?php
declare(strict_types=1);
namespace Shlinkio\Shlink\Common\EventDispatcher;
use Interop\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Swoole\Http\Server as HttpServer;
use Zend\ServiceManager\Factory\DelegatorFactoryInterface;
class TaskRunnerDelegator implements DelegatorFactoryInterface
{
public function __invoke(
ContainerInterface $container,
$name,
callable $callback,
array $options = null
): HttpServer {
$server = $callback();
$logger = $container->get(LoggerInterface::class);
$server->on('task', $container->get(TaskRunner::class));
$server->on('finish', function (HttpServer $server, int $taskId) use ($logger) {
$logger->notice('Task #{taskId} has finished processing', ['taskId' => $taskId]);
});
return $server;
}
}

View file

@ -0,0 +1,17 @@
<?php
declare(strict_types=1);
namespace Shlinkio\Shlink\Common\EventDispatcher;
use Interop\Container\ContainerInterface;
use Psr\Log\LoggerInterface;
use Zend\ServiceManager\Factory\FactoryInterface;
class TaskRunnerFactory implements FactoryInterface
{
public function __invoke(ContainerInterface $container, $requestedName, array $options = null): TaskRunner
{
$logger = $container->get(LoggerInterface::class);
return new TaskRunner($logger, $container);
}
}

View file

@ -1,41 +0,0 @@
<?php
declare(strict_types=1);
namespace ShlinkioTest\Shlink\Common\EventDispatcher;
use PHPUnit\Framework\TestCase;
use Prophecy\Prophecy\ObjectProphecy;
use Psr\EventDispatcher\EventDispatcherInterface;
use Shlinkio\Shlink\Common\EventDispatcher\SwooleEventDispatcher;
use stdClass;
class SwooleEventDispatcherTest extends TestCase
{
/** @var ObjectProphecy */
private $innerDispatcher;
public function setUp(): void
{
$this->innerDispatcher = $this->prophesize(EventDispatcherInterface::class);
}
/**
* @test
* @dataProvider provideIsSwoole
*/
public function callsInnerDispatcherOnlyWhenInSwooleContext(bool $isSwoole, int $expectedCalls): void
{
$dispatcher = new SwooleEventDispatcher($this->innerDispatcher->reveal(), $isSwoole);
$event = new stdClass();
$dispatcher->dispatch($event);
$this->innerDispatcher->dispatch($event)->shouldHaveBeenCalledTimes($expectedCalls);
}
public function provideIsSwoole(): iterable
{
yield 'with swoole' => [true, 1];
yield 'without swoole' => [false, 0];
}
}

View file

@ -3,15 +3,17 @@ declare(strict_types=1);
namespace Shlinkio\Shlink\Core;
use Phly\Swoole\TaskWorker\DeferredListenerDelegator;
use Shlinkio\Shlink\Common\IpGeolocation\IpLocationResolverInterface;
use Zend\ServiceManager\AbstractFactory\ConfigAbstractFactory;
return [
'events' => [
EventDispatcher\ShortUrlVisited::class => [
EventDispatcher\LocateShortUrlVisit::class,
'regular' => [],
'async' => [
EventDispatcher\ShortUrlVisited::class => [
EventDispatcher\LocateShortUrlVisit::class,
],
],
],
@ -19,11 +21,6 @@ return [
'factories' => [
EventDispatcher\LocateShortUrlVisit::class => ConfigAbstractFactory::class,
],
'delegators' => [
EventDispatcher\LocateShortUrlVisit::class => [
DeferredListenerDelegator::class,
],
],
],
ConfigAbstractFactory::class => [