vendor/pimcore/portal-engine/src/EventSubscriber/BatchTaskSubscriber.php line 77

Open in your IDE?
  1. <?php
  2. /**
  3.  * Pimcore
  4.  *
  5.  * This source file is available under following license:
  6.  * - Pimcore Commercial License (PCL)
  7.  *
  8.  *  @copyright  Copyright (c) Pimcore GmbH (http://www.pimcore.org)
  9.  *  @license    http://www.pimcore.org/license     PCL
  10.  */
  11. namespace Pimcore\Bundle\PortalEngineBundle\EventSubscriber;
  12. use Pimcore\Bundle\PortalEngineBundle\Entity\BatchTask;
  13. use Pimcore\Bundle\PortalEngineBundle\Message\BatchTask\Interfaces\BatchTaskMessageInterface;
  14. use Pimcore\Bundle\PortalEngineBundle\Message\BatchTask\Interfaces\SequentialBatchTaskMessageInterface;
  15. use Pimcore\Bundle\PortalEngineBundle\Message\BatchTask\Interfaces\SplittedBatchTaskMessageInterface;
  16. use Pimcore\Bundle\PortalEngineBundle\Message\BatchTask\Interfaces\TriggerFinishedMessageBatchTaskMessageInterface;
  17. use Pimcore\Bundle\PortalEngineBundle\Service\BatchTask\BatchTaskService;
  18. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  19. use Symfony\Component\HttpKernel\Event\TerminateEvent;
  20. use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
  21. use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
  22. use Symfony\Component\Messenger\MessageBusInterface;
  23. /**
  24.  * Class IndexUpdateListener
  25.  *
  26.  * @package Pimcore\Bundle\PortalEngineBundle\EventListener
  27.  */
  28. class BatchTaskSubscriber implements EventSubscriberInterface
  29. {
  30.     /**
  31.      * @var BatchTaskService
  32.      */
  33.     protected $batchTaskService;
  34.     /**
  35.      * @var MessageBusInterface
  36.      */
  37.     protected $messageBus;
  38.     /**
  39.      * @var BatchTask
  40.      */
  41.     protected $terminateBatchTask;
  42.     /**
  43.      * @param BatchTaskService $batchTaskService
  44.      * @param MessageBusInterface $messengerBusPortalEngine
  45.      */
  46.     public function __construct(BatchTaskService $batchTaskServiceMessageBusInterface $messengerBusPortalEngine)
  47.     {
  48.         $this->batchTaskService $batchTaskService;
  49.         $this->messageBus $messengerBusPortalEngine;
  50.     }
  51.     /**
  52.      * @return array
  53.      */
  54.     public static function getSubscribedEvents()
  55.     {
  56.         return [
  57.             WorkerMessageFailedEvent::class => 'onBatchTaskMessageFailed',
  58.             WorkerMessageHandledEvent::class => 'onWorkerMessageHandled',
  59.             TerminateEvent::class => 'onTerminate',
  60.         ];
  61.     }
  62.     /**
  63.      * Mark batch tasks with failed items as finished as otherwise they will run forever.
  64.      *
  65.      * @param WorkerMessageFailedEvent $event
  66.      *
  67.      */
  68.     public function onBatchTaskMessageFailed(WorkerMessageFailedEvent $event)
  69.     {
  70.         $message $event->getEnvelope()->getMessage();
  71.         if (!$message instanceof BatchTaskMessageInterface) {
  72.             return;
  73.         }
  74.         if ($event->willRetry()) {
  75.             return;
  76.         }
  77.         if (!$batchTask $this->batchTaskService->getTaskById($message->getTaskId())) {
  78.             return;
  79.         }
  80.         foreach (array_keys($message->getItems()) as $itemIndex) {
  81.             if (!$this->batchTaskService->isItemIndexProcessed($batchTask$itemIndex)) {
  82.                 $this->batchTaskService->markItemIndexAsProcessed($batchTask$itemIndex);
  83.             }
  84.         }
  85.         $this->checkBatchTaskFinished($batchTask$message);
  86.     }
  87.     public function onWorkerMessageHandled(WorkerMessageHandledEvent $event)
  88.     {
  89.         $message $event->getEnvelope()->getMessage();
  90.         if (!$message instanceof BatchTaskMessageInterface) {
  91.             return;
  92.         }
  93.         if ($message instanceof SplittedBatchTaskMessageInterface) {
  94.             return;
  95.         }
  96.         if (!$batchTask $this->batchTaskService->getTaskById($message->getTaskId())) {
  97.             return;
  98.         }
  99.         if ($message instanceof SequentialBatchTaskMessageInterface && $message->hasRemainingItems()) {
  100.             $remainingMessage $message->createRemainingMessage($this->batchTaskService);
  101.             $this->messageBus->dispatch($remainingMessage);
  102.             return;
  103.         }
  104.         $this->checkBatchTaskFinished($batchTask$message);
  105.     }
  106.     public function onTerminate(TerminateEvent $event)
  107.     {
  108.         $this->batchTaskService->terminateBatchTask();
  109.     }
  110.     public function setTerminateBatchTask(BatchTask $batchTask)
  111.     {
  112.         $this->terminateBatchTask $batchTask;
  113.     }
  114.     protected function checkBatchTaskFinished(BatchTask $batchTaskBatchTaskMessageInterface $message)
  115.     {
  116.         $this->batchTaskService->checkBatchTaskFinished($batchTask);
  117.         if ($message instanceof TriggerFinishedMessageBatchTaskMessageInterface) {
  118.             $finishedMessage $message->createFinishedMessage();
  119.             $this->messageBus->dispatch($finishedMessage);
  120.         }
  121.     }
  122. }