diff --git a/composer.json b/composer.json
index f882e9e..ba94377 100644
--- a/composer.json
+++ b/composer.json
@@ -2,6 +2,9 @@
"name": "avsdev/sprinkle-worker",
"type": "userfrosting-worker",
"description": "A sprinkle to add a worker thread.",
+ "require": {
+ "symfony/lock": "*"
+ },
"autoload": {
"files" : [
"defines.php"
diff --git a/src/Bakery/WorkerListCommand.php b/src/Bakery/WorkerListCommand.php
index c8643bc..c6cccb5 100644
--- a/src/Bakery/WorkerListCommand.php
+++ b/src/Bakery/WorkerListCommand.php
@@ -37,7 +37,7 @@ class WorkerListCommand extends BaseCommand
protected function execute(InputInterface $input, OutputInterface $output)
{
$this->io->title('Available Jobs List');
- $jobs = $this->ci->worker->getAvailableJobDefinitions();
+ $jobs = $this->ci->jobInspector->getAvailableJobDefinitions();
$this->io->table(['Sprinkle', 'Name', 'Namespace'], $jobs);
return self::SUCCESS;
diff --git a/src/Bakery/WorkerRunCommand.php b/src/Bakery/WorkerRunCommand.php
index 04eb3fc..2d34b22 100644
--- a/src/Bakery/WorkerRunCommand.php
+++ b/src/Bakery/WorkerRunCommand.php
@@ -9,9 +9,10 @@
namespace UserFrosting\Sprinkle\Worker\Bakery;
-use Symfony\Component\Console\Input\InputArgument;
+use Symfony\Component\Console\Input\InputOption;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
+use UserFrosting\Sprinkle\Worker\Worker\Worker;
use UserFrosting\System\Bakery\BaseCommand;
/**
@@ -29,7 +30,9 @@ class WorkerRunCommand extends BaseCommand
{
$this->setName('worker:run')
->setDescription('Run a worker loop')
- ->setHelp('This command runs a the workers job loop. The worker will not return until SIGTERM is sent.');
+ ->setHelp('This command runs a the workers job loop. The worker will not return until SIGTERM is sent.')
+ ->addOption('debug', null, InputOption::VALUE_NONE, 'Output additional status messages.')
+ ->addOption('quiet', null, InputOption::VALUE_NONE, 'Suppress output messages.');
}
/**
@@ -37,7 +40,15 @@ class WorkerRunCommand extends BaseCommand
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
- $worker = $this->ci->worker;
+ $worker = new Worker($this->ci);
+
+ if ($input->getOption('debug')) {
+ $worker->enableDebug();
+ }
+
+ if ($input->getOption('quiet')) {
+ $worker->setQuiet();
+ }
$worker->run($this->io);
diff --git a/src/Database/Migrations/v100/QueuedJobsTable.php b/src/Database/Migrations/v100/QueuedJobsTable.php
new file mode 100644
index 0000000..ff0e809
--- /dev/null
+++ b/src/Database/Migrations/v100/QueuedJobsTable.php
@@ -0,0 +1,60 @@
+schema->hasTable('queued_jobs')) {
+ $this->schema->create('queued_jobs', function (Blueprint $table) {
+ $table->increments('id');
+ $table->string('job');
+ $table->json('params')->default('[]');
+ $table->boolean('flag_started')->default(false);
+ $table->boolean('flag_completed')->default(false);
+ $table->boolean('flag_failed')->default(false);
+ $table->timestamp('started_at')->nullable();
+ $table->timestamp('completed_at')->nullable();
+ $table->timestamps();
+
+ $table->engine = 'InnoDB';
+ $table->collation = 'utf8_unicode_ci';
+ $table->charset = 'utf8';
+ });
+ }
+ }
+
+ /**
+ * {@inheritdoc}
+ */
+ public function down()
+ {
+ $this->schema->drop('queued_jobs');
+ }
+}
diff --git a/src/Database/Models/QueuedJob.php b/src/Database/Models/QueuedJob.php
new file mode 100644
index 0000000..71607f8
--- /dev/null
+++ b/src/Database/Models/QueuedJob.php
@@ -0,0 +1,68 @@
+job;
+ $params = json_decode($this->params, true);
+
+ $job = new $jobClass();
+ $job->setParams($params);
+ return $job;
+ }
+}
diff --git a/src/Jobs/JobBase.php b/src/Jobs/JobBase.php
index 46deb4f..dfe46f0 100644
--- a/src/Jobs/JobBase.php
+++ b/src/Jobs/JobBase.php
@@ -21,15 +21,20 @@ abstract class JobBase implements JobInterface
/**
* @var ContainerInterface
*/
- protected $ci;
+ public static $ci;
+
/**
- * Constructor.
- *
- * @param ContainerInterface $ci
+ * Queues the current job by adding a reference into the database
*/
- public function __construct(ContainerInterface $ci)
+ public function queue()
{
- $this->ci = $ci;
+ /** @var \UserFrosting\Sprinkle\Core\Util\ClassMapper $classMapper */
+ $classMapper = static::$ci->classMapper;
+
+ $classMapper->createInstance('queued_job', [
+ 'job' => get_class($this),
+ 'params' => json_encode($this->getParams()),
+ ])->save();
}
}
diff --git a/src/Jobs/JobInspector.php b/src/Jobs/JobInspector.php
new file mode 100644
index 0000000..196db6e
--- /dev/null
+++ b/src/Jobs/JobInspector.php
@@ -0,0 +1,110 @@
+ci = $ci;
+ }
+
+ /**
+ * Loop all the available sprinkles and return a list of their jobs.
+ *
+ * @return array An array of all the task classes found for every sprinkle
+ */
+ public function getAvailableJobDefinitions()
+ {
+ $tasks = $this->ci->locator->listResources($this->scheme, false, false);
+
+ return $this->loadAvailableJobDefinitions($tasks);
+ }
+
+ /**
+ * Process jobs Resource into info.
+ *
+ * @param array $taskFiles List of jobs files
+ *
+ * @return array
+ */
+ protected function loadAvailableJobDefinitions(array $jobFiles)
+ {
+ $jobs = [];
+
+ foreach ($jobFiles as $jobFile) {
+ $job = $this->getJobDefinition($jobFile);
+ if ($job) {
+ $jobs[] = $job;
+ }
+ }
+
+ return $jobs;
+ }
+
+ /**
+ * Return an array of job details including the class name and the sprinkle name.
+ *
+ * @param ResourceInstance $file The job file
+ *
+ * @return array The details about a job file [name, class, sprinkle]
+ */
+ protected function getJobDefinition(ResourceInstance $file)
+ {
+ // Format the sprinkle name for the namespace
+ $sprinkleName = $file->getLocation()->getName();
+ $sprinkleName = Str::studly($sprinkleName);
+
+ // Getting base path, name and class name
+ $basePath = str_replace($file->getBasename(), '', $file->getBasePath());
+ $name = $basePath . $file->getFilename();
+ $className = str_replace('/', '\\', $basePath) . $file->getFilename();
+ $classPath = "\\UserFrosting\\Sprinkle\\$sprinkleName\\Worker\\Jobs\\$className";
+
+ // Exclude known helper classes
+ if ($className == "JobInterface" || $className == "JobBase" || $className == "JobInspector") {
+ return null;
+ }
+
+ // Build the class name and namespace
+ return [
+ 'sprinkle' => $sprinkleName,
+ 'name' => $name,
+ 'class' => $classPath,
+ ];
+ }
+}
diff --git a/src/Jobs/JobInterface.php b/src/Jobs/JobInterface.php
index f50b0bd..b93c0dc 100644
--- a/src/Jobs/JobInterface.php
+++ b/src/Jobs/JobInterface.php
@@ -17,6 +17,17 @@ namespace UserFrosting\Sprinkle\Worker\Jobs;
*/
interface JobInterface
{
+ /**
+ * Sets the job parameters using an array object
+ */
+ public function setParams(array $params);
+
+ /**
+ * Returns the job parameters as an array
+ */
+ public function getParams();
+
+
/**
* Function used to specify what the job does.
*
diff --git a/src/ServicesProvider/ServicesProvider.php b/src/ServicesProvider/ServicesProvider.php
index 9df8bc7..d6998a3 100644
--- a/src/ServicesProvider/ServicesProvider.php
+++ b/src/ServicesProvider/ServicesProvider.php
@@ -31,12 +31,23 @@ class ServicesProvider
public function register(ContainerInterface $container)
{
/*
- * Return an instance of the worker
+ * Extend the 'classMapper' service to register sprunje and model classes.
*
- * @return \UserFrosting\Sprinkle\Worker\Worker\Worker
+ * @return \UserFrosting\Sprinkle\Core\Util\ClassMapper
*/
- $container['worker'] = function ($c) {
- return new Worker($c);
+ $container->extend('classMapper', function ($classMapper, $c) {
+ $classMapper->setClassMapping('queued_job', 'UserFrosting\Sprinkle\Worker\Database\Models\QueuedJob');
+
+ return $classMapper;
+ });
+
+ /*
+ * Return an instance of the job inspector
+ *
+ * @return \UserFrosting\Sprinkle\Worker\Jobs\JobInspector
+ */
+ $container['jobInspector'] = function ($c) {
+ return new JobInspector($c);
};
}
}
diff --git a/src/Worker.php b/src/Worker.php
index 6e17146..e4440e2 100644
--- a/src/Worker.php
+++ b/src/Worker.php
@@ -10,6 +10,7 @@
namespace UserFrosting\Sprinkle\Worker;
use UserFrosting\System\Sprinkle\Sprinkle;
+use UserFrosting\Sprinkle\Worker\Jobs\JobBase;
/**
* Bootstrapper class for the 'worker' sprinkle.
@@ -23,6 +24,9 @@ class Worker extends Sprinkle
*/
public function onSprinklesInitialized()
{
+ // Set container for jobs
+ JobBase::$ci = $this->ci;
+
$this->registerStreams();
}
diff --git a/src/Worker/Worker.php b/src/Worker/Worker.php
index 2f88bea..ba61075 100644
--- a/src/Worker/Worker.php
+++ b/src/Worker/Worker.php
@@ -9,8 +9,11 @@
namespace UserFrosting\Sprinkle\Worker\Worker;
+use Carbon\Carbon;
use Illuminate\Support\Str;
use Psr\Container\ContainerInterface;
+use Symfony\Component\Lock\LockFactory;
+use Symfony\Component\Lock\Store\SemaphoreStore;
use UserFrosting\UniformResourceLocator\Resource as ResourceInstance;
/**
@@ -28,15 +31,42 @@ class Worker
protected $ci;
/**
- * @var string The resource locator scheme
+ * @var object The currently processing job
*/
- protected $scheme = 'jobs://';
+ protected $queuedJob = null;
/**
* @var object The currently processing job
*/
protected $currentJob = null;
+ /**
+ * @var object The debug stream if enabled
+ */
+ protected $debug = null;
+
+ /**
+ * @var boolean Allow or silence stdout output text
+ */
+ protected $quiet = false;
+
+
+ /**
+ * @var object Mutex store
+ */
+ protected $store = null;
+
+ /**
+ * @var object Mutex locker
+ */
+ protected $locker = null;
+
+ /**
+ * @var object Lock mutex
+ */
+ protected $lock = null;
+
+
/**
* Class Constructor.
*
@@ -45,103 +75,151 @@ class Worker
public function __construct(ContainerInterface $ci)
{
$this->ci = $ci;
+
+ $this->store = new SemaphoreStore();
+ $this->locker = new LockFactory($this->store);
+ $this->lock = $this->locker->createLock('worker-job-query');
}
/**
- * Loop all the available sprinkles and return a list of their jobs.
- *
- * @return array An array of all the task classes found for every sprinkle
+ * Enables runtime debugging
*/
- public function getAvailableJobDefinitions()
+ public function enableDebug()
{
- $tasks = $this->ci->locator->listResources($this->scheme, false, false);
-
- return $this->loadAvailableJobDefinitions($tasks);
+ $this->debug = $this->ci->debugLogger;
+ }
+ /**
+ * Enables runtime debugging
+ */
+ public function disableDebug()
+ {
+ $this->debug = null;
}
/**
- * Process jobs Resource into info.
- *
- * @param array $taskFiles List of jobs files
- *
- * @return array
+ * Sets the quiet flag
*/
- protected function loadAvailableJobDefinitions(array $jobFiles)
+ public function setQuiet()
{
- $jobs = [];
-
- foreach ($jobFiles as $jobFile) {
- $job = $this->getJobDefinition($jobFile);
- if ($job) {
- $jobs[] = $job;
- }
- }
-
- return $jobs;
+ $this->quiet = true;
}
-
/**
- * Return an array of job details including the class name and the sprinkle name.
- *
- * @param ResourceInstance $file The job file
- *
- * @return array The details about a job file [name, class, sprinkle]
+ * Clears the quiet flag
*/
- protected function getJobDefinition(ResourceInstance $file)
+ public function clearQuiet()
{
- // Format the sprinkle name for the namespace
- $sprinkleName = $file->getLocation()->getName();
- $sprinkleName = Str::studly($sprinkleName);
-
- // Getting base path, name and class name
- $basePath = str_replace($file->getBasename(), '', $file->getBasePath());
- $name = $basePath . $file->getFilename();
- $className = str_replace('/', '\\', $basePath) . $file->getFilename();
- $classPath = "\\UserFrosting\\Sprinkle\\$sprinkleName\\Worker\\Jobs\\$className";
-
- if ($className == "JobInterface" || $className == "JobBase") {
- return null;
- }
-
- // Build the class name and namespace
- return [
- 'sprinkle' => $sprinkleName,
- 'name' => $name,
- 'class' => $classPath,
- ];
+ $this->quiet = false;
}
-
/**
* Runs the worker loop, procssing queued jobs.
*/
public function run($io)
{
+ // Hacky way to force the DB service to load before attempting to use the worker
+ $this->ci['db'];
+
+ if ($this->debug) $this->debug->debug('Inserting signal handlers for SIGINT, SIGQUIT and SIGTERM');
+ if (!$this->quiet) $io->writeln(' [INFO] Inserting signal handlers for SIGINT, SIGQUIT and SIGTERM');
+
pcntl_async_signals(TRUE);
pcntl_signal(SIGINT, array($this, "handleTerm"));
pcntl_signal(SIGQUIT, array($this, "handleTerm"));
pcntl_signal(SIGTERM, array($this, "handleTerm"));
- while(true) {
- // TODO: find jobs in the database
- try {
- // Run the job using call_user_func_array
- } catch(Exception $ex) {
- $io->error('An exception occurred attempting to run a job');
- var_dump($ex);
- } finally {
- $this->currentJob = null;
+ if ($this->debug) $this->debug->debug('Starting run loop');
+ if (!$this->quiet) $io->writeln(' [INFO] Starting run loop');
+ while(true) {
+ while (!$this->lock->acquire()) {
+ if ($this->debug) $this->debug->debug('Unable to lock mutex');
+ if (!$this->quiet) $io->writeln(' [WARN] No lock!');
+ usleep(10000);
}
+
+ if (!$this->quiet && $this->debug) $io->writeln(' [INFO] Query!');
+ $this->queuedJob = $this->ci->classMapper->getClassMapping('queued_job')::query()
+ ->where('flag_started', false)
+ ->orderBy('created_at')
+ ->first();
+
+ if ($this->queuedJob) {
+ if ($this->debug) $this->debug->debug('Found a job to do!', [$this->queuedJob]);
+ if (!$this->quiet) $io->writeln(' [INFO] Found a job to do!');
+
+ try {
+ $this->queuedJob->started_at = Carbon::now();
+ $this->queuedJob->flag_started = true;
+ $this->queuedJob->save();
+ $this->lock->release();
+
+ $this->currentJob = $this->queuedJob->dequeue();
+ if ($this->debug) $this->debug->debug('Job details:', [$this->currentJob, $this->currentJob->getParams()]);
+
+ if (!$this->currentJob->run()) {
+ if ($this->debug) $this->debug->debug('Job failed!');
+ if (!$this->quiet) $io->writeln(' [ERROR] Job failed!');
+
+ $this->queuedJob->flag_failed = true;
+ } else {
+ if ($this->debug) $this->debug->debug('Job completed!');
+ if (!$this->quiet) $io->writeln(' [INFO] Job completed!');
+
+ $this->queuedJob->flag_completed = true;
+ $this->queuedJob->completed_at = Carbon::now();
+ }
+
+ $this->queuedJob->save();
+ } catch(Exception $ex) {
+ if ($this->debug) $this->debug->error('Job execution failed!', [$ex]);
+
+ $io->error('An exception occurred attempting to run a job');
+ var_dump($ex);
+
+ $this->queuedJob->flag_failed = true;
+ $this->queuedJob->save();
+ } finally {
+ $this->currentJob = null;
+ }
+
+ $this->queuedJob = null;
+ } else {
+ $this->lock->release();
+ }
+
+ sleep(1);
}
}
public function handleTerm($signo, $siginfo) {
+ if ($this->debug) $this->debug->debug('Trapped signal', [$signo, $siginfo]);
+
if ($this->currentJob) {
- $this->currentJob->terminate();
- $this->currentJob = null;
+ try {
+ if ($this->debug) $this->debug->debug('Waiting for job to terminiate');
+
+ $this->currentJob->terminate();
+ $this->queuedJob->started_at = null;
+ $this->queuedJob->flag_started = false;
+ $this->queuedJob->save();
+
+ if ($this->debug) $this->debug->debug('Terminate successful');
+ } catch(Exception $ex) {
+ if ($this->debug) $this->debug->debug('Terminate failed!', [$ex]);
+ $io->error('An exception occurred attempting to stop a job');
+ var_dump($ex);
+
+ $this->queuedJob->flag_failed = true;
+ $this->queuedJob->save();
+ } finally {
+ $this->currentJob = null;
+ }
+
+ $this->queuedJob = null;
}
+
+ if ($this->debug) $this->debug->debug('Exiting now');
exit;
}
}