src/AppBundle/Worker/WorkerScheduler.php line 109

Open in your IDE?
  1. <?php
  2. namespace AppBundle\Worker;
  3. use AppBundle\Integration\Aws\Sqs;
  4. use Doctrine\ORM\EntityManagerInterface;
  5. use Symfony\Component\Console\ConsoleEvents;
  6. use Symfony\Component\DependencyInjection\Container;
  7. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  8. use Symfony\Component\HttpKernel\KernelEvents;
  9. /**
  10.  * Provide ability to schedule workers to run at a later time. In development with no
  11.  *   background worker environment, execute the worker immediately.
  12.  *
  13.  */
  14. class WorkerScheduler implements EventSubscriberInterface
  15. {
  16.     public static function getSubscribedEvents()
  17.     {
  18.         return array(
  19.             ConsoleEvents::TERMINATE => 'onTermination',
  20.             KernelEvents::TERMINATE => 'onTermination'
  21.         );
  22.     }
  23.     /** @var Sqs */
  24.     protected $sqs;
  25.     /** @var WorkerManager */
  26.     protected $manager;
  27.     /** @var EntityManagerInterface */
  28.     protected $em;
  29.     /** @var Container */
  30.     protected $container;
  31.     /** @var bool */
  32.     protected $immediateExecution true;
  33.     /** @var array */
  34.     protected $pending = array();
  35.     /**
  36.      * @param Sqs $sqs
  37.      * @param WorkerManager $manager
  38.      * @param EntityManagerInterface $em
  39.      */
  40.     public function __construct(Sqs $sqsWorkerManager $managerEntityManagerInterface $emContainer $container)
  41.     {
  42.         $this->sqs $sqs;
  43.         $this->manager $manager;
  44.         $this->em $em;
  45.         $this->container $container;
  46.     }
  47.     /**
  48.      * @param bool $value
  49.      */
  50.     public function allowImmediateExecution($value)
  51.     {
  52.         $this->immediateExecution = (bool) $value;
  53.     }
  54.     /**
  55.      * @param string $jobCode
  56.      * @param array $data
  57.      */
  58.     public function schedule($jobCode$data = array())
  59.     {
  60.         
  61.         if ($this->sqs->hasQueueUrl()) {
  62.             
  63.             $this->sqs->send(array_merge(
  64.                 $data,
  65.                 array('worker' => $jobCode)
  66.             ));
  67.         }
  68.         else {
  69.             
  70.             
  71.             $this->immediate($jobCode$data);
  72.         }
  73.     }
  74.     /**
  75.      * @param string $jobCode
  76.      * @param array $data
  77.      */
  78.     public function scheduleBatch($jobCode$data = array())
  79.     {
  80.         if ($this->sqs->hasQueueUrl()) {
  81.             $this->sqs->sendBatch(array_merge(
  82.                 $data,
  83.                 array('worker' => $jobCode)
  84.             ));
  85.         }
  86.         else {
  87.             $this->immediate($jobCode$data);
  88.         }
  89.     }
  90.     public function flush()
  91.     {
  92.         $this->sqs->flushBatch();
  93.     }
  94.     public function onTermination()
  95.     {
  96.         // Execution post-termination might cause new pending tasks to be added, so keep going until there are no pending tasks
  97.         while (!empty($this->pending)) {
  98.             $jobs $this->pending;
  99.             $this->pending = array();
  100.             foreach ($jobs as $info) {
  101.                 $this->run($info['code'], $info['data']);
  102.             }
  103.         }
  104.     }
  105.     /**
  106.      * It might not be possible to execute immediately in dev (triggered by
  107.      *   Doctrine event and unable to flush, or already in a job), so save for kernel.terminate
  108.      *
  109.      * @param string $jobCode
  110.      * @param array $data
  111.      */
  112.     protected function immediate($jobCode$data)
  113.     {
  114.         
  115.         if ($this->immediateExecution && !$this->manager->isExecuting()) {
  116.             return $this->run($jobCode$data);
  117.         }
  118.         
  119.         $this->pending[] = array('code' => $jobCode'data' => $data);
  120.     }
  121.     /**
  122.      * @param string $jobCode
  123.      * @param array $data
  124.      */
  125.     protected function run($jobCode$data)
  126.     {
  127.         $worker $this->container->get("workers.$jobCode");
  128.         // Some calling code still specifically designates entities
  129.         if (!empty($data['entities'])) {
  130.             $entities $data['entities'];
  131.             unset($data['entities']);
  132.             $data array_merge($data$entities);
  133.         }
  134.         foreach ($data as $key => $val) {
  135.             if (is_array($val) && !empty($val['class']) && !empty($val['id']) && count(array_keys($val)) === 2) {
  136.                 // If called from legacy code, load entities
  137.                 $val $this->em->find($val['class'], $val['id']);
  138.             }
  139.             $setter 'set' ucfirst($key);
  140.             if (!method_exists($worker$setter)) {
  141.                 throw new \RuntimeException("Unable to set '$key' on worker '$jobCode'");
  142.             }
  143.             
  144.             $worker->{$setter}($val);
  145.         }
  146.         try {
  147.             
  148.             $this->manager->execute($worker);
  149.         } catch (\Exception $e) {
  150.             // The exception is already logged on the job, and immediate execution only happens in the dev environment
  151.         }
  152.     }
  153. }