class Worker

Traits

Constants

EXIT_SUCCESS

EXIT_ERROR

EXIT_MEMORY_LIMIT

Properties

protected string $name

The name of the worker.

protected Factory $manager

The queue manager instance.

protected Dispatcher $events

The event dispatcher instance.

protected Repository $cache

The cache repository implementation.

protected ExceptionHandler $exceptions

The exception handler instance.

protected callable $isDownForMaintenance

The callback used to determine if the application is in maintenance mode.

protected callable $resetScope

The callback used to reset the application's scope.

bool $shouldQuit

Indicates if the worker should exit.

bool $paused

Indicates if the worker is paused.

static protected callable[] $popCallbacks

The callbacks used to pop jobs from queues.

Methods

bool
causedByLostConnection(Throwable $e)

Determine if the given exception was caused by a lost connection.

void
__construct(Factory $manager, Dispatcher $events, ExceptionHandler $exceptions, callable $isDownForMaintenance, callable $resetScope = null)

Create a new queue worker.

int
daemon(string $connectionName, string $queue, WorkerOptions $options)

Listen to the given queue in a loop.

void
registerTimeoutHandler(Job|null $job, WorkerOptions $options)

Register the worker timeout handler.

void
resetTimeoutHandler()

Reset the worker timeout handler.

int
timeoutForJob(Job|null $job, WorkerOptions $options)

Get the appropriate timeout for the given job.

bool
daemonShouldRun(WorkerOptions $options, string $connectionName, string $queue)

Determine if the daemon should process on this iteration.

int|null
pauseWorker(WorkerOptions $options, int $lastRestart)

Pause the worker for the current loop.

int|null
stopIfNecessary(WorkerOptions $options, int $lastRestart, int $startTime = 0, int $jobsProcessed = 0, mixed $job = null)

Determine the exit code to stop the process if necessary.

void
runNextJob(string $connectionName, string $queue, WorkerOptions $options)

Process the next job on the queue.

Job|null
getNextJob(Queue $connection, string $queue)

Get the next job from the queue connection.

void
runJob(Job $job, string $connectionName, WorkerOptions $options)

Process the given job.

void
stopWorkerIfLostConnection(Throwable $e)

Stop the worker if we have lost connection to a database.

void
process(string $connectionName, Job $job, WorkerOptions $options)

Process the given job from the queue.

void
handleJobException(string $connectionName, Job $job, WorkerOptions $options, Throwable $e)

Handle an exception that occurred while the job was running.

void
markJobAsFailedIfAlreadyExceedsMaxAttempts(string $connectionName, Job $job, int $maxTries)

Mark the given job as failed if it has exceeded the maximum allowed attempts.

void
markJobAsFailedIfWillExceedMaxAttempts(string $connectionName, Job $job, int $maxTries, Throwable $e)

Mark the given job as failed if it will exceed the maximum allowed attempts.

void
markJobAsFailedIfWillExceedMaxExceptions(string $connectionName, Job $job, Throwable $e)

Mark the given job as failed if it will exceed the maximum allowed exceptions.

void
markJobAsFailedIfItShouldFailOnTimeout(string $connectionName, Job $job, Throwable $e)

Mark the given job as failed if it should fail on timeouts.

void
failJob(Job $job, Throwable $e)

Mark the given job as failed and raise the relevant event.

int
calculateBackoff(Job $job, WorkerOptions $options)

Calculate the backoff for the given job.

void
raiseBeforeJobEvent(string $connectionName, Job $job)

Raise the before queue job event.

void
raiseAfterJobEvent(string $connectionName, Job $job)

Raise the after queue job event.

void
raiseExceptionOccurredJobEvent(string $connectionName, Job $job, Throwable $e)

Raise the exception occurred queue job event.

bool
queueShouldRestart(int|null $lastRestart)

Determine if the queue worker should restart.

int|null
getTimestampOfLastQueueRestart()

Get the last queue restart timestamp, or null.

void
listenForSignals()

Enable async signals for the process.

bool
supportsAsyncSignals()

Determine if "async" signals are supported.

bool
memoryExceeded(int $memoryLimit)

Determine if the memory limit has been exceeded.

int
stop(int $status = 0)

Stop listening and bail out of the script.

never
kill(int $status = 0)

Kill the process.

MaxAttemptsExceededException
maxAttemptsExceededException(Job $job)

Create an instance of MaxAttemptsExceededException.

void
sleep(int|float $seconds)

Sleep the script for a given number of seconds.

$this
setCache(Repository $cache)

Set the cache repository implementation.

$this
setName(string $name)

Set the name of the worker.

static void
popUsing(string $workerName, callable $callback)

Register a callback to be executed to pick jobs.

QueueManager
getManager()

Get the queue manager instance.

void
setManager(Factory $manager)

Set the queue manager instance.

Details

protected bool causedByLostConnection(Throwable $e)

Determine if the given exception was caused by a lost connection.

Parameters

Throwable $e

Return Value

bool

void __construct(Factory $manager, Dispatcher $events, ExceptionHandler $exceptions, callable $isDownForMaintenance, callable $resetScope = null)

Create a new queue worker.

Parameters

Factory $manager
Dispatcher $events
ExceptionHandler $exceptions
callable $isDownForMaintenance
callable $resetScope

Return Value

void

int daemon(string $connectionName, string $queue, WorkerOptions $options)

Listen to the given queue in a loop.

Parameters

string $connectionName
string $queue
WorkerOptions $options

Return Value

int

protected void registerTimeoutHandler(Job|null $job, WorkerOptions $options)

Register the worker timeout handler.

Parameters

Job|null $job
WorkerOptions $options

Return Value

void

protected void resetTimeoutHandler()

Reset the worker timeout handler.

Return Value

void

protected int timeoutForJob(Job|null $job, WorkerOptions $options)

Get the appropriate timeout for the given job.

Parameters

Job|null $job
WorkerOptions $options

Return Value

int
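
The fallback described above can be illustrated with a minimal standalone sketch. It assumes the job may carry its own timeout and that the worker options supply a default; `resolveJobTimeout` and its parameters are hypothetical names, not part of this class.

```php
<?php

/**
 * Prefer a job-specific timeout (in seconds) and fall back to the worker
 * default when the job does not define one. Hypothetical helper.
 */
function resolveJobTimeout(?int $jobTimeout, int $defaultTimeout): int
{
    // The null coalescing operator keeps an explicit 0 set on the job.
    return $jobTimeout ?? $defaultTimeout;
}
```

Under this assumption, a job that sets `0` keeps that value rather than inheriting the default.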

protected bool daemonShouldRun(WorkerOptions $options, string $connectionName, string $queue)

Determine if the daemon should process on this iteration.

Parameters

WorkerOptions $options
string $connectionName
string $queue

Return Value

bool

protected int|null pauseWorker(WorkerOptions $options, int $lastRestart)

Pause the worker for the current loop.

Parameters

WorkerOptions $options
int $lastRestart

Return Value

int|null

protected int|null stopIfNecessary(WorkerOptions $options, int $lastRestart, int $startTime = 0, int $jobsProcessed = 0, mixed $job = null)

Determine the exit code to stop the process if necessary.

Parameters

WorkerOptions $options
int $lastRestart
int $startTime
int $jobsProcessed
mixed $job

Return Value

int|null
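
As a rough illustration of the `int|null` contract, the sketch below returns an exit code when the loop should stop and `null` when it should continue. The class `SketchExitCodes`, its constant values, the function name, and the order of the checks are all assumptions made for the example; they are not taken from this page.

```php
<?php

// Hypothetical stand-ins for the exit constants; the values are assumptions.
final class SketchExitCodes
{
    public const SUCCESS = 0;
    public const MEMORY_LIMIT = 12;
}

/**
 * Return an exit code if the worker loop should stop, or null to keep going.
 * Every parameter is illustrative and not a real WorkerOptions field.
 */
function exitCodeIfNecessary(
    bool $shouldQuit,
    int $memoryUsedMb,
    int $memoryLimitMb,
    int $jobsProcessed,
    int $maxJobs
): ?int {
    if ($shouldQuit) {
        return SketchExitCodes::SUCCESS;
    }

    if ($memoryUsedMb >= $memoryLimitMb) {
        return SketchExitCodes::MEMORY_LIMIT;
    }

    // A max-jobs value of 0 is treated as "no limit" in this sketch.
    if ($maxJobs > 0 && $jobsProcessed >= $maxJobs) {
        return SketchExitCodes::SUCCESS;
    }

    return null;
}
```

Returning `null` rather than a sentinel integer keeps "do not stop" distinct from every valid exit status, including `0`.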

void runNextJob(string $connectionName, string $queue, WorkerOptions $options)

Process the next job on the queue.

Parameters

string $connectionName
string $queue
WorkerOptions $options

Return Value

void

protected Job|null getNextJob(Queue $connection, string $queue)

Get the next job from the queue connection.

Parameters

Queue $connection
string $queue

Return Value

Job|null

protected void runJob(Job $job, string $connectionName, WorkerOptions $options)

Process the given job.

Parameters

Job $job
string $connectionName
WorkerOptions $options

Return Value

void

protected void stopWorkerIfLostConnection(Throwable $e)

Stop the worker if we have lost connection to a database.

Parameters

Throwable $e

Return Value

void

void process(string $connectionName, Job $job, WorkerOptions $options)

Process the given job from the queue.

Parameters

string $connectionName
Job $job
WorkerOptions $options

Return Value

void

Exceptions

Throwable

protected void handleJobException(string $connectionName, Job $job, WorkerOptions $options, Throwable $e)

Handle an exception that occurred while the job was running.

Parameters

string $connectionName
Job $job
WorkerOptions $options
Throwable $e

Return Value

void

Exceptions

Throwable

protected void markJobAsFailedIfAlreadyExceedsMaxAttempts(string $connectionName, Job $job, int $maxTries)

Mark the given job as failed if it has exceeded the maximum allowed attempts.

This will likely be because the job previously exceeded a timeout.

Parameters

string $connectionName
Job $job
int $maxTries

Return Value

void

Exceptions

Throwable

protected void markJobAsFailedIfWillExceedMaxAttempts(string $connectionName, Job $job, int $maxTries, Throwable $e)

Mark the given job as failed if it will exceed the maximum allowed attempts.

Parameters

string $connectionName
Job $job
int $maxTries
Throwable $e

Return Value

void

protected void markJobAsFailedIfWillExceedMaxExceptions(string $connectionName, Job $job, Throwable $e)

Mark the given job as failed if it will exceed the maximum allowed exceptions.

Parameters

string $connectionName
Job $job
Throwable $e

Return Value

void

protected void markJobAsFailedIfItShouldFailOnTimeout(string $connectionName, Job $job, Throwable $e)

Mark the given job as failed if it should fail on timeouts.

Parameters

string $connectionName
Job $job
Throwable $e

Return Value

void

protected void failJob(Job $job, Throwable $e)

Mark the given job as failed and raise the relevant event.

Parameters

Job $job
Throwable $e

Return Value

void

protected int calculateBackoff(Job $job, WorkerOptions $options)

Calculate the backoff for the given job.

Parameters

Job $job
WorkerOptions $options

Return Value

int
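
One common way to compute a per-attempt backoff is to index into a comma-separated list of delays. The sketch below assumes that representation; `backoffForAttempt` is a hypothetical helper, and the real method reads its inputs from the job and the worker options.

```php
<?php

/**
 * Pick the delay in seconds for a 1-based attempt number from a
 * comma-separated list such as "1,5,10". Attempts past the end of the
 * list reuse the last entry. Hypothetical helper for illustration.
 */
function backoffForAttempt(string $backoff, int $attempt): int
{
    $steps = array_map('intval', explode(',', $backoff));

    return $steps[$attempt - 1] ?? $steps[count($steps) - 1];
}
```

With the list `"1,5,10"`, the first attempt waits 1 second, the second 5, and every later attempt 10.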

protected void raiseBeforeJobEvent(string $connectionName, Job $job)

Raise the before queue job event.

Parameters

string $connectionName
Job $job

Return Value

void

protected void raiseAfterJobEvent(string $connectionName, Job $job)

Raise the after queue job event.

Parameters

string $connectionName
Job $job

Return Value

void

protected void raiseExceptionOccurredJobEvent(string $connectionName, Job $job, Throwable $e)

Raise the exception occurred queue job event.

Parameters

string $connectionName
Job $job
Throwable $e

Return Value

void

protected bool queueShouldRestart(int|null $lastRestart)

Determine if the queue worker should restart.

Parameters

int|null $lastRestart

Return Value

bool
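
The restart check can be thought of as a timestamp comparison: the worker remembers the restart timestamp it saw at boot and stops once the stored value changes. A minimal sketch under that assumption, with a hypothetical function name and with the current timestamp passed in rather than read from the cache:

```php
<?php

/**
 * A restart is due when the currently stored restart timestamp differs
 * from the one recorded when the worker started. Hypothetical helper.
 */
function restartRequested(?int $currentRestart, ?int $lastRestart): bool
{
    return $currentRestart !== $lastRestart;
}
```

A `null` on both sides means no restart has ever been signalled, so the worker keeps running.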

protected int|null getTimestampOfLastQueueRestart()

Get the last queue restart timestamp, or null.

Return Value

int|null

protected void listenForSignals()

Enable async signals for the process.

Return Value

void

protected bool supportsAsyncSignals()

Determine if "async" signals are supported.

Return Value

bool
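
Async signal handling in PHP depends on the `pcntl` extension, so a support check usually guards any handler registration. The sketch below assumes that approach; `asyncSignalsAvailable` and the `$shouldQuit` variable are illustrative names, and which signals the worker actually listens for is not stated on this page.

```php
<?php

/**
 * Report whether the pcntl extension, which provides async signals,
 * is loaded. Hypothetical helper for illustration.
 */
function asyncSignalsAvailable(): bool
{
    return extension_loaded('pcntl');
}

$shouldQuit = false;

if (asyncSignalsAvailable()) {
    // Deliver signals without requiring declare(ticks=1).
    pcntl_async_signals(true);

    // On SIGTERM, only set a flag; the loop decides when to exit.
    pcntl_signal(SIGTERM, function () use (&$shouldQuit): void {
        $shouldQuit = true;
    });
}
```

Setting a flag instead of exiting inside the handler lets a job that is already running finish before the process stops.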

bool memoryExceeded(int $memoryLimit)

Determine if the memory limit has been exceeded.

Parameters

int $memoryLimit

Return Value

bool
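
A memory check of this kind can be sketched with `memory_get_usage()`. The example assumes the limit is expressed in megabytes; `memoryLimitReached` is a hypothetical name.

```php
<?php

/**
 * Compare the memory currently allocated to the PHP process, in megabytes,
 * against a limit. Hypothetical helper; the unit is an assumption.
 */
function memoryLimitReached(int $limitMb): bool
{
    // true asks for the real size allocated from the system.
    return (memory_get_usage(true) / 1024 / 1024) >= $limitMb;
}
```

Passing `true` to `memory_get_usage()` includes memory the engine has reserved but not yet used, which is closer to what the operating system sees.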

int stop(int $status = 0)

Stop listening and bail out of the script.

Parameters

int $status

Return Value

int

never kill(int $status = 0)

Kill the process.

Parameters

int $status

Return Value

never

protected MaxAttemptsExceededException maxAttemptsExceededException(Job $job)

Create an instance of MaxAttemptsExceededException.

Parameters

Job $job

Return Value

MaxAttemptsExceededException

void sleep(int|float $seconds)

Sleep the script for a given number of seconds.

Parameters

int|float $seconds

Return Value

void
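
Because the parameter accepts `int|float`, sub-second pauses are possible. The sketch below shows one way to honor fractional values by combining `sleep()` and `usleep()`; `pauseFor` is a hypothetical helper and not necessarily how this method is implemented.

```php
<?php

/**
 * Pause for a possibly fractional number of seconds. Whole seconds go to
 * sleep() and the remainder to usleep(). Hypothetical helper.
 */
function pauseFor(int|float $seconds): void
{
    if ($seconds <= 0) {
        return; // Nothing to wait for; also avoids negative arguments.
    }

    $whole = (int) floor($seconds);
    $fraction = $seconds - $whole;

    if ($whole > 0) {
        sleep($whole);
    }

    if ($fraction > 0) {
        usleep((int) round($fraction * 1_000_000));
    }
}
```

Splitting the wait keeps the `usleep()` argument below one second, which avoids platform limits on large microsecond values.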

$this setCache(Repository $cache)

Set the cache repository implementation.

Parameters

Repository $cache

Return Value

$this

$this setName(string $name)

Set the name of the worker.

Parameters

string $name

Return Value

$this

static void popUsing(string $workerName, callable $callback)

Register a callback to be executed to pick jobs.

Parameters

string $workerName
callable $callback

Return Value

void
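
The idea of registering a pop callback per worker name can be sketched with a small static registry. Everything here is hypothetical: `PopCallbackRegistry`, the callback signature, and the plain-array queue are simplifications for illustration and do not reflect the arguments the real callback receives.

```php
<?php

final class PopCallbackRegistry
{
    /** @var array<string, callable> Callbacks keyed by worker name. */
    private static array $callbacks = [];

    /** Register the callback used to pick jobs for the named worker. */
    public static function popUsing(string $workerName, callable $callback): void
    {
        self::$callbacks[$workerName] = $callback;
    }

    /** Pick a job with the registered callback, or take the first item. */
    public static function pop(string $workerName, array $queue): mixed
    {
        $callback = self::$callbacks[$workerName] ?? null;

        return $callback !== null ? $callback($queue) : ($queue[0] ?? null);
    }
}

// A worker named "reports" picks the newest item instead of the oldest.
PopCallbackRegistry::popUsing(
    'reports',
    fn (array $queue) => $queue[count($queue) - 1] ?? null
);
```

Keying the callbacks by worker name lets differently named workers in the same process use different picking strategies.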

QueueManager getManager()

Get the queue manager instance.

Return Value

QueueManager

void setManager(Factory $manager)

Set the queue manager instance.

Parameters

Factory $manager

Return Value

void