diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..f882e9e --- /dev/null +++ b/composer.json @@ -0,0 +1,13 @@ +{ + "name": "avsdev/sprinkle-worker", + "type": "userfrosting-worker", + "description": "A sprinkle to add a worker thread.", + "autoload": { + "files" : [ + "defines.php" + ], + "psr-4": { + "UserFrosting\\Sprinkle\\Worker\\": "src/" + } + } +} \ No newline at end of file diff --git a/defines.php b/defines.php new file mode 100644 index 0000000..969317f --- /dev/null +++ b/defines.php @@ -0,0 +1,13 @@ +setName('worker:list') + ->setDescription('List all potential jobs') + ->setHelp('This command returns a list of potential worker jobs that can be queued to be run by the worker.'); + } + + /** + * {@inheritdoc} + */ + protected function execute(InputInterface $input, OutputInterface $output) + { + $this->io->title('Available Jobs List'); + $jobs = $this->ci->worker->getAvailableJobDefinitions(); + $this->io->table(['Sprinkle', 'Name', 'Namespace'], $jobs); + + return self::SUCCESS; + } +} diff --git a/src/Bakery/WorkerRunCommand.php b/src/Bakery/WorkerRunCommand.php new file mode 100644 index 0000000..04eb3fc --- /dev/null +++ b/src/Bakery/WorkerRunCommand.php @@ -0,0 +1,46 @@ +setName('worker:run') + ->setDescription('Run a worker loop') + ->setHelp('This command runs a the workers job loop. The worker will not return until SIGTERM is sent.'); + } + + /** + * {@inheritdoc} + */ + protected function execute(InputInterface $input, OutputInterface $output) + { + $worker = $this->ci->worker; + + $worker->run($this->io); + + return self::SUCCESS; + } +} \ No newline at end of file diff --git a/src/Jobs/JobBase.php b/src/Jobs/JobBase.php new file mode 100644 index 0000000..46deb4f --- /dev/null +++ b/src/Jobs/JobBase.php @@ -0,0 +1,35 @@ +ci = $ci; + } +} diff --git a/src/Jobs/JobInterface.php b/src/Jobs/JobInterface.php new file mode 100644 index 0000000..f50b0bd --- /dev/null +++ b/src/Jobs/JobInterface.php @@ -0,0 +1,31 @@ +registerStreams(); + } + + /** + * Register worker sprinkle locator streams. + */ + protected function registerStreams() + { + /** @var \UserFrosting\UniformResourceLocator\ResourceLocator $locator */ + $locator = $this->ci->locator; + + // Register jobs sprinkle class streams + $locator->registerStream('jobs', '', \UserFrosting\JOBS_DIR); + } +} diff --git a/src/Worker/Worker.php b/src/Worker/Worker.php new file mode 100644 index 0000000..2f88bea --- /dev/null +++ b/src/Worker/Worker.php @@ -0,0 +1,147 @@ +ci = $ci; + } + + /** + * Loop all the available sprinkles and return a list of their jobs. + * + * @return array An array of all the task classes found for every sprinkle + */ + public function getAvailableJobDefinitions() + { + $tasks = $this->ci->locator->listResources($this->scheme, false, false); + + return $this->loadAvailableJobDefinitions($tasks); + } + + /** + * Process jobs Resource into info. + * + * @param array $taskFiles List of jobs files + * + * @return array + */ + protected function loadAvailableJobDefinitions(array $jobFiles) + { + $jobs = []; + + foreach ($jobFiles as $jobFile) { + $job = $this->getJobDefinition($jobFile); + if ($job) { + $jobs[] = $job; + } + } + + return $jobs; + } + + /** + * Return an array of job details including the class name and the sprinkle name. + * + * @param ResourceInstance $file The job file + * + * @return array The details about a job file [name, class, sprinkle] + */ + protected function getJobDefinition(ResourceInstance $file) + { + // Format the sprinkle name for the namespace + $sprinkleName = $file->getLocation()->getName(); + $sprinkleName = Str::studly($sprinkleName); + + // Getting base path, name and class name + $basePath = str_replace($file->getBasename(), '', $file->getBasePath()); + $name = $basePath . $file->getFilename(); + $className = str_replace('/', '\\', $basePath) . $file->getFilename(); + $classPath = "\\UserFrosting\\Sprinkle\\$sprinkleName\\Worker\\Jobs\\$className"; + + if ($className == "JobInterface" || $className == "JobBase") { + return null; + } + + // Build the class name and namespace + return [ + 'sprinkle' => $sprinkleName, + 'name' => $name, + 'class' => $classPath, + ]; + } + + + /** + * Runs the worker loop, procssing queued jobs. + */ + public function run($io) + { + pcntl_async_signals(TRUE); + + pcntl_signal(SIGINT, array($this, "handleTerm")); + pcntl_signal(SIGQUIT, array($this, "handleTerm")); + pcntl_signal(SIGTERM, array($this, "handleTerm")); + + while(true) { + // TODO: find jobs in the database + + try { + // Run the job using call_user_func_array + } catch(Exception $ex) { + $io->error('An exception occurred attempting to run a job'); + var_dump($ex); + } finally { + $this->currentJob = null; + } + } + } + + public function handleTerm($signo, $siginfo) { + if ($this->currentJob) { + $this->currentJob->terminate(); + $this->currentJob = null; + } + exit; + } +}