From 1ceabf3bc3fee29cc568a21b350486ed3e26e326 Mon Sep 17 00:00:00 2001
From: Alejandro Celaya <alejandro@alejandrocelaya.com>
Date: Sat, 17 Nov 2018 14:10:38 +0100
Subject: [PATCH 1/4] Added locking capabilities to process visits command

---
 composer.json                                 |   1 +
 config/autoload/locks.global.php              |  25 ++++
 data/locks/.gitignore                         |   2 +
 module/CLI/config/dependencies.config.php     |   2 +
 .../Command/Visit/ProcessVisitsCommand.php    | 122 +++++++++++-------
 .../Visit/ProcessVisitsCommandTest.php        |  42 ++++++
 6 files changed, 149 insertions(+), 45 deletions(-)
 create mode 100644 config/autoload/locks.global.php
 create mode 100644 data/locks/.gitignore

diff --git a/composer.json b/composer.json
index 87d7cabc..1f5db2ec 100644
--- a/composer.json
+++ b/composer.json
@@ -32,6 +32,7 @@
         "roave/security-advisories": "dev-master",
         "symfony/console": "^4.1",
         "symfony/filesystem": "^4.1",
+        "symfony/lock": "^4.1",
         "symfony/process": "^4.1",
         "theorchard/monolog-cascade": "^0.4",
         "zendframework/zend-config": "^3.0",
diff --git a/config/autoload/locks.global.php b/config/autoload/locks.global.php
new file mode 100644
index 00000000..7deb06c7
--- /dev/null
+++ b/config/autoload/locks.global.php
@@ -0,0 +1,25 @@
+<?php
+declare(strict_types=1);
+
+use Symfony\Component\Lock;
+use Zend\ServiceManager\AbstractFactory\ConfigAbstractFactory;
+
+return [
+
+    'locks' => [
+        'locks_dir' => __DIR__ . '/../../data/locks',
+    ],
+
+    'dependencies' => [
+        'factories' => [
+            Lock\Store\FlockStore::class => ConfigAbstractFactory::class,
+            Lock\Factory::class => ConfigAbstractFactory::class,
+        ],
+    ],
+
+    ConfigAbstractFactory::class => [
+        Lock\Store\FlockStore::class => ['config.locks.locks_dir'],
+        Lock\Factory::class => [Lock\Store\FlockStore::class],
+    ],
+
+];
diff --git a/data/locks/.gitignore b/data/locks/.gitignore
new file mode 100644
index 00000000..d6b7ef32
--- /dev/null
+++ b/data/locks/.gitignore
@@ -0,0 +1,2 @@
+*
+!.gitignore
diff --git a/module/CLI/config/dependencies.config.php b/module/CLI/config/dependencies.config.php
index 8a31cb02..2781e115 100644
--- a/module/CLI/config/dependencies.config.php
+++ b/module/CLI/config/dependencies.config.php
@@ -9,6 +9,7 @@ use Shlinkio\Shlink\Common\Service\PreviewGenerator;
 use Shlinkio\Shlink\Core\Service;
 use Shlinkio\Shlink\Rest\Service\ApiKeyService;
 use Symfony\Component\Console\Application;
+use Symfony\Component\Lock;
 use Zend\I18n\Translator\Translator;
 use Zend\ServiceManager\AbstractFactory\ConfigAbstractFactory;
 
@@ -68,6 +69,7 @@ return [
         Command\Visit\ProcessVisitsCommand::class => [
             Service\VisitService::class,
             IpLocationResolverInterface::class,
+            Lock\Factory::class,
             'translator',
         ],
         Command\Visit\UpdateDbCommand::class => [DbUpdater::class, 'translator'],
diff --git a/module/CLI/src/Command/Visit/ProcessVisitsCommand.php b/module/CLI/src/Command/Visit/ProcessVisitsCommand.php
index 38dff527..f6a6290b 100644
--- a/module/CLI/src/Command/Visit/ProcessVisitsCommand.php
+++ b/module/CLI/src/Command/Visit/ProcessVisitsCommand.php
@@ -14,12 +14,14 @@ use Symfony\Component\Console\Command\Command;
 use Symfony\Component\Console\Input\InputInterface;
 use Symfony\Component\Console\Output\OutputInterface;
 use Symfony\Component\Console\Style\SymfonyStyle;
+use Symfony\Component\Lock\Factory as Locker;
 use Zend\I18n\Translator\TranslatorInterface;
 use function sprintf;
 
 class ProcessVisitsCommand extends Command
 {
     public const NAME = 'visit:process';
+    private const LOCK_NAME = 'visit_process';
 
     /**
      * @var VisitServiceInterface
@@ -33,69 +35,99 @@ class ProcessVisitsCommand extends Command
      * @var TranslatorInterface
      */
     private $translator;
+    /**
+     * @var Locker
+     */
+    private $locker;
+    /**
+     * @var OutputInterface
+     */
+    private $output;
 
     public function __construct(
         VisitServiceInterface $visitService,
         IpLocationResolverInterface $ipLocationResolver,
+        Locker $locker,
         TranslatorInterface $translator
     ) {
         $this->visitService = $visitService;
         $this->ipLocationResolver = $ipLocationResolver;
         $this->translator = $translator;
+        $this->locker = $locker;
         parent::__construct();
     }
 
     protected function configure(): void
     {
-        $this->setName(self::NAME)
-             ->setDescription(
-                 $this->translator->translate('Processes visits where location is not set yet')
-             );
+        $this
+            ->setName(self::NAME)
+            ->setDescription($this->translator->translate('Processes visits where location is not set yet'));
     }
 
     protected function execute(InputInterface $input, OutputInterface $output): void
     {
-        $this->visitService->locateVisits(function (Visit $visit) use ($output) {
-            if (! $visit->hasRemoteAddr()) {
-                $output->writeln(
-                    sprintf('<comment>%s</comment>', $this->translator->translate('Ignored visit with no IP address')),
-                    OutputInterface::VERBOSITY_VERBOSE
-                );
-                throw new IpCannotBeLocatedException('Ignored visit with no IP address');
-            }
-
-            $ipAddr = $visit->getRemoteAddr();
-            $output->write(sprintf('%s <fg=blue>%s</>', $this->translator->translate('Processing IP'), $ipAddr));
-            if ($ipAddr === IpAddress::LOCALHOST) {
-                $output->writeln(
-                    sprintf(' [<comment>%s</comment>]', $this->translator->translate('Ignored localhost address'))
-                );
-                throw new IpCannotBeLocatedException('Ignored localhost address');
-            }
-
-            try {
-                return $this->ipLocationResolver->resolveIpLocation($ipAddr);
-            } catch (WrongIpException $e) {
-                $output->writeln(
-                    sprintf(
-                        ' [<fg=red>%s</>]',
-                        $this->translator->translate('An error occurred while locating IP. Skipped')
-                    )
-                );
-                if ($output->isVerbose()) {
-                    $this->getApplication()->renderException($e, $output);
-                }
-
-                throw new IpCannotBeLocatedException('An error occurred while locating IP', 0, $e);
-            }
-        }, function (VisitLocation $location) use ($output) {
-            $output->writeln(sprintf(
-                ' [<info>' . $this->translator->translate('Address located at "%s"') . '</info>]',
-                $location->getCountryName()
-            ));
-        });
-
+        $this->output = $output;
         $io = new SymfonyStyle($input, $output);
-        $io->success($this->translator->translate('Finished processing all IPs'));
+
+        $lock = $this->locker->createLock(self::LOCK_NAME);
+        if (! $lock->acquire()) {
+            $io->warning(sprintf(
+                $this->translator->translate('There is already an instance of the "%s" command in execution'),
+                self::NAME
+            ));
+            return;
+        }
+
+        try {
+            $this->visitService->locateVisits(
+                [$this, 'getGeolocationDataForVisit'],
+                function (VisitLocation $location) use ($output) {
+                    $output->writeln(sprintf(
+                        ' [<info>' . $this->translator->translate('Address located at "%s"') . '</info>]',
+                        $location->getCountryName()
+                    ));
+                }
+            );
+
+            $io->success($this->translator->translate('Finished processing all IPs'));
+        } finally {
+            $lock->release();
+        }
+    }
+
+    public function getGeolocationDataForVisit(Visit $visit): array
+    {
+        if (! $visit->hasRemoteAddr()) {
+            $this->output->writeln(sprintf(
+                '<comment>%s</comment>',
+                $this->translator->translate('Ignored visit with no IP address')
+            ), OutputInterface::VERBOSITY_VERBOSE);
+            throw new IpCannotBeLocatedException('Ignored visit with no IP address');
+        }
+
+        $ipAddr = $visit->getRemoteAddr();
+        $this->output->write(sprintf('%s <fg=blue>%s</>', $this->translator->translate('Processing IP'), $ipAddr));
+        if ($ipAddr === IpAddress::LOCALHOST) {
+            $this->output->writeln(
+                sprintf(' [<comment>%s</comment>]', $this->translator->translate('Ignored localhost address'))
+            );
+            throw new IpCannotBeLocatedException('Ignored localhost address');
+        }
+
+        try {
+            return $this->ipLocationResolver->resolveIpLocation($ipAddr);
+        } catch (WrongIpException $e) {
+            $this->output->writeln(
+                sprintf(
+                    ' [<fg=red>%s</>]',
+                    $this->translator->translate('An error occurred while locating IP. Skipped')
+                )
+            );
+            if ($this->output->isVerbose()) {
+                $this->getApplication()->renderException($e, $this->output);
+            }
+
+            throw new IpCannotBeLocatedException('An error occurred while locating IP', 0, $e);
+        }
     }
 }
diff --git a/module/CLI/test/Command/Visit/ProcessVisitsCommandTest.php b/module/CLI/test/Command/Visit/ProcessVisitsCommandTest.php
index efa77752..9c691492 100644
--- a/module/CLI/test/Command/Visit/ProcessVisitsCommandTest.php
+++ b/module/CLI/test/Command/Visit/ProcessVisitsCommandTest.php
@@ -19,9 +19,11 @@ use Shlinkio\Shlink\Core\Service\VisitService;
 use Symfony\Component\Console\Application;
 use Symfony\Component\Console\Output\OutputInterface;
 use Symfony\Component\Console\Tester\CommandTester;
+use Symfony\Component\Lock;
 use Throwable;
 use Zend\I18n\Translator\Translator;
 use function array_shift;
+use function sprintf;
 
 class ProcessVisitsCommandTest extends TestCase
 {
@@ -37,15 +39,31 @@ class ProcessVisitsCommandTest extends TestCase
      * @var ObjectProphecy
      */
     private $ipResolver;
+    /**
+     * @var ObjectProphecy
+     */
+    private $locker;
+    /**
+     * @var ObjectProphecy
+     */
+    private $lock;
 
     public function setUp()
     {
         $this->visitService = $this->prophesize(VisitService::class);
         $this->ipResolver = $this->prophesize(IpApiLocationResolver::class);
 
+        $this->locker = $this->prophesize(Lock\Factory::class);
+        $this->lock = $this->prophesize(Lock\LockInterface::class);
+        $this->lock->acquire()->willReturn(true);
+        $this->lock->release()->will(function () {
+        });
+        $this->locker->createLock(Argument::type('string'))->willReturn($this->lock->reveal());
+
         $command = new ProcessVisitsCommand(
             $this->visitService->reveal(),
             $this->ipResolver->reveal(),
+            $this->locker->reveal(),
             Translator::factory([])
         );
         $app = new Application();
@@ -160,4 +178,28 @@ class ProcessVisitsCommandTest extends TestCase
             $resolveIpLocation->shouldHaveBeenCalledOnce();
         }
     }
+
+    /**
+     * @test
+     */
+    public function noActionIsPerformedIfLockIsAcquired()
+    {
+        $this->lock->acquire()->willReturn(false);
+
+        $locateVisits = $this->visitService->locateVisits(Argument::cetera())->will(function () {
+        });
+        $resolveIpLocation = $this->ipResolver->resolveIpLocation(Argument::any())->willReturn([]);
+
+        $this->commandTester->execute([
+            'command' => 'visit:process',
+        ], ['verbosity' => OutputInterface::VERBOSITY_VERBOSE]);
+        $output = $this->commandTester->getDisplay();
+
+        $this->assertContains(
+            sprintf('There is already an instance of the "%s" command', ProcessVisitsCommand::NAME),
+            $output
+        );
+        $locateVisits->shouldNotHaveBeenCalled();
+        $resolveIpLocation->shouldNotHaveBeenCalled();
+    }
 }

From dd2cffeee98691a76f3af82f993e8888a55a9263 Mon Sep 17 00:00:00 2001
From: Alejandro Celaya <alejandro@alejandrocelaya.com>
Date: Sat, 17 Nov 2018 14:16:45 +0100
Subject: [PATCH 2/4] Reused ProcessVisitsCommand name as the lock name

---
 module/CLI/src/Command/Visit/ProcessVisitsCommand.php | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/module/CLI/src/Command/Visit/ProcessVisitsCommand.php b/module/CLI/src/Command/Visit/ProcessVisitsCommand.php
index f6a6290b..860e3277 100644
--- a/module/CLI/src/Command/Visit/ProcessVisitsCommand.php
+++ b/module/CLI/src/Command/Visit/ProcessVisitsCommand.php
@@ -21,7 +21,6 @@ use function sprintf;
 class ProcessVisitsCommand extends Command
 {
     public const NAME = 'visit:process';
-    private const LOCK_NAME = 'visit_process';
 
     /**
      * @var VisitServiceInterface
@@ -69,7 +68,7 @@ class ProcessVisitsCommand extends Command
         $this->output = $output;
         $io = new SymfonyStyle($input, $output);
 
-        $lock = $this->locker->createLock(self::LOCK_NAME);
+        $lock = $this->locker->createLock(self::NAME);
         if (! $lock->acquire()) {
             $io->warning(sprintf(
                 $this->translator->translate('There is already an instance of the "%s" command in execution'),

From 71ea0bcb5ec381412970fe6a12faffedbbeb281b Mon Sep 17 00:00:00 2001
From: Alejandro Celaya <alejandro@alejandrocelaya.com>
Date: Sat, 17 Nov 2018 14:24:38 +0100
Subject: [PATCH 3/4] Updated changelog with locking capabilities

---
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 355d89b7..5de19a2e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -25,6 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 #### Fixed
 
 * [#271](https://github.com/shlinkio/shlink/issues/271) Fixed memory leak produced when processing high amounts of visits at the same time.
+* [#272](https://github.com/shlinkio/shlink/issues/272) Fixed errors produced when trying to process visits multiple times in parallel, by using a lock which ensures only one instance is run at a time.
 
 
 ## 1.14.0 - 2018-11-16

From ed3883b52c67753fa0628ee769a8e7da8eb7d522 Mon Sep 17 00:00:00 2001
From: Alejandro Celaya <alejandro@alejandrocelaya.com>
Date: Sat, 17 Nov 2018 14:29:54 +0100
Subject: [PATCH 4/4] Updated translations

---
 module/CLI/lang/es.mo | Bin 10654 -> 10803 bytes
 module/CLI/lang/es.po |  22 +++++++++++++---------
 2 files changed, 13 insertions(+), 9 deletions(-)

diff --git a/module/CLI/lang/es.mo b/module/CLI/lang/es.mo
index 8bffeb7b9dfd512f45807ecdd8bd621aad638c5f..9be4edfd8c95d20ee11d7d590c11f76e0d7b1977 100644
GIT binary patch
delta 2249
zcmY+_Z%kEn9LMqRjel1_<4;lv>hY4qEfEFus+9aO3^AdCNXpXF<sjFtmj*bat?o*T
zN_x;1YipG}u-1ImF0}Qe>0EO)o0-*v+Q_!1YcAH4bk%CTKj$7F^xM9?&hMOae}BKf
z!~4CNzVv)q?zB<%5mkyDvm-K}Ka|?ASp`P%R&2#}xEuTNHSEAkxD9I-nMLpbs=YUG
z1)fCx{tIlwpON>|R$XYeh8z2_0pG%MJd0)cBNpHS-j3JsHY_PJ%f%=b<66}3no;ld
z;4PRy2VcNEJdQWx2Utn}HphiVdI2@U1=J7oY4ugCM9ts?cHtS^f=d=>XSxHoQ%|Bg
z&Y*)|;V3R3V>Wk**&rs7MYaoAPXG2D7pw4BRL4aOkJU5>StX02?l&T1vrg1Z`_RQ<
zT#H{Kec7L=_X|qQVpxT1un+6;MeN3pF|DPw((K5?*g`dewfGeBjvYe{<Xv2bA7T@p
z4(iuXGc0AlmSPoZYZ`)jEN~YpH{#fUuavR=O0G}2aTvc2>|}ac(o?AW1x%+0(|8x2
zM|JcUYHLcU$a>UDbfdQNdDQ#UcrTv9P53XiVXT7n*T^Rr1>0&zQ6qd4qd1F~LuT`+
zz5Sfb(^mb2%JvX9AHxoO0B2AG{|z<6CYEa@?#DWu#N{}Ln$X2G7rk6mS7wjZVcbgn
zBcu(xg39&~8LI4EhWx}FR71_d{Q+e0Y%j9?HX1yCDR2fgkk3%<%%k2<|H6g#`Z{XH
z4z1Fsb)k}M2&eHmticl2xesI5j#Jo(=W!?I(2EA#gIcL!d<I`f<<j5yI2JqpK+-nM
z1$|ozwO7ZGckB~X(tVHGqD!dvt|HrKVOnSVZ3QN=5w-L)s8etyc>V{rQZHm*<xXtD
zDIC!GKgUHrH)^W0=e7<XqTY?`aR#*#-=GF`8TpC*j_p`}N4DON>UbikPomyGiJH)C
zaQ|DpoBDNZr+=$noz31Me24nWcmy5RdpDlIN&E+Q>G_>z8}TA4%L~~*4Jd;A)H+Zd
zJdIkhw^8rU242O@)FV;$Umfh>f+K8msH8ZH^kqMwRv^T_gm43)y=JTZV+MZVu|Ke&
z>VI4~Fn&?8s;E7EgtW5v6Dqy*?_coYGgiYNgl@FyPf%KLE1{V%9zV(8e?>=60}Tgv
zm4GVsS^U}RQb$f(b{~;LtWy6<F1FJDK(wNkP8r%ssN`qy&$Et}N(ZryP)0vY)Dc?x
z8>NVg?Zg1_L~v85W)Y#&r94qtuK8;Vl+h|T5!wQkM+og(E)gU45-R<~lm3<2qevR~
z|B!YB_k{97r=x(_MmWSCLWS-3KU&I}4SD=oN!&ve6J5k6LPc5sAfXSDN)xe<XwKw?
z-&@@5je1UE%yE;Wo;x_<xG5)*8XI?0anBhUa>j=}C$f4h;>1S|9&}TKJaoKgz4)Q=
z#7HW0d}(E<Am*%Z+S0H&bGqVO>Ba$<$BD6V&pDKGZ?u$fok1_DCTL+K+nz@|2WTgr
NIFm_brfSPV{{i%z;79-f

delta 2101
zcmZ|PS!_&E9LMqhXtgt?)lI9$GM1vWRnwwssZO=3YG>?9B19}f6c3VX-x3j|kxFc7
zL`bcTCmyOHNTi-Lsf2h^i6!+W;lcNJ-ATMS$$viQE_2U0|MS1Mb?KEQ&WH5)Mx)ge
zgNV#HGY9)7@I&+a&9X3n*%-niEX62p!76OSN=yit4aZv4^S0v<JdOJQEv&$&$o-DR
zC7I=OVm5|x4`$#^OvlHVh#i=Noj3q{FdhSam@KBC{ue~uHy$%_4u)_Y_C^Q$;V}$q
zsf~0r(+8*-cA)<F2~XlL)C!I=N-Z{H8Gc8tG%wk#8tYI4U%(LF!QI$_%+)GV%r;^j
zl4N^;8A|mdI>Yc4YTz$OHr;8GB?~ZJozFn#Vq;J%Ex`>~gN1k-8Oyp*_jluD{Ehio
zl1f%^1J1{D=qR-x>1gJku^fA)F$hMGQ??Z~kpnmok7Eg5_l`SJEBt}|@i%H~(%C;9
zXL}Z)GFFLU+>}oKRa{M+*oXH$$FMx5^cw1XH!i?>mNgddp&tAWwKYAS{Wz&i6r#3r
zE$aRTEX8YBfnBHx2eZgO+i0Vhg=SWac^Jh{KC^?U%xtFOv_*$dRo;xNup8?z!cCg+
zS=7opaVQ2zR{`eXU|fz`NCPgx=MJ4BIyphJ8CZ)N@D%cFYerS=6XaHFM?L7hcm5ZW
zH1koxY_nyeu8;JLpeC{&^_+vK`;Vix*lD7p6}KbLvad+C&CgRBaS)Eehqx5GFoK2T
zy%=}lT)csr@DEg`k{ER-=A&xpGOotEsEH(n+_%oLR65$D0_2*lKo#45)E*s1=45A(
zZL{0Rc3T^6!?&o^M;KMre+qT|BF@AX&u>`HaUuJ>5_e#N-v5_$LY#Pmi?9bLU}VJK
zOzc5T=p=H<&SL~0dB;Cd0|!TX-viYBb*KeJz4Lo<9LG(l|3AkejBoy2vr9M{k6=5l
zz-sb-0<YpcT_0^W2^&z=-GZ9XGvqJUjT#`Cja9~`pze?26g-1^?hE7%wGjL6s2IX@
zw8vGbm2X2IYQJ@?K|%x>bY~9k{n`#ycLEV0`Ve~XIzq)*Ps9<irn=I2dSkSJ<&5vn
zY&-{g9cl>b%PkOaD^x}a6_>KGicp(D=v|*mR1j(;!2O>1QJE+s)Jg~yYa%g=P}3JM
zRr^1kj?$;SQsJmgBvd2|iM~WNp;Rh;v8JNc8>83qe{C8URK%+MSR=V^o2*~$KlNkO
zf~63Pi8;i-Z6*hmg!Xh1v4-F+b3Y`t7;E`-RETT66TL8=(Dx<Q%IQ=QIYcSZe>Oj{
lwv0}gnA@7`znuIqYk)5?912e;D=Kasn*BPhb$DKy?+-Yfu<if=

diff --git a/module/CLI/lang/es.po b/module/CLI/lang/es.po
index a81a3611..4698d723 100644
--- a/module/CLI/lang/es.po
+++ b/module/CLI/lang/es.po
@@ -1,8 +1,8 @@
 msgid ""
 msgstr ""
 "Project-Id-Version: Shlink 1.0\n"
-"POT-Creation-Date: 2018-11-12 21:01+0100\n"
-"PO-Revision-Date: 2018-11-12 21:03+0100\n"
+"POT-Creation-Date: 2018-11-17 14:29+0100\n"
+"PO-Revision-Date: 2018-11-17 14:29+0100\n"
 "Last-Translator: Alejandro Celaya <alejandro@alejandrocelaya.com>\n"
 "Language-Team: \n"
 "Language: es_ES\n"
@@ -340,6 +340,17 @@ msgstr "Una etiqueta con nombre \"%s\" no ha sido encontrada"
 msgid "Processes visits where location is not set yet"
 msgstr "Procesa las visitas donde la localización no ha sido establecida aún"
 
+#, php-format
+msgid "There is already an instance of the \"%s\" command in execution"
+msgstr "Ya existe una instancia del comando \"%s\" en ejecución"
+
+#, php-format
+msgid "Address located at \"%s\""
+msgstr "Dirección localizada en \"%s\""
+
+msgid "Finished processing all IPs"
+msgstr "Finalizado el procesado de todas las IPs"
+
 msgid "Ignored visit with no IP address"
 msgstr "Ignorada visita sin dirección IP"
 
@@ -349,16 +360,9 @@ msgstr "Procesando IP"
 msgid "Ignored localhost address"
 msgstr "Ignorada IP de localhost"
 
-#, php-format
-msgid "Address located at \"%s\""
-msgstr "Dirección localizada en \"%s\""
-
 msgid "An error occurred while locating IP. Skipped"
 msgstr "Se produjo un error al localizar la IP. Ignorado"
 
-msgid "Finished processing all IPs"
-msgstr "Finalizado el procesado de todas las IPs"
-
 msgid "Updates the GeoLite2 database file used to geolocate IP addresses"
 msgstr ""
 "Actualiza el fichero de base de datos de GeoLite2 usado para geolocalizar "