From 6806291554ffcb15969b5c6a20bf71b356f5cd0f Mon Sep 17 00:00:00 2001 From: Craig Williams Date: Tue, 24 May 2022 16:37:45 +0100 Subject: [PATCH] Implemented job queue table and the gubbins around it --- composer.json | 3 + src/Bakery/WorkerListCommand.php | 2 +- src/Bakery/WorkerRunCommand.php | 17 +- .../Migrations/v100/QueuedJobsTable.php | 60 +++++ src/Database/Models/QueuedJob.php | 68 ++++++ src/Jobs/JobBase.php | 17 +- src/Jobs/JobInspector.php | 110 ++++++++++ src/Jobs/JobInterface.php | 11 + src/ServicesProvider/ServicesProvider.php | 19 +- src/Worker.php | 4 + src/Worker/Worker.php | 206 ++++++++++++------ 11 files changed, 439 insertions(+), 78 deletions(-) create mode 100644 src/Database/Migrations/v100/QueuedJobsTable.php create mode 100644 src/Database/Models/QueuedJob.php create mode 100644 src/Jobs/JobInspector.php diff --git a/composer.json b/composer.json index f882e9e..ba94377 100644 --- a/composer.json +++ b/composer.json @@ -2,6 +2,9 @@ "name": "avsdev/sprinkle-worker", "type": "userfrosting-worker", "description": "A sprinkle to add a worker thread.", + "require": { + "symfony/lock": "*" + }, "autoload": { "files" : [ "defines.php" diff --git a/src/Bakery/WorkerListCommand.php b/src/Bakery/WorkerListCommand.php index c8643bc..c6cccb5 100644 --- a/src/Bakery/WorkerListCommand.php +++ b/src/Bakery/WorkerListCommand.php @@ -37,7 +37,7 @@ class WorkerListCommand extends BaseCommand protected function execute(InputInterface $input, OutputInterface $output) { $this->io->title('Available Jobs List'); - $jobs = $this->ci->worker->getAvailableJobDefinitions(); + $jobs = $this->ci->jobInspector->getAvailableJobDefinitions(); $this->io->table(['Sprinkle', 'Name', 'Namespace'], $jobs); return self::SUCCESS; diff --git a/src/Bakery/WorkerRunCommand.php b/src/Bakery/WorkerRunCommand.php index 04eb3fc..2d34b22 100644 --- a/src/Bakery/WorkerRunCommand.php +++ b/src/Bakery/WorkerRunCommand.php @@ -9,9 +9,10 @@ namespace UserFrosting\Sprinkle\Worker\Bakery; -use Symfony\Component\Console\Input\InputArgument; +use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Output\OutputInterface; +use UserFrosting\Sprinkle\Worker\Worker\Worker; use UserFrosting\System\Bakery\BaseCommand; /** @@ -29,7 +30,9 @@ class WorkerRunCommand extends BaseCommand { $this->setName('worker:run') ->setDescription('Run a worker loop') - ->setHelp('This command runs a the workers job loop. The worker will not return until SIGTERM is sent.'); + ->setHelp('This command runs a the workers job loop. The worker will not return until SIGTERM is sent.') + ->addOption('debug', null, InputOption::VALUE_NONE, 'Output additional status messages.') + ->addOption('quiet', null, InputOption::VALUE_NONE, 'Suppress output messages.'); } /** @@ -37,7 +40,15 @@ class WorkerRunCommand extends BaseCommand */ protected function execute(InputInterface $input, OutputInterface $output) { - $worker = $this->ci->worker; + $worker = new Worker($this->ci); + + if ($input->getOption('debug')) { + $worker->enableDebug(); + } + + if ($input->getOption('quiet')) { + $worker->setQuiet(); + } $worker->run($this->io); diff --git a/src/Database/Migrations/v100/QueuedJobsTable.php b/src/Database/Migrations/v100/QueuedJobsTable.php new file mode 100644 index 0000000..ff0e809 --- /dev/null +++ b/src/Database/Migrations/v100/QueuedJobsTable.php @@ -0,0 +1,60 @@ +schema->hasTable('queued_jobs')) { + $this->schema->create('queued_jobs', function (Blueprint $table) { + $table->increments('id'); + $table->string('job'); + $table->json('params')->default('[]'); + $table->boolean('flag_started')->default(false); + $table->boolean('flag_completed')->default(false); + $table->boolean('flag_failed')->default(false); + $table->timestamp('started_at')->nullable(); + $table->timestamp('completed_at')->nullable(); + $table->timestamps(); + + $table->engine = 'InnoDB'; + $table->collation = 'utf8_unicode_ci'; + $table->charset = 'utf8'; + }); + } + } + + /** + * {@inheritdoc} + */ + public function down() + { + $this->schema->drop('queued_jobs'); + } +} diff --git a/src/Database/Models/QueuedJob.php b/src/Database/Models/QueuedJob.php new file mode 100644 index 0000000..71607f8 --- /dev/null +++ b/src/Database/Models/QueuedJob.php @@ -0,0 +1,68 @@ +job; + $params = json_decode($this->params, true); + + $job = new $jobClass(); + $job->setParams($params); + return $job; + } +} diff --git a/src/Jobs/JobBase.php b/src/Jobs/JobBase.php index 46deb4f..dfe46f0 100644 --- a/src/Jobs/JobBase.php +++ b/src/Jobs/JobBase.php @@ -21,15 +21,20 @@ abstract class JobBase implements JobInterface /** * @var ContainerInterface */ - protected $ci; + public static $ci; + /** - * Constructor. - * - * @param ContainerInterface $ci + * Queues the current job by adding a reference into the database */ - public function __construct(ContainerInterface $ci) + public function queue() { - $this->ci = $ci; + /** @var \UserFrosting\Sprinkle\Core\Util\ClassMapper $classMapper */ + $classMapper = static::$ci->classMapper; + + $classMapper->createInstance('queued_job', [ + 'job' => get_class($this), + 'params' => json_encode($this->getParams()), + ])->save(); } } diff --git a/src/Jobs/JobInspector.php b/src/Jobs/JobInspector.php new file mode 100644 index 0000000..196db6e --- /dev/null +++ b/src/Jobs/JobInspector.php @@ -0,0 +1,110 @@ +ci = $ci; + } + + /** + * Loop all the available sprinkles and return a list of their jobs. + * + * @return array An array of all the task classes found for every sprinkle + */ + public function getAvailableJobDefinitions() + { + $tasks = $this->ci->locator->listResources($this->scheme, false, false); + + return $this->loadAvailableJobDefinitions($tasks); + } + + /** + * Process jobs Resource into info. + * + * @param array $taskFiles List of jobs files + * + * @return array + */ + protected function loadAvailableJobDefinitions(array $jobFiles) + { + $jobs = []; + + foreach ($jobFiles as $jobFile) { + $job = $this->getJobDefinition($jobFile); + if ($job) { + $jobs[] = $job; + } + } + + return $jobs; + } + + /** + * Return an array of job details including the class name and the sprinkle name. + * + * @param ResourceInstance $file The job file + * + * @return array The details about a job file [name, class, sprinkle] + */ + protected function getJobDefinition(ResourceInstance $file) + { + // Format the sprinkle name for the namespace + $sprinkleName = $file->getLocation()->getName(); + $sprinkleName = Str::studly($sprinkleName); + + // Getting base path, name and class name + $basePath = str_replace($file->getBasename(), '', $file->getBasePath()); + $name = $basePath . $file->getFilename(); + $className = str_replace('/', '\\', $basePath) . $file->getFilename(); + $classPath = "\\UserFrosting\\Sprinkle\\$sprinkleName\\Worker\\Jobs\\$className"; + + // Exclude known helper classes + if ($className == "JobInterface" || $className == "JobBase" || $className == "JobInspector") { + return null; + } + + // Build the class name and namespace + return [ + 'sprinkle' => $sprinkleName, + 'name' => $name, + 'class' => $classPath, + ]; + } +} diff --git a/src/Jobs/JobInterface.php b/src/Jobs/JobInterface.php index f50b0bd..b93c0dc 100644 --- a/src/Jobs/JobInterface.php +++ b/src/Jobs/JobInterface.php @@ -17,6 +17,17 @@ namespace UserFrosting\Sprinkle\Worker\Jobs; */ interface JobInterface { + /** + * Sets the job parameters using an array object + */ + public function setParams(array $params); + + /** + * Returns the job parameters as an array + */ + public function getParams(); + + /** * Function used to specify what the job does. * diff --git a/src/ServicesProvider/ServicesProvider.php b/src/ServicesProvider/ServicesProvider.php index 9df8bc7..d6998a3 100644 --- a/src/ServicesProvider/ServicesProvider.php +++ b/src/ServicesProvider/ServicesProvider.php @@ -31,12 +31,23 @@ class ServicesProvider public function register(ContainerInterface $container) { /* - * Return an instance of the worker + * Extend the 'classMapper' service to register sprunje and model classes. * - * @return \UserFrosting\Sprinkle\Worker\Worker\Worker + * @return \UserFrosting\Sprinkle\Core\Util\ClassMapper */ - $container['worker'] = function ($c) { - return new Worker($c); + $container->extend('classMapper', function ($classMapper, $c) { + $classMapper->setClassMapping('queued_job', 'UserFrosting\Sprinkle\Worker\Database\Models\QueuedJob'); + + return $classMapper; + }); + + /* + * Return an instance of the job inspector + * + * @return \UserFrosting\Sprinkle\Worker\Jobs\JobInspector + */ + $container['jobInspector'] = function ($c) { + return new JobInspector($c); }; } } diff --git a/src/Worker.php b/src/Worker.php index 6e17146..e4440e2 100644 --- a/src/Worker.php +++ b/src/Worker.php @@ -10,6 +10,7 @@ namespace UserFrosting\Sprinkle\Worker; use UserFrosting\System\Sprinkle\Sprinkle; +use UserFrosting\Sprinkle\Worker\Jobs\JobBase; /** * Bootstrapper class for the 'worker' sprinkle. @@ -23,6 +24,9 @@ class Worker extends Sprinkle */ public function onSprinklesInitialized() { + // Set container for jobs + JobBase::$ci = $this->ci; + $this->registerStreams(); } diff --git a/src/Worker/Worker.php b/src/Worker/Worker.php index 2f88bea..ba61075 100644 --- a/src/Worker/Worker.php +++ b/src/Worker/Worker.php @@ -9,8 +9,11 @@ namespace UserFrosting\Sprinkle\Worker\Worker; +use Carbon\Carbon; use Illuminate\Support\Str; use Psr\Container\ContainerInterface; +use Symfony\Component\Lock\LockFactory; +use Symfony\Component\Lock\Store\SemaphoreStore; use UserFrosting\UniformResourceLocator\Resource as ResourceInstance; /** @@ -28,15 +31,42 @@ class Worker protected $ci; /** - * @var string The resource locator scheme + * @var object The currently processing job */ - protected $scheme = 'jobs://'; + protected $queuedJob = null; /** * @var object The currently processing job */ protected $currentJob = null; + /** + * @var object The debug stream if enabled + */ + protected $debug = null; + + /** + * @var boolean Allow or silence stdout output text + */ + protected $quiet = false; + + + /** + * @var object Mutex store + */ + protected $store = null; + + /** + * @var object Mutex locker + */ + protected $locker = null; + + /** + * @var object Lock mutex + */ + protected $lock = null; + + /** * Class Constructor. * @@ -45,103 +75,151 @@ class Worker public function __construct(ContainerInterface $ci) { $this->ci = $ci; + + $this->store = new SemaphoreStore(); + $this->locker = new LockFactory($this->store); + $this->lock = $this->locker->createLock('worker-job-query'); } /** - * Loop all the available sprinkles and return a list of their jobs. - * - * @return array An array of all the task classes found for every sprinkle + * Enables runtime debugging */ - public function getAvailableJobDefinitions() + public function enableDebug() { - $tasks = $this->ci->locator->listResources($this->scheme, false, false); - - return $this->loadAvailableJobDefinitions($tasks); + $this->debug = $this->ci->debugLogger; + } + /** + * Enables runtime debugging + */ + public function disableDebug() + { + $this->debug = null; } /** - * Process jobs Resource into info. - * - * @param array $taskFiles List of jobs files - * - * @return array + * Sets the quiet flag */ - protected function loadAvailableJobDefinitions(array $jobFiles) + public function setQuiet() { - $jobs = []; - - foreach ($jobFiles as $jobFile) { - $job = $this->getJobDefinition($jobFile); - if ($job) { - $jobs[] = $job; - } - } - - return $jobs; + $this->quiet = true; } - /** - * Return an array of job details including the class name and the sprinkle name. - * - * @param ResourceInstance $file The job file - * - * @return array The details about a job file [name, class, sprinkle] + * Clears the quiet flag */ - protected function getJobDefinition(ResourceInstance $file) + public function clearQuiet() { - // Format the sprinkle name for the namespace - $sprinkleName = $file->getLocation()->getName(); - $sprinkleName = Str::studly($sprinkleName); - - // Getting base path, name and class name - $basePath = str_replace($file->getBasename(), '', $file->getBasePath()); - $name = $basePath . $file->getFilename(); - $className = str_replace('/', '\\', $basePath) . $file->getFilename(); - $classPath = "\\UserFrosting\\Sprinkle\\$sprinkleName\\Worker\\Jobs\\$className"; - - if ($className == "JobInterface" || $className == "JobBase") { - return null; - } - - // Build the class name and namespace - return [ - 'sprinkle' => $sprinkleName, - 'name' => $name, - 'class' => $classPath, - ]; + $this->quiet = false; } - /** * Runs the worker loop, procssing queued jobs. */ public function run($io) { + // Hacky way to force the DB service to load before attempting to use the worker + $this->ci['db']; + + if ($this->debug) $this->debug->debug('Inserting signal handlers for SIGINT, SIGQUIT and SIGTERM'); + if (!$this->quiet) $io->writeln(' [INFO] Inserting signal handlers for SIGINT, SIGQUIT and SIGTERM'); + pcntl_async_signals(TRUE); pcntl_signal(SIGINT, array($this, "handleTerm")); pcntl_signal(SIGQUIT, array($this, "handleTerm")); pcntl_signal(SIGTERM, array($this, "handleTerm")); - while(true) { - // TODO: find jobs in the database - try { - // Run the job using call_user_func_array - } catch(Exception $ex) { - $io->error('An exception occurred attempting to run a job'); - var_dump($ex); - } finally { - $this->currentJob = null; + if ($this->debug) $this->debug->debug('Starting run loop'); + if (!$this->quiet) $io->writeln(' [INFO] Starting run loop'); + while(true) { + while (!$this->lock->acquire()) { + if ($this->debug) $this->debug->debug('Unable to lock mutex'); + if (!$this->quiet) $io->writeln(' [WARN] No lock!'); + usleep(10000); } + + if (!$this->quiet && $this->debug) $io->writeln(' [INFO] Query!'); + $this->queuedJob = $this->ci->classMapper->getClassMapping('queued_job')::query() + ->where('flag_started', false) + ->orderBy('created_at') + ->first(); + + if ($this->queuedJob) { + if ($this->debug) $this->debug->debug('Found a job to do!', [$this->queuedJob]); + if (!$this->quiet) $io->writeln(' [INFO] Found a job to do!'); + + try { + $this->queuedJob->started_at = Carbon::now(); + $this->queuedJob->flag_started = true; + $this->queuedJob->save(); + $this->lock->release(); + + $this->currentJob = $this->queuedJob->dequeue(); + if ($this->debug) $this->debug->debug('Job details:', [$this->currentJob, $this->currentJob->getParams()]); + + if (!$this->currentJob->run()) { + if ($this->debug) $this->debug->debug('Job failed!'); + if (!$this->quiet) $io->writeln(' [ERROR] Job failed!'); + + $this->queuedJob->flag_failed = true; + } else { + if ($this->debug) $this->debug->debug('Job completed!'); + if (!$this->quiet) $io->writeln(' [INFO] Job completed!'); + + $this->queuedJob->flag_completed = true; + $this->queuedJob->completed_at = Carbon::now(); + } + + $this->queuedJob->save(); + } catch(Exception $ex) { + if ($this->debug) $this->debug->error('Job execution failed!', [$ex]); + + $io->error('An exception occurred attempting to run a job'); + var_dump($ex); + + $this->queuedJob->flag_failed = true; + $this->queuedJob->save(); + } finally { + $this->currentJob = null; + } + + $this->queuedJob = null; + } else { + $this->lock->release(); + } + + sleep(1); } } public function handleTerm($signo, $siginfo) { + if ($this->debug) $this->debug->debug('Trapped signal', [$signo, $siginfo]); + if ($this->currentJob) { - $this->currentJob->terminate(); - $this->currentJob = null; + try { + if ($this->debug) $this->debug->debug('Waiting for job to terminiate'); + + $this->currentJob->terminate(); + $this->queuedJob->started_at = null; + $this->queuedJob->flag_started = false; + $this->queuedJob->save(); + + if ($this->debug) $this->debug->debug('Terminate successful'); + } catch(Exception $ex) { + if ($this->debug) $this->debug->debug('Terminate failed!', [$ex]); + $io->error('An exception occurred attempting to stop a job'); + var_dump($ex); + + $this->queuedJob->flag_failed = true; + $this->queuedJob->save(); + } finally { + $this->currentJob = null; + } + + $this->queuedJob = null; } + + if ($this->debug) $this->debug->debug('Exiting now'); exit; } }