Implemented job queue table and the gubbins around it

This commit is contained in:
2022-05-24 16:37:45 +01:00
parent fb24b2ee72
commit 6806291554
11 changed files with 439 additions and 78 deletions

View File

@@ -2,6 +2,9 @@
"name": "avsdev/sprinkle-worker", "name": "avsdev/sprinkle-worker",
"type": "userfrosting-worker", "type": "userfrosting-worker",
"description": "A sprinkle to add a worker thread.", "description": "A sprinkle to add a worker thread.",
"require": {
"symfony/lock": "*"
},
"autoload": { "autoload": {
"files" : [ "files" : [
"defines.php" "defines.php"

View File

@@ -37,7 +37,7 @@ class WorkerListCommand extends BaseCommand
protected function execute(InputInterface $input, OutputInterface $output) protected function execute(InputInterface $input, OutputInterface $output)
{ {
$this->io->title('Available Jobs List'); $this->io->title('Available Jobs List');
$jobs = $this->ci->worker->getAvailableJobDefinitions(); $jobs = $this->ci->jobInspector->getAvailableJobDefinitions();
$this->io->table(['Sprinkle', 'Name', 'Namespace'], $jobs); $this->io->table(['Sprinkle', 'Name', 'Namespace'], $jobs);
return self::SUCCESS; return self::SUCCESS;

View File

@@ -9,9 +9,10 @@
namespace UserFrosting\Sprinkle\Worker\Bakery; namespace UserFrosting\Sprinkle\Worker\Bakery;
use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Input\InputInterface; use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface; use Symfony\Component\Console\Output\OutputInterface;
use UserFrosting\Sprinkle\Worker\Worker\Worker;
use UserFrosting\System\Bakery\BaseCommand; use UserFrosting\System\Bakery\BaseCommand;
/** /**
@@ -29,7 +30,9 @@ class WorkerRunCommand extends BaseCommand
{ {
$this->setName('worker:run') $this->setName('worker:run')
->setDescription('Run a worker loop') ->setDescription('Run a worker loop')
->setHelp('This command runs a the workers job loop. The worker will not return until SIGTERM is sent.'); ->setHelp('This command runs a the workers job loop. The worker will not return until SIGTERM is sent.')
->addOption('debug', null, InputOption::VALUE_NONE, 'Output additional status messages.')
->addOption('quiet', null, InputOption::VALUE_NONE, 'Suppress output messages.');
} }
/** /**
@@ -37,7 +40,15 @@ class WorkerRunCommand extends BaseCommand
*/ */
protected function execute(InputInterface $input, OutputInterface $output) protected function execute(InputInterface $input, OutputInterface $output)
{ {
$worker = $this->ci->worker; $worker = new Worker($this->ci);
if ($input->getOption('debug')) {
$worker->enableDebug();
}
if ($input->getOption('quiet')) {
$worker->setQuiet();
}
$worker->run($this->io); $worker->run($this->io);

View File

@@ -0,0 +1,60 @@
<?php
/*
* AVSDev UF Worker (https://avsdev.uk)
*
* @link https://git.avsdev.uk/avsdev/sprinkle-worker
* @license https://git.avsdev.uk/avsdev/sprinkle-worker/blob/master/LICENSE.md (LGPL-3.0 License)
*/
namespace UserFrosting\Sprinkle\Worker\Database\Migrations\v100;
use Illuminate\Database\Schema\Blueprint;
use UserFrosting\Sprinkle\Core\Database\Migration;
/**
* Queued Jobs table migration
* Version 1.0.0.
*
* @author Craig Williams (https://avsdev.uk)
*/
class QueuedJobsTable extends Migration
{
/**
* {@inheritdoc}
*/
public static $dependencies = [
];
/**
* {@inheritdoc}
*/
public function up()
{
if (!$this->schema->hasTable('queued_jobs')) {
$this->schema->create('queued_jobs', function (Blueprint $table) {
$table->increments('id');
$table->string('job');
$table->json('params')->default('[]');
$table->boolean('flag_started')->default(false);
$table->boolean('flag_completed')->default(false);
$table->boolean('flag_failed')->default(false);
$table->timestamp('started_at')->nullable();
$table->timestamp('completed_at')->nullable();
$table->timestamps();
$table->engine = 'InnoDB';
$table->collation = 'utf8_unicode_ci';
$table->charset = 'utf8';
});
}
}
/**
* {@inheritdoc}
*/
public function down()
{
$this->schema->drop('queued_jobs');
}
}

View File

@@ -0,0 +1,68 @@
<?php
/*
* AVSDev UF Worker (https://avsdev.uk)
*
* @link https://git.avsdev.uk/avsdev/sprinkle-worker
* @license https://git.avsdev.uk/avsdev/sprinkle-worker/blob/master/LICENSE.md (LGPL-3.0 License)
*/
namespace UserFrosting\Sprinkle\Worker\Database\Models;
use Illuminate\Database\Eloquent\SoftDeletes;
use Illuminate\Database\Capsule\Manager as DB;
use UserFrosting\Sprinkle\Core\Database\Models\Model;
/**
* Queued Job class
*
* @author Craig Williams (https://avsdev.uk)
*/
class QueuedJob extends Model
{
/**
* @var string The name of the table for the current model.
*/
protected $table = 'queued_jobs';
/**
* Fields that should be mass-assignable when creating a new Organisation.
*
* @var string[]
*/
protected $fillable = [
'job',
'params',
];
/**
* The attributes that should be mutated to dates.
*
* @var string[]
*/
protected $dates = [
'created_at',
'updated_at',
'started_at',
'completed_at',
];
/**
* @var bool Enable timestamps for this class.
*/
public $timestamps = true;
/**
* Dequeues the current model as a job object
*/
public function dequeue()
{
$jobClass = $this->job;
$params = json_decode($this->params, true);
$job = new $jobClass();
$job->setParams($params);
return $job;
}
}

View File

@@ -21,15 +21,20 @@ abstract class JobBase implements JobInterface
/** /**
* @var ContainerInterface * @var ContainerInterface
*/ */
protected $ci; public static $ci;
/** /**
* Constructor. * Queues the current job by adding a reference into the database
*
* @param ContainerInterface $ci
*/ */
public function __construct(ContainerInterface $ci) public function queue()
{ {
$this->ci = $ci; /** @var \UserFrosting\Sprinkle\Core\Util\ClassMapper $classMapper */
$classMapper = static::$ci->classMapper;
$classMapper->createInstance('queued_job', [
'job' => get_class($this),
'params' => json_encode($this->getParams()),
])->save();
} }
} }

110
src/Jobs/JobInspector.php Normal file
View File

@@ -0,0 +1,110 @@
<?php
/*
* AVSDev UF Worker (https://avsdev.uk)
*
* @link https://git.avsdev.uk/avsdev/sprinkle-worker
* @license https://git.avsdev.uk/avsdev/sprinkle-worker/blob/master/LICENSE.md (LGPL-3.0 License)
*/
namespace UserFrosting\Sprinkle\Worker\Jobs;
use Carbon\Carbon;
use Illuminate\Support\Str;
use Psr\Container\ContainerInterface;
use UserFrosting\UniformResourceLocator\Resource as ResourceInstance;
/**
* Job Inspector Class.
*
* Finds all job classes across sprinkles
*
* @author Craig Williams (https://avsdev.uk)
*/
class JobInspector
{
/**
* @var ContainerInterface
*/
protected $ci;
/**
* @var string The resource locator scheme
*/
protected $scheme = 'jobs://';
/**
* Class Constructor.
*
* @param ContainerInterface $ci
*/
public function __construct(ContainerInterface $ci)
{
$this->ci = $ci;
}
/**
* Loop all the available sprinkles and return a list of their jobs.
*
* @return array An array of all the task classes found for every sprinkle
*/
public function getAvailableJobDefinitions()
{
$tasks = $this->ci->locator->listResources($this->scheme, false, false);
return $this->loadAvailableJobDefinitions($tasks);
}
/**
* Process jobs Resource into info.
*
* @param array $taskFiles List of jobs files
*
* @return array
*/
protected function loadAvailableJobDefinitions(array $jobFiles)
{
$jobs = [];
foreach ($jobFiles as $jobFile) {
$job = $this->getJobDefinition($jobFile);
if ($job) {
$jobs[] = $job;
}
}
return $jobs;
}
/**
* Return an array of job details including the class name and the sprinkle name.
*
* @param ResourceInstance $file The job file
*
* @return array The details about a job file [name, class, sprinkle]
*/
protected function getJobDefinition(ResourceInstance $file)
{
// Format the sprinkle name for the namespace
$sprinkleName = $file->getLocation()->getName();
$sprinkleName = Str::studly($sprinkleName);
// Getting base path, name and class name
$basePath = str_replace($file->getBasename(), '', $file->getBasePath());
$name = $basePath . $file->getFilename();
$className = str_replace('/', '\\', $basePath) . $file->getFilename();
$classPath = "\\UserFrosting\\Sprinkle\\$sprinkleName\\Worker\\Jobs\\$className";
// Exclude known helper classes
if ($className == "JobInterface" || $className == "JobBase" || $className == "JobInspector") {
return null;
}
// Build the class name and namespace
return [
'sprinkle' => $sprinkleName,
'name' => $name,
'class' => $classPath,
];
}
}

View File

@@ -17,6 +17,17 @@ namespace UserFrosting\Sprinkle\Worker\Jobs;
*/ */
interface JobInterface interface JobInterface
{ {
/**
* Sets the job parameters using an array object
*/
public function setParams(array $params);
/**
* Returns the job parameters as an array
*/
public function getParams();
/** /**
* Function used to specify what the job does. * Function used to specify what the job does.
* *

View File

@@ -31,12 +31,23 @@ class ServicesProvider
public function register(ContainerInterface $container) public function register(ContainerInterface $container)
{ {
/* /*
* Return an instance of the worker * Extend the 'classMapper' service to register sprunje and model classes.
* *
* @return \UserFrosting\Sprinkle\Worker\Worker\Worker * @return \UserFrosting\Sprinkle\Core\Util\ClassMapper
*/ */
$container['worker'] = function ($c) { $container->extend('classMapper', function ($classMapper, $c) {
return new Worker($c); $classMapper->setClassMapping('queued_job', 'UserFrosting\Sprinkle\Worker\Database\Models\QueuedJob');
return $classMapper;
});
/*
* Return an instance of the job inspector
*
* @return \UserFrosting\Sprinkle\Worker\Jobs\JobInspector
*/
$container['jobInspector'] = function ($c) {
return new JobInspector($c);
}; };
} }
} }

View File

@@ -10,6 +10,7 @@
namespace UserFrosting\Sprinkle\Worker; namespace UserFrosting\Sprinkle\Worker;
use UserFrosting\System\Sprinkle\Sprinkle; use UserFrosting\System\Sprinkle\Sprinkle;
use UserFrosting\Sprinkle\Worker\Jobs\JobBase;
/** /**
* Bootstrapper class for the 'worker' sprinkle. * Bootstrapper class for the 'worker' sprinkle.
@@ -23,6 +24,9 @@ class Worker extends Sprinkle
*/ */
public function onSprinklesInitialized() public function onSprinklesInitialized()
{ {
// Set container for jobs
JobBase::$ci = $this->ci;
$this->registerStreams(); $this->registerStreams();
} }

View File

@@ -9,8 +9,11 @@
namespace UserFrosting\Sprinkle\Worker\Worker; namespace UserFrosting\Sprinkle\Worker\Worker;
use Carbon\Carbon;
use Illuminate\Support\Str; use Illuminate\Support\Str;
use Psr\Container\ContainerInterface; use Psr\Container\ContainerInterface;
use Symfony\Component\Lock\LockFactory;
use Symfony\Component\Lock\Store\SemaphoreStore;
use UserFrosting\UniformResourceLocator\Resource as ResourceInstance; use UserFrosting\UniformResourceLocator\Resource as ResourceInstance;
/** /**
@@ -28,15 +31,42 @@ class Worker
protected $ci; protected $ci;
/** /**
* @var string The resource locator scheme * @var object The currently processing job
*/ */
protected $scheme = 'jobs://'; protected $queuedJob = null;
/** /**
* @var object The currently processing job * @var object The currently processing job
*/ */
protected $currentJob = null; protected $currentJob = null;
/**
* @var object The debug stream if enabled
*/
protected $debug = null;
/**
* @var boolean Allow or silence stdout output text
*/
protected $quiet = false;
/**
* @var object Mutex store
*/
protected $store = null;
/**
* @var object Mutex locker
*/
protected $locker = null;
/**
* @var object Lock mutex
*/
protected $lock = null;
/** /**
* Class Constructor. * Class Constructor.
* *
@@ -45,103 +75,151 @@ class Worker
public function __construct(ContainerInterface $ci) public function __construct(ContainerInterface $ci)
{ {
$this->ci = $ci; $this->ci = $ci;
$this->store = new SemaphoreStore();
$this->locker = new LockFactory($this->store);
$this->lock = $this->locker->createLock('worker-job-query');
} }
/** /**
* Loop all the available sprinkles and return a list of their jobs. * Enables runtime debugging
*
* @return array An array of all the task classes found for every sprinkle
*/ */
public function getAvailableJobDefinitions() public function enableDebug()
{ {
$tasks = $this->ci->locator->listResources($this->scheme, false, false); $this->debug = $this->ci->debugLogger;
}
return $this->loadAvailableJobDefinitions($tasks); /**
* Enables runtime debugging
*/
public function disableDebug()
{
$this->debug = null;
} }
/** /**
* Process jobs Resource into info. * Sets the quiet flag
*
* @param array $taskFiles List of jobs files
*
* @return array
*/ */
protected function loadAvailableJobDefinitions(array $jobFiles) public function setQuiet()
{ {
$jobs = []; $this->quiet = true;
foreach ($jobFiles as $jobFile) {
$job = $this->getJobDefinition($jobFile);
if ($job) {
$jobs[] = $job;
}
}
return $jobs;
} }
/** /**
* Return an array of job details including the class name and the sprinkle name. * Clears the quiet flag
*
* @param ResourceInstance $file The job file
*
* @return array The details about a job file [name, class, sprinkle]
*/ */
protected function getJobDefinition(ResourceInstance $file) public function clearQuiet()
{ {
// Format the sprinkle name for the namespace $this->quiet = false;
$sprinkleName = $file->getLocation()->getName();
$sprinkleName = Str::studly($sprinkleName);
// Getting base path, name and class name
$basePath = str_replace($file->getBasename(), '', $file->getBasePath());
$name = $basePath . $file->getFilename();
$className = str_replace('/', '\\', $basePath) . $file->getFilename();
$classPath = "\\UserFrosting\\Sprinkle\\$sprinkleName\\Worker\\Jobs\\$className";
if ($className == "JobInterface" || $className == "JobBase") {
return null;
}
// Build the class name and namespace
return [
'sprinkle' => $sprinkleName,
'name' => $name,
'class' => $classPath,
];
} }
/** /**
* Runs the worker loop, procssing queued jobs. * Runs the worker loop, procssing queued jobs.
*/ */
public function run($io) public function run($io)
{ {
// Hacky way to force the DB service to load before attempting to use the worker
$this->ci['db'];
if ($this->debug) $this->debug->debug('Inserting signal handlers for SIGINT, SIGQUIT and SIGTERM');
if (!$this->quiet) $io->writeln('<info> [INFO] Inserting signal handlers for SIGINT, SIGQUIT and SIGTERM</info>');
pcntl_async_signals(TRUE); pcntl_async_signals(TRUE);
pcntl_signal(SIGINT, array($this, "handleTerm")); pcntl_signal(SIGINT, array($this, "handleTerm"));
pcntl_signal(SIGQUIT, array($this, "handleTerm")); pcntl_signal(SIGQUIT, array($this, "handleTerm"));
pcntl_signal(SIGTERM, array($this, "handleTerm")); pcntl_signal(SIGTERM, array($this, "handleTerm"));
while(true) {
// TODO: find jobs in the database
try { if ($this->debug) $this->debug->debug('Starting run loop');
// Run the job using call_user_func_array if (!$this->quiet) $io->writeln('<info> [INFO] Starting run loop</info>');
} catch(Exception $ex) { while(true) {
$io->error('An exception occurred attempting to run a job'); while (!$this->lock->acquire()) {
var_dump($ex); if ($this->debug) $this->debug->debug('Unable to lock mutex');
} finally { if (!$this->quiet) $io->writeln('<comment> [WARN] No lock!</comment>');
$this->currentJob = null; usleep(10000);
} }
if (!$this->quiet && $this->debug) $io->writeln('<info> [INFO] Query!</info>');
$this->queuedJob = $this->ci->classMapper->getClassMapping('queued_job')::query()
->where('flag_started', false)
->orderBy('created_at')
->first();
if ($this->queuedJob) {
if ($this->debug) $this->debug->debug('Found a job to do!', [$this->queuedJob]);
if (!$this->quiet) $io->writeln('<info> [INFO] Found a job to do!</info>');
try {
$this->queuedJob->started_at = Carbon::now();
$this->queuedJob->flag_started = true;
$this->queuedJob->save();
$this->lock->release();
$this->currentJob = $this->queuedJob->dequeue();
if ($this->debug) $this->debug->debug('Job details:', [$this->currentJob, $this->currentJob->getParams()]);
if (!$this->currentJob->run()) {
if ($this->debug) $this->debug->debug('Job failed!');
if (!$this->quiet) $io->writeln('<error> [ERROR] Job failed!</error>');
$this->queuedJob->flag_failed = true;
} else {
if ($this->debug) $this->debug->debug('Job completed!');
if (!$this->quiet) $io->writeln('<info> [INFO] Job completed!</info>');
$this->queuedJob->flag_completed = true;
$this->queuedJob->completed_at = Carbon::now();
}
$this->queuedJob->save();
} catch(Exception $ex) {
if ($this->debug) $this->debug->error('Job execution failed!', [$ex]);
$io->error('An exception occurred attempting to run a job');
var_dump($ex);
$this->queuedJob->flag_failed = true;
$this->queuedJob->save();
} finally {
$this->currentJob = null;
}
$this->queuedJob = null;
} else {
$this->lock->release();
}
sleep(1);
} }
} }
public function handleTerm($signo, $siginfo) { public function handleTerm($signo, $siginfo) {
if ($this->debug) $this->debug->debug('Trapped signal', [$signo, $siginfo]);
if ($this->currentJob) { if ($this->currentJob) {
$this->currentJob->terminate(); try {
$this->currentJob = null; if ($this->debug) $this->debug->debug('Waiting for job to terminiate');
$this->currentJob->terminate();
$this->queuedJob->started_at = null;
$this->queuedJob->flag_started = false;
$this->queuedJob->save();
if ($this->debug) $this->debug->debug('Terminate successful');
} catch(Exception $ex) {
if ($this->debug) $this->debug->debug('Terminate failed!', [$ex]);
$io->error('An exception occurred attempting to stop a job');
var_dump($ex);
$this->queuedJob->flag_failed = true;
$this->queuedJob->save();
} finally {
$this->currentJob = null;
}
$this->queuedJob = null;
} }
if ($this->debug) $this->debug->debug('Exiting now');
exit; exit;
} }
} }