<?php
namespace AppBundle\Worker;
use AppBundle\Integration\Aws\Sqs;
use Doctrine\ORM\EntityManagerInterface;
use Symfony\Component\Console\ConsoleEvents;
use Symfony\Component\DependencyInjection\Container;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Symfony\Component\HttpKernel\KernelEvents;
/**
 * Provides the ability to schedule workers to run at a later time.
 *
 * When the SQS integration reports a queue URL, jobs are sent to the queue.
 * Otherwise (e.g. in development with no background worker environment) the
 * worker is executed in-process: immediately when possible, or deferred until
 * the console/kernel terminate event.
 */
class WorkerScheduler implements EventSubscriberInterface
{
    /** @var Sqs */
    protected $sqs;

    /** @var WorkerManager */
    protected $manager;

    /** @var EntityManagerInterface */
    protected $em;

    /** @var Container */
    protected $container;

    /** @var bool Whether in-process jobs may run right away instead of waiting for termination */
    protected $immediateExecution = true;

    /** @var array List of deferred jobs, each of the form array('code' => string, 'data' => array) */
    protected $pending = array();

    /**
     * Deferred jobs are drained on both the console and the kernel terminate events.
     *
     * @return array
     */
    public static function getSubscribedEvents()
    {
        return array(
            ConsoleEvents::TERMINATE => 'onTermination',
            KernelEvents::TERMINATE => 'onTermination',
        );
    }

    /**
     * @param Sqs                    $sqs
     * @param WorkerManager          $manager
     * @param EntityManagerInterface $em
     * @param Container              $container Used to look up worker services by id "workers.<jobCode>"
     */
    public function __construct(Sqs $sqs, WorkerManager $manager, EntityManagerInterface $em, Container $container)
    {
        $this->sqs = $sqs;
        $this->manager = $manager;
        $this->em = $em;
        $this->container = $container;
    }

    /**
     * Toggle whether in-process jobs may be executed right away. When disabled,
     * such jobs are always deferred until termination.
     *
     * @param bool $value
     */
    public function allowImmediateExecution($value)
    {
        $this->immediateExecution = (bool) $value;
    }

    /**
     * Schedule a single job: send it to the queue when a queue URL is available,
     * otherwise execute (or defer) it in-process.
     *
     * The job code is stored under the 'worker' key of the message and
     * overrides any 'worker' key already present in $data.
     *
     * @param string $jobCode
     * @param array  $data
     */
    public function schedule($jobCode, $data = array())
    {
        if (!$this->sqs->hasQueueUrl()) {
            $this->immediate($jobCode, $data);

            return;
        }

        $this->sqs->send(array_merge($data, array('worker' => $jobCode)));
    }

    /**
     * Same as schedule(), but hands the message to the SQS client's batch API.
     * Without a queue URL the job is executed (or deferred) in-process.
     *
     * @param string $jobCode
     * @param array  $data
     */
    public function scheduleBatch($jobCode, $data = array())
    {
        if (!$this->sqs->hasQueueUrl()) {
            $this->immediate($jobCode, $data);

            return;
        }

        $this->sqs->sendBatch(array_merge($data, array('worker' => $jobCode)));
    }

    /**
     * Delegate to the SQS client's flushBatch().
     */
    public function flush()
    {
        $this->sqs->flushBatch();
    }

    /**
     * Run every deferred job in the order it was scheduled.
     *
     * Running a job may itself defer new jobs, so keep going until the queue is
     * empty. Jobs are taken off the queue one at a time so that, should run()
     * throw while preparing a worker, the jobs that were not attempted yet stay
     * in $pending instead of being discarded.
     */
    public function onTermination()
    {
        while (!empty($this->pending)) {
            $info = array_shift($this->pending);
            $this->run($info['code'], $info['data']);
        }
    }

    /**
     * Execute the job now if allowed, otherwise save it for termination.
     *
     * It might not be possible to execute immediately in dev (triggered by a
     * Doctrine event and unable to flush, or already in a job), so in that case
     * the job is queued and picked up by onTermination().
     *
     * @param string $jobCode
     * @param array  $data
     */
    protected function immediate($jobCode, $data)
    {
        if ($this->immediateExecution && !$this->manager->isExecuting()) {
            $this->run($jobCode, $data);

            return;
        }

        $this->pending[] = array('code' => $jobCode, 'data' => $data);
    }

    /**
     * Look up the worker service, feed it the job data through its setters and
     * execute it through the worker manager.
     *
     * @param string $jobCode Suffix of the worker service id ("workers.<jobCode>")
     * @param array  $data    Map of property name => value; each key requires a matching set<Key>() method
     *
     * @throws \RuntimeException When the worker has no setter for one of the data keys
     */
    protected function run($jobCode, $data)
    {
        $worker = $this->container->get("workers.$jobCode");

        // Some calling code still specifically designates entities: hoist them to the top level
        if (!empty($data['entities'])) {
            $entities = $data['entities'];
            unset($data['entities']);
            $data = array_merge($data, $entities);
        }

        foreach ($data as $key => $val) {
            // Legacy callers pass entity references as exactly array('class' => ..., 'id' => ...)
            $isEntityReference = is_array($val)
                && count($val) === 2
                && !empty($val['class'])
                && !empty($val['id']);

            if ($isEntityReference) {
                // NOTE(review): the result of find() is handed to the setter as-is;
                // confirm workers cope with an entity that no longer exists.
                $val = $this->em->find($val['class'], $val['id']);
            }

            $setter = 'set' . ucfirst($key);
            if (!method_exists($worker, $setter)) {
                throw new \RuntimeException("Unable to set '$key' on worker '$jobCode'");
            }

            $worker->{$setter}($val);
        }

        try {
            $this->manager->execute($worker);
        } catch (\Exception $e) {
            // Deliberately ignored: the exception is already logged on the job, and
            // immediate execution only happens in the dev environment
        }
    }
}