A framework for defining and running background jobs in a queued manner
Marcus Nyeholt
<marcus (at) symbiote (dot) com (dot) au>
The master branch of this module is currently aiming for SilverStripe 4.x compatibility
See http://github.com/symbiote/silverstripe-queuedjobs/wiki/ for more complete
documentation
The Queued Jobs module provides a framework for SilverStripe developers to
define long running processes that should be run as background tasks.
This asynchronous processing allows users to continue using the system
while long running tasks proceed when time permits. It also lets
developers set these processes to be executed in the future.
The module comes with
*/1 * * * * /path/to/silverstripe/vendor/bin/sake dev/tasks/ProcessJobQueueTask
*/15 * * * * /path/to/silverstripe/vendor/bin/sake dev/tasks/ProcessJobQueueTask queue=large
use Symbiote\QueuedJobs\Services\QueuedJobService;
$publish = new PublishItemsJob(21);
QueuedJobService::singleton()->queueJob($publish);
use SilverStripe\ORM\FieldType\DBDatetime;
use Symbiote\QueuedJobs\Services\QueuedJobService;
$publish = new PublishItemsJob(21);
QueuedJobService::singleton()
->queueJob($publish, DBDatetime::create()->setValue(DBDatetime::now()->getTimestamp() + 86400)->Rfc2822());
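The start time can also be built with PHP's standard date classes. This is a minimal sketch, assuming that queueJob() accepts a 'Y-m-d H:i:s' string as its second argument in the same way it accepts the RFC 2822 string above; the variable names are illustrative only.

```php
<?php
// Build a start time one day from now as a 'Y-m-d H:i:s' string.
$now = new DateTimeImmutable('now');
$startAfter = $now->modify('+1 day')->format('Y-m-d H:i:s');

// Inside a SilverStripe project the string would then be passed as the
// second argument, as in the call above (assumption: this format is accepted):
// QueuedJobService::singleton()->queueJob($publish, $startAfter);
echo $startAfter, PHP_EOL;
```

Using modify('+1 day') rather than adding 86400 seconds keeps the same wall-clock time across daylight saving changes, which may or may not be what you want for a scheduled job.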
Doorman is included by default, and allows for asynchronous task processing.
This requires that you are running on a Unix-based system, or within some kind of environment
emulator such as Cygwin.
In order to enable this, configure the ProcessJobQueueTask to use this backend.
In your YML set the below:
---
Name: localproject
After: '#queuedjobsettings'
---
SilverStripe\Core\Injector\Injector:
  Symbiote\QueuedJobs\Services\QueuedJobService:
    properties:
      queueRunner: '%$DoormanRunner'
To run jobs through Gearman, create a _config/queuedjobs.yml file in your project with the following declaration:
---
Name: localproject
After: '#queuedjobsettings'
---
SilverStripe\Core\Injector\Injector:
  QueueHandler:
    class: Symbiote\QueuedJobs\Services\GearmanQueueHandler
Run the Gearman worker using php gearman/gearman_runner.php in your SS root dir.
This will cause all queued jobs to trigger immediately via a Gearman worker (src/workers/JobWorker.php),
EXCEPT those with a StartAfter date set, for which you will STILL need the cron settings from above.
QueuedJob::IMMEDIATE jobs
Queued jobs can be executed immediately (instead of being limited by cron's 1 minute interval) by using
a file based notification system. This relies on something like inotifywait to monitor a folder (by
default this is SILVERSTRIPE_CACHE_DIR/queuedjobs) and trigger the ProcessJobQueueTask as above,
passing job=$filename as the argument. An example script in queuedjobs/scripts runs
inotifywait and then calls the ProcessJobQueueTask when a new job is ready to run.
Note - if you do NOT have this running, make sure to set QueuedJobService::$use_shutdown_function = true;
so that immediate mode jobs don't stall. By setting this to true, immediate jobs will be executed after
the request finishes as the php script ends.
Some jobs should always be either running or queued to run, such as data refreshes or periodic clean up jobs. We call these Default Jobs.
Default jobs are checked for at the end of each job queue process, using the job type and any fields in the filter to create an SQL query, e.g.:
ArbitraryName:
  type: 'ScheduledExternalImportJob'
  filter:
    JobTitle: 'Scheduled import from Services'
Will become:
QueuedJobDescriptor::get()->filter([
    'type' => 'ScheduledExternalImportJob',
    'JobTitle' => 'Scheduled import from Services'
]);
This query is checked to see if there's at least 1 healthy (new, run, wait or paused) job matching the filter. If there's not, and recreate is true in the yml config, we use the construct array as params to pass to a new job object, e.g.:
ArbitraryName:
  type: 'ScheduledExternalImportJob'
  filter:
    JobTitle: 'Scheduled import from Services'
  recreate: 1
  construct:
    repeat: 300
    contentItem: 100
    target: 157
If the above job is missing it will be recreated as:
Injector::inst()->createWithArgs(ScheduledExternalImportJob::class, $construct);
If you need to stop a default job from raising alerts and being recreated, set an existing copy of the job to Paused in the CMS.
Default jobs are defined in yml config. The sample below covers the options and expected values:
SilverStripe\Core\Injector\Injector:
  Symbiote\QueuedJobs\Services\QueuedJobService:
    properties:
      defaultJobs:
        # This key is used as the title for error logs and alert emails
        ArbitraryName:
          # The job type should be the class name of a job REQUIRED
          type: 'ScheduledExternalImportJob'
          # This plus the job type is used to create the SQL query REQUIRED
          filter:
            # 1 or more Fieldname: 'value' sets that will be queried on REQUIRED
            # These can be valid ORM filters
            JobTitle: 'Scheduled import from Services'
          # Sets whether the job will be recreated or not OPTIONAL
          recreate: 1
          # Set the email address to send the alert to; if not set, the site admin email is used OPTIONAL
          email: '[email protected]'
          # Parameters set on the recreated object OPTIONAL
          construct:
            # 1 or more Fieldname: 'value' sets to be passed to the constructor OPTIONAL
            repeat: 300
            title: 'Scheduled import from Services'
        # Minimal implementation will send alerts but not recreate
        AnotherTitle:
          type: 'AJob'
          filter:
            JobTitle: 'A job'
By default the CleanupJob is disabled. To enable it, set the following in your YML:
Symbiote\QueuedJobs\Jobs\CleanupJob:
  is_enabled: true
You will need to trigger the first run manually in the UI. After that the CleanupJob is run once a day.
You can configure this job to clean up based on the number of jobs, or the age of the jobs. This is
configured with the cleanup_method setting - current valid values are "age" (default) and "number".
Each of these methods will have a value associated with it - this is an integer, set with cleanup_value.
For "age", this will be converted into days; for "number", it is the minimum number of records to keep, sorted by LastEdited.
The default value is 30, as we are expecting days.
You can determine which JobStatuses are allowed to be cleaned up. The default setting is to clean up "Broken" and "Complete" jobs. All other statuses can be configured with cleanup_statuses. You can also define query_limit to limit the number of rows queried/deleted by the cleanup job (defaults to 100k).
The default configuration looks like this:
Symbiote\QueuedJobs\Jobs\CleanupJob:
  is_enabled: false
  query_limit: 100000
  cleanup_method: "age"
  cleanup_value: 30
  cleanup_statuses:
    - Broken
    - Complete
It's possible to enable a setting which allows the pausing of the queued jobs processing. To enable it, add the following code to your config YAML file:
Symbiote\QueuedJobs\Services\QueuedJobService:
  lock_file_enabled: true
  lock_file_path: '/shared-folder-path'
A Queue settings tab will appear in the CMS settings, with an option to pause the queued jobs processing. If enabled, no new jobs will start running; however, jobs that are already running will be left to finish.
This is useful for planned downtime, such as maintenance of a third-party service that queued jobs rely on, or a DB restore / backup operation.
Note that this maintenance lock state is stored in a file. This is intentionally not using DB as a storage as it may not be available during some maintenance operations.
Please make sure that the lock_file_path is pointing to a folder on a shared drive in case you are running a server with multiple instances.
Jobs track their execution in steps - as the job runs it increments the "steps" that have been run. Periodically jobs
are checked to ensure they are healthy. This asserts the count of steps on a job is always increasing between health
checks. By default health checks are performed when a worker starts running a queue.
In a multi-worker environment this can cause issues when health checks are performed too frequently. You can disable the
automatic health check with the following configuration:
Symbiote\QueuedJobs\Services\QueuedJobService:
  disable_health_check: true
In addition to the config setting there is a task that can be used with a cron to ensure that unhealthy jobs are
detected:
*/5 * * * * /path/to/silverstripe/vendor/bin/sake dev/tasks/CheckJobHealthTask
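The idea behind the health check described above, that a job's step count must grow between two checks, can be sketched in plain PHP. This is an illustration only and not the module's implementation: findStalledJobs() and the array shapes are hypothetical.

```php
<?php
/**
 * Hypothetical helper, not part of the module: returns the IDs of jobs whose
 * step count has not grown since the previous health check.
 *
 * @param array $previousSteps job ID => steps seen at the last check
 * @param array $currentSteps  job ID => steps seen now
 * @return array IDs of jobs that look stalled
 */
function findStalledJobs(array $previousSteps, array $currentSteps): array
{
    $stalled = [];
    foreach ($currentSteps as $jobID => $steps) {
        // A job seen for the first time has nothing to compare against yet.
        if (!array_key_exists($jobID, $previousSteps)) {
            continue;
        }
        // No growth since the last check means the job made no progress.
        if ($steps <= $previousSteps[$jobID]) {
            $stalled[] = $jobID;
        }
    }
    return $stalled;
}

$previous = [1 => 10, 2 => 4];
$current = [1 => 25, 2 => 4, 3 => 0];
print_r(findStalledJobs($previous, $current)); // only job 2 made no progress
```

The sketch also shows why very frequent checks are a problem: two checks that run close together can see the same step count for a job that is still working.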
To make sure your job works, you can first try to execute the job directly outside the framework of the
queues - this can be done by manually calling the setup() and process() methods. If it works fine
under these circumstances, try having getJobType() return QueuedJob::IMMEDIATE to have execution
work immediately, without being persisted or executed via cron. If this works, next make sure your
cronjob is configured and executing correctly.
If defining your own job classes, be aware that when the job is started on the queue, the job class
is constructed without parameters being passed; this means if you accept constructor args, you
must detect whether they're present or not before using them. See this issue and this wiki page for more information.
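The constructor guard can be sketched in plain PHP as follows. PublishItemsJobSketch and its rootID property are hypothetical names; a real job would extend the module's job base class, which is left out here so that the example runs on its own.

```php
<?php
// Plain-PHP illustration of the constructor guard (hypothetical class).
class PublishItemsJobSketch
{
    /** @var int|null */
    public $rootID = null;

    public function __construct($rootID = null)
    {
        // The queue constructs the job with no arguments, so only use the
        // argument when it was actually supplied.
        if ($rootID !== null) {
            $this->rootID = (int) $rootID;
        }
    }
}

$createdByYou = new PublishItemsJobSketch(21);  // how your code creates the job
$createdByQueue = new PublishItemsJobSketch();  // how the queue constructs it

var_dump($createdByYou->rootID);   // int(21)
var_dump($createdByQueue->rootID); // NULL
```

Giving every constructor parameter a default value is what allows the class to be constructed both ways without errors.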
If defining your own jobs, please ensure you follow PSR conventions, i.e. use "YourVendor" rather than "SilverStripe".
Ensure that notifications are configured so that you can get updates on stalled or broken jobs. You can
set the notification email address in your config as below:
SilverStripe\Control\Email\Email:
  queued_job_admin_email: [email protected]
Long running jobs are running multiple times!
A long running job may fool the system into thinking it has gone away (i.e. the job health check fails because
currentStep hasn't been incremented). To avoid this scenario, you can set $this->currentStep = -1 in your job's
constructor, to prevent any health checks detecting the job.
By default this task will run until it reaches either 256MB or the limit set by PHP's memory_limit ini setting.
NOTE: This was increased to 256MB in 4.x to handle the increase in memory usage by framework.
You can adjust this with the config change below:
# Force memory limit to 256 megabytes
Symbiote\QueuedJobs\Services\QueuedJobService:
  # Accepts b, k, m, or g suffixes
  memory_limit: 256m
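To make the suffix notation concrete, here is a minimal sketch of how a value such as 256m maps to a byte count. It assumes the suffixes stand for bytes, kilobytes, megabytes and gigabytes with 1024-based multipliers; memoryLimitToBytes() is a hypothetical helper and not the module's own parser.

```php
<?php
/**
 * Hypothetical helper, not the module's own parser: converts a limit such as
 * "256m" into a number of bytes.
 */
function memoryLimitToBytes(string $limit): int
{
    $limit = strtolower(trim($limit));
    // Assumed meaning of the suffixes: bytes, kilobytes, megabytes, gigabytes.
    $multipliers = ['b' => 1, 'k' => 1024, 'm' => 1024 ** 2, 'g' => 1024 ** 3];
    $suffix = substr($limit, -1);
    if (isset($multipliers[$suffix])) {
        return (int) substr($limit, 0, -1) * $multipliers[$suffix];
    }
    // No recognised suffix: treat the value as plain bytes.
    return (int) $limit;
}

echo memoryLimitToBytes('256m'), PHP_EOL; // 268435456
```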
You can also enforce a time limit for each queue, after which the task will attempt a restart to release all
resources. By default this is disabled, so you must specify this in your project as below:
# Force limit to 10 minutes
Symbiote\QueuedJobs\Services\QueuedJobService:
  time_limit: 600
ALTER TABLE `QueuedJobDescriptor` ADD INDEX ( `JobStatus` , `JobType` )
Writing unit tests for queued jobs can be tricky as it's quite a complex system. Still, it can be done.
Note that in most cases you don't actually need to run your queued job via the QueuedJobService in your unit test. Instead, you can run it directly, like this:
$job = new YourQueuedJob($someArguments);
$job->setup();
$job->process();
$this->assertTrue($job->jobFinished());
// other assertions can be placed here (job side effects, job data assertions...)
setup() needs to be run only once and process() needs to be run as many times as needed to complete the job. This depends on your job and the job data.
Usually, process() needs to be run once for every step your job completes, but this may vary per job implementation. Please avoid using do while {jobFinished}; you should always be clear on how many times process() runs in your test job.
If you are unsure, do a test run in your application with some logging to indicate how many times it is run.
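The advice about calling process() an explicit number of times can be illustrated with a stand-in job. ThreeStepJobSketch is a hypothetical plain-PHP class used only so that the example runs on its own; in a real test you would use your own job class and PHPUnit assertions instead of var_dump().

```php
<?php
// Stand-in job used only for this sketch; a real job would come from your
// project and extend the module's job classes.
class ThreeStepJobSketch
{
    private $totalSteps = 0;
    private $currentStep = 0;
    public $processed = [];

    public function setup()
    {
        $this->totalSteps = 3;
    }

    public function process()
    {
        // Each call completes exactly one step.
        $this->currentStep++;
        $this->processed[] = $this->currentStep;
    }

    public function jobFinished()
    {
        return $this->currentStep >= $this->totalSteps;
    }
}

$job = new ThreeStepJobSketch();
$job->setup();

// Call process() an explicit number of times rather than looping until done.
$job->process();
$job->process();
var_dump($job->jobFinished()); // bool(false): one step is still pending
$job->process();
var_dump($job->jobFinished()); // bool(true)
```

Spelling out each call means the test fails if the job starts needing more or fewer steps than expected, which a loop on jobFinished() would hide.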
This should cover most cases, but sometimes you need to run a job via the service. For example you may need to test if your job related extension hooks are working.
Please be sure to disable the shutdown function and the queued job handler as these two will cause you some major pain in your unit tests.
You can do this in multiple ways:
Add the changes to setUp() at the start of your unit test. This is pretty easy, but it may be tedious to add this to every unit test.
Create a parent class for your unit tests and add a setUp() function to it. You can now have the code in just one place, but inheritance has some limitations.
Create a test state and add a setUp() function to it, see SilverStripe\Dev\State\TestState.
Create your test state like this:
<?php

namespace App\Dev\State;

use SilverStripe\Core\Config\Config;
use SilverStripe\Core\Injector\Injector;
use SilverStripe\Dev\SapphireTest;
use SilverStripe\Dev\State\TestState;
use Symbiote\QueuedJobs\Services\QueuedJobHandler;
use Symbiote\QueuedJobs\Services\QueuedJobService;
use Symbiote\QueuedJobs\Tests\QueuedJobsTest\QueuedJobsTest_Handler;

class QueuedJobTestState implements TestState
{
    public function setUp(SapphireTest $test)
    {
        Injector::inst()->registerService(new QueuedJobsTest_Handler(), QueuedJobHandler::class);
        Config::modify()->set(QueuedJobService::class, 'use_shutdown_function', false);
    }

    public function tearDown(SapphireTest $test)
    {
    }

    public function setUpOnce($class)
    {
    }

    public function tearDownOnce($class)
    {
    }
}
Register your test state with Injector like this:
SilverStripe\Core\Injector\Injector:
  SilverStripe\Dev\State\SapphireTestState:
    properties:
      States:
        queuedjobs: '%$App\Dev\State\QueuedJobTestState'
This solution is great if you want to apply this change to all of your unit tests.
Regardless of which approach you choose, the two changes that need to be inside the setUp() function are as follows:
This will replace the standard logger with a dummy one.
Injector::inst()->registerService(new QueuedJobsTest_Handler(), QueuedJobHandler::class);
This will disable the shutdown function completely as QueuedJobService doesn't work well with SapphireTest.
Config::modify()->set(QueuedJobService::class, 'use_shutdown_function', false);
This is how you run a job via the service in your unit tests.
$job = new YourQueuedJob($someArguments);
/** @var QueuedJobService $service */
$service = Injector::inst()->get(QueuedJobService::class);
$descriptorID = $service->queueJob($job);
$service->runJob($descriptorID);
/** @var QueuedJobDescriptor $descriptor */
$descriptor = QueuedJobDescriptor::get()->byID($descriptorID);
$this->assertNotEquals(QueuedJob::STATUS_BROKEN, $descriptor->JobStatus);
For example, this code snippet runs the job and checks if the job ended up in a non-broken state.
Translations of the natural language strings are managed through a third party translation interface, transifex.com. Newly added strings will be periodically uploaded there for translation, and any new translations will be merged back to the project source code.
Please use https://www.transifex.com/projects/p/silverstripe-queuedjobs to contribute translations, rather than sending pull requests with YAML files.