Flask-RQ2¶
A Flask extension for RQ (Redis Queue).
This is a continuation of Flask-RQ more in spirit than in code. Many thanks to Matt Wright for the inspiration and providing the shoulders to stand on.
Installation¶
pip install Flask-RQ2
Getting started¶
To quickly start using Flask-RQ2, simply create an RQ
instance:
from flask import Flask
from flask_rq2 import RQ
app = Flask(__name__)
rq = RQ(app)
Alternatively, if you’re using the application factory pattern:
from flask_rq2 import RQ
rq = RQ()
and then later call init_app
where you create your application object:
from flask import Flask
def create_app():
app = Flask(__name__)
from yourapplication.jobs import rq
rq.init_app(app)
# more here..
return app
Decorators¶
@job
¶
A decorator to mark a function as an RQ job and to add some helpers to the function to simplify enqueuing:
from flask_rq2 import RQ
rq = RQ()
@rq.job
def add(x, y):
return x + y
Then in your app code:
job = add.queue(1, 2)
A specific queue name can also be passed as argument:
@rq.job('low', timeout=60)
def add(x, y):
return x + y
Or if you decide to use a different queue and timeout dynamically during runtime:
job2 = add.queue(3, 4, queue='high', timeout=60 * 2)
Changed in version 17.2: The queue
job function now takes a few more parameters.
See the full API docs for more information.
Some other parameters are available as well:
@rq.job('low', timeout=180, result_ttl=60 * 60, ttl=60 * 60 * 24)
def add(x, y):
return x + y
You can additionally schedule jobs to run at a certain time, after a certain timespan or by a cron-like plan:
@rq.job
def add(x, y):
return x + y
# queue job in 60 seconds
add.schedule(timedelta(seconds=60), 1, 2)
# queue job at a certain datetime (UTC!)
add.schedule(datetime(2016, 12, 31, 23, 59, 59), 1, 2)
# queue job in 14 days and then repeat once 14 days later
add.schedule(timedelta(days=14), 1, 2, repeat=1)
# queue job in 12 hours with a different queue
add.schedule(timedelta(hours=12), 1, 2, queue='high', timeout=60 * 2)
# queue job every day at noon (UTC!)
add.cron('0 0 12 * * ?', 'add-one-two', 1, 2)
# queue job every minute with a different queue
add.cron('* * * * *', 'add-one-two', 1, 2, queue='high', timeout=55)
Changed in version 17.2: The schedule
and cron
functions now take a few more parameters.
See the full API docs for more information.
See the full API docs for more information about the job functions.
@exception_handler
¶
An optional decorator for custom exception handlers that the RQ worker should call when catching exceptions during job execution.
from flask_rq2 import RQ
rq = RQ()
@rq.exception_handler
def send_alert_to_ops(job, *exc_info):
# call other code to send alert to OPs team
The exception handler will automatically be used when running the worker
from the get_worker
method or the CLI integration.
RQ backends¶
There are a few useful methods to fetch RQ backend objects for advanced patterns.
They will use the same Flask config values as the decorators and CLI integration and should be used instead of rq’s own functions with the same name.
get_queue
¶
Returns default queue or specific queue for name given as argument:
from flask_rq2 import RQ
rq = RQ()
default_queue = rq.get_queue()
low_queue = rq.get_queue('low')
easy_job = default_queue.enqueue(add, args=(1, 2))
hard_job = low_queue.enqueue(add, args=(1e100, 2e200))
get_worker
¶
Returns a worker for default queue or specific queues for names given as arguments:
from flask_rq2 import RQ
rq = RQ()
# Creates a worker that handle jobs in ``default`` queue.
default_worker = rq.get_worker()
default_worker.work(burst=True)
# Creates a worker that handle jobs in both ``simple`` and ``low`` queues.
low_n_simple_worker = rq.get_worker('low', 'simple')
low_n_simple_worker.work(burst=True)
get_scheduler
¶
Returns an RQ Scheduler instance for periodically enqueuing jobs:
from flask_rq2 import RQ
rq = RQ()
# check every 10 seconds if there are any jobs to enqueue
scheduler = rq.get_scheduler(interval=10)
scheduler.run()
Changed in version 17.2: The get_scheduler
method now takes an optional queue
parameter
to override the default scheduler queue.
CLI support¶
Flask-RQ2 only supports the Click based CLI feature in Flask >= 0.11. If you’re using Flask < 0.10 you’ll need to install a backport package called Flask-CLI as well, e.g.:
pip install Flask-CLI
Or install it in one go:
pip install Flask-RQ2[cli]
Please call flask rq --help
for more information, assuming
you’ve set the FLASK_APP
environment variable to the Flask app path.
Commands¶
There isn’t an official overview of CLI commands in the RQ documentation, but these are the commands that Flask-RQ2 support.
worker
– Starts an RQ worker (required to run jobs).scheduler
– Starts an RQ Scheduler (optional for scheduled jobs).info
– Shows an RQ command-line monitor.empty
– Empty the given RQ queues.requeue
– Requeues failed jobs.suspend
– Suspends all workers.resume
– Resumes all workers.
Please call each command with the --help
option to learn more about their
required and optional paramaters.
Unit Testing¶
To use Flask-RQ2 in unit tests, set the variable RQ_CONNECTION_CLASS = 'fakeredis.FakeStrictRedis'
. The fakeredis module is required and can be installed with pip install fakeredis
. Excerpts of how this can be integrated into a Flask app are shown below.
# config.py
class Config:
APP_NAME = os.environ.get('APP_NAME')
RQ_CONNECTION_CLASS = 'fakeredis.FakeStrictRedis'
# app/__init__.py
rq = RQ()
def create_app(config_name):
app = Flask(__name__)
app.config.from_object(config[config_name])
rq.init_app(app)
Configuration¶
RQ_REDIS_URL
¶
The URL to pass to the from_url()
method of the
redis-py’s connection class as defind in RQ_CONNECTION_CLASS
.
Defaults to connecting to the local Redis instance, database 0.
app.config['RQ_REDIS_URL'] = 'redis://localhost:6379/0'
RQ_CONNECTION_CLASS
¶
The dotted import path to the redis-py client class to connect to the Redis
server using the RQ_REDIS_URL
configuration value.
app.config['RQ_CONNECTION_CLASS'] = 'myproject.backends.MyStrictRedis'
Defaults to 'redis.StrictRedis'
.
RQ_QUEUES
¶
The default queues that the worker and CLI commands (empty
, info
and
worker
) act on by default.
app.config['RQ_QUEUES'] = ['default']
RQ_ASYNC
¶
Whether or not to run jobs asynchronously. Defaults to True
meaning
that jobs only run when they are processed by the workers.
app.config['RQ_ASYNC'] = True
Set to False
to run jobs immediatedly upon enqueuing in-process.
This may be useful for testing purposes or other constrained environments.
This is the main switch, use with discretion.
RQ_QUEUE_CLASS
¶
New in version 17.1.
The dotted import path of the queue class to enqueue jobs with.
app.config['RQ_QUEUE_CLASS'] = 'myproject.queue.MyQueue'
Defaults to 'rq.queue.Queue'
.
RQ_WORKER_CLASS
¶
New in version 17.1.
The dotted import path of the worker class to process jobs with.
app.config['RQ_WORKER_CLASS'] = 'myproject.worker.MyWorker'
Defaults to 'rq.worker.Worker'
.
RQ_JOB_CLASS
¶
New in version 17.1.
The dotted import path of the job class for RQ jobs.
Note
This must be a subclass of flask_rq2.job.FlaskJob
or
otherwise Flask-RQ2 won’t work.
app.config['RQ_JOB_CLASS'] = 'myproject.job.MyFlaskJob'
Defaults to 'flask_rq2.job.FlaskJob'
.
RQ_SCHEDULER_CLASS
¶
New in version 17.1.
The dotted import path of the scheduler class to enqueue scheduled jobs with.
app.config['RQ_SCHEDULER_CLASS'] = 'myproject.scheduler.MyScheduler'
Defaults to 'rq_scheduler.Scheduler'
.
RQ_SCHEDULER_QUEUE
¶
The name of the queue to enqueue scheduled jobs in.
app.config['RQ_SCHEDULER_QUEUE'] = 'scheduled'
Defaults to 'default'
.
RQ_SCHEDULER_INTERVAL
¶
The default interval the RQ Scheduler checks for jobs to enqueue, in seconds.
app.config['RQ_SCHEDULER_INTERVAL'] = 1
Defaults to 60
.
Changelog¶
18.3 (2018-12-20)¶
- IMPORTANT! Reqires redis-py >= 3.0 since RQ and rq-scheduler have switched to that requirement. Please upgrade as soon as possible.
18.2.2 (2018-12-20)¶
- Last release to support redis-py < 3.0.0! Fixes version incompatibility with rq-scheduler. Requires rq-scheduler < 0.9.0.
18.2, 18.2.1 (2018-11-29)¶
Requires redis-py < 3.0.0 as long as RQ hasn’t been made compatible to that version. Please don’t update redis-py to 3.x yet, it will break using RQ.
More infos:
Require rq < 0.13.0 to cater to a possible Redis 3.0.0 compatible version.
18.1 (2018-09-19)¶
- Requires rq >= 0.12.0 and rq-scheduler >= 0.8.3 now.
- Fixes imcompatibility with the new rq 0.12.0 release with which the
flask rq worker
command would raise an error because of changes in handling of theworker_ttl
parameter defaults. - Added support for Python 3.7. Since ‘async’ is a keyword in Python 3.7, RQ(async=True) has been changed to RQ(is_async=True). The async keyword argument will still work, but raises a DeprecationWarning.
- Documentation fixes.
18.0 (2018-03-02)¶
The project has been moved to the official RQ GitHub organization!
New URL: https://github.com/rq/flask-rq2
Stop monkey-patching the scheduler module since rq-scheduler gained the ability to use custom job classes.
Requires rq-scheduler 0.8.2 or higher.
Adds depends_on, at_front, meta and description parameters to job decorator.
Requires rq==0.10.0 or higher.
Minor fixes for test infrastructure.
17.2 (2017-12-05)¶
- Allow dynamically setting timeout, result TTL and job TTL and other parameters when enqueuing, scheduling or adding as a cron job.
17.1 (2017-12-05)¶
- Require Flask >= 0.10, but it’s recommended to use at least 0.11.
- Require rq 0.8.0 or later and rq-scheduler 0.7.0 or later.
- Require setting
FLASK_APP
environment variable to load Flask app during job performing. - Add
RQ_SCHEDULER_CLASS
,RQ_WORKER_CLASS
,RQ_JOB_CLASS
andRQ_QUEUE_CLASS
as configuration values. - Add support for rq-scheduler’s
--burst
option to automatically quit after all work is done. - Drop support for Flask-Script in favor of native Flask CLI support (or via Flask-CLI app for Flask < 0.11).
- Drop support for Python 3.4.
- Allow setting the queue dynamically when enqueuing, scheduling or adding as a cron job.
- Handle the result_ttl and queue_name job overrides better.
- Actually respect the
RQ_SCHEDULER_INTERVAL
config value. - Move
flask_rq2.helpers
module toflask_rq2.functions
. - Use a central Redis client and require app initialization before connecting.
You’ll have to run
RQ.init_app
before you can queue or schedule a job from now on.
17.0 (2017-02-15)¶
- Pin the rq version Flask-RQ2 depends on to >=0.6.0,<0.7.0 for now. A bigger refactor will follow shortly that fixes those problems better.
- Allow overriding the default_timeout in case of using the factory pattern.
- Run tests on Python 3.6.
16.1.1 (2016-09-08)¶
- Fix typos in docs.
16.1 (2016-09-08)¶
- Official Support for Flask >= 0.11
- Fix import paths to stop using
flask.ext
prefix.
16.0.2 (2016-05-20)¶
- Fix package description.
16.0.1 (2016-05-20)¶
- Make wheel file universal.
16.0 (2016-05-20)¶
- Initial release.
Other content¶
API¶
flask_rq2.app¶
The core interface of Flask-RQ2.
-
class
flask_rq2.app.
RQ
(app=None, default_timeout=None, is_async=None, **kwargs)¶ The main RQ object to be used in user apps.
-
__init__
(app=None, default_timeout=None, is_async=None, **kwargs)¶ Initialize the RQ interface.
Parameters: - app (
flask.Flask
) – Flask application - default_timeout (int) – The default timeout in seconds to use for jobs, defaults to RQ’s default of 180 seconds per job
- is_async (bool) – Whether or not to run jobs asynchronously or
in-process, defaults to
True
- app (
-
default_queue
= 'default'¶ Name of the default queue.
-
default_result_ttl
= 500¶ The fallback default result TTL.
New in version 17.1.
-
redis_url
= 'redis://localhost:6379/0'¶ The DSN (URL) of the Redis connection.
Changed in version 17.1: Renamed from
url
toredis_url
.
-
connection_class
= 'redis.StrictRedis'¶ The Redis client class to use.
New in version 17.1.
-
queues
= ['default']¶ List of queue names for RQ to work on.
-
queue_class
= 'rq.queue.Queue'¶ Dotted import path to RQ Queue class to use as base class.
Changed in version 17.1: Renamed from
queue_path
toqueue_class
.
-
worker_class
= 'rq.worker.Worker'¶ Dotted import path to RQ Workers class to use as base class.
Changed in version 17.1: Renamed from
worker_path
toworker_class
.
-
job_class
= 'flask_rq2.job.FlaskJob'¶ Dotted import path to RQ Job class to use as base class.
Changed in version 17.1: Renamed from
job_path
tojob_class
.
-
scheduler_class
= 'flask_rq2.scheduler.FlaskScheduler'¶ Dotted import path to RQ Scheduler class.
Changed in version 17.1: Renamed from
scheduler_path
toscheduler_class
.Changed in version 18.0: Changed to use own scheduler class.
-
scheduler_queue
= 'default'¶ Name of RQ queue to schedule jobs in by rq-scheduler.
-
scheduler_interval
= 60¶ Time in seconds the scheduler checks for scheduled jobs periodicically.
-
functions_class
= 'flask_rq2.functions.JobFunctions'¶ The default job functions class.
Changed in version 17.1: Renamed from
functions_path
tofunctions_class
. Moved fromflask_rq2.helpers.JobFunctions
to
-
default_timeout
= 180¶ The fallback default timeout value.
-
init_app
(app)¶ Initialize the app, e.g. can be used if factory pattern is used.
-
init_cli
(app)¶ Initialize the Flask CLI support in case it was enabled for the app.
Works with both Flask>=1.0’s CLI support as well as the backport in the Flask-CLI package for Flask<1.0.
-
exception_handler
(callback)¶ Decorator to add an exception handler to the worker, e.g.:
rq = RQ() @rq.exception_handler def my_custom_handler(job, *exc_info): # do custom things here ...
-
job
(func_or_queue=None, timeout=None, result_ttl=None, ttl=None, depends_on=None, at_front=None, meta=None, description=None)¶ Decorator to mark functions for queuing via RQ, e.g.:
rq = RQ() @rq.job def add(x, y): return x + y
or:
@rq.job(timeout=60, result_ttl=60 * 60) def add(x, y): return x + y
Adds various functions to the job as documented in
JobFunctions
.Changed in version 18.0: Adds
depends_on
,at_front
,meta
anddescription
parameters.Parameters: - queue (str) – Name of the queue to add job to, defaults to
flask_rq2.app.RQ.default_queue
. - timeout (int) – The maximum runtime in seconds of the job before it’s considered ‘lost’, defaults to 180.
- result_ttl (int) – Time to persist the job results in Redis, in seconds.
- ttl (int) – The maximum queued time of the job before it’ll be cancelled.
- depends_on (FlaskJob or str) – A job instance or id that the new job depends on.
- at_front (bool) – Whether or not the job is queued in front of all other enqueued jobs.
- meta (dict) – Additional meta data about the job.
- description (str) – Description of the job.
- queue (str) – Name of the queue to add job to, defaults to
-
get_scheduler
(interval=None, queue=None)¶ When installed returns a
rq_scheduler.Scheduler
instance to schedule job execution, e.g.:scheduler = rq.get_scheduler(interval=10)
Parameters:
-
get_queue
(name=None)¶ Returns an RQ queue instance with the given name, e.g.:
default_queue = rq.get_queue() low_queue = rq.get_queue('low')
Parameters: name (str) – Name of the queue to return, defaults to default_queue
.Returns: An RQ queue instance. Return type: rq.queue.Queue
-
get_worker
(*queues)¶ Returns an RQ worker instance for the given queue names, e.g.:
configured_worker = rq.get_worker() default_worker = rq.get_worker('default') default_low_worker = rq.get_worker('default', 'low')
Parameters: *queues – Names of queues the worker should act on, falls back to the configured queues.
-
flask_rq2.cli¶
Support for the Click based Flask CLI via Flask-CLI.
-
flask_rq2.cli.
empty
(*args, **kwargs)¶ Empty given queues.
-
flask_rq2.cli.
info
(*args, **kwargs)¶ RQ command-line monitor.
-
flask_rq2.cli.
requeue
(*args, **kwargs)¶ Requeue failed jobs.
-
flask_rq2.cli.
resume
(*args, **kwargs)¶ Resumes all workers.
-
flask_rq2.cli.
scheduler
(*args, **kwargs)¶ Periodically checks for scheduled jobs.
Default class options to pass to the CLI commands.
-
flask_rq2.cli.
suspend
(*args, **kwargs)¶ Suspends all workers.
-
flask_rq2.cli.
worker
(*args, **kwargs)¶ Starts an RQ worker.
flask_rq2.functions¶
-
class
flask_rq2.functions.
JobFunctions
(rq, wrapped, queue_name, timeout, result_ttl, ttl, depends_on, at_front, meta, description)¶ Some helper functions that are added to a function decorated with a
job()
decorator.-
functions
= ['queue', 'schedule', 'cron']¶ the methods to add to jobs automatically
-
queue
(*args, **kwargs)¶ A function to queue a RQ job, e.g.:
@rq.job(timeout=60) def add(x, y): return x + y add.queue(1, 2, timeout=30)
Parameters: - *args – The positional arguments to pass to the queued job.
- **kwargs – The keyword arguments to pass to the queued job.
- queue (str) – Name of the queue to queue in, defaults to
queue of of job or
default_queue
. - timeout (int) – The job timeout in seconds.
If not provided uses the job’s timeout or
default_timeout
. - description (str) – Description of the job.
- result_ttl (int) – The result TTL in seconds. If not provided
uses the job’s result TTL or
default_result_ttl
. - ttl (int) – The job TTL in seconds. If not provided uses the job’s TTL or no TTL at all.
- depends_on (FlaskJob or str) – A job instance or id that the new job depends on.
- job_id (str) – A custom ID for the new job. Defaults to an
UUID
. - at_front (bool) – Whether or not the job is queued in front of all other enqueued jobs.
- meta (dict) – Additional meta data about the job.
Returns: An RQ job instance.
Return type:
-
schedule
(time_or_delta, *args, **kwargs)¶ A function to schedule running a RQ job at a given time or after a given timespan:
@rq.job def add(x, y): return x + y add.schedule(timedelta(hours=2), 1, 2, timeout=10) add.schedule(datetime(2016, 12, 31, 23, 59, 59), 1, 2) add.schedule(timedelta(days=14), 1, 2, repeat=1)
Parameters: - *args – The positional arguments to pass to the queued job.
- **kwargs – The keyword arguments to pass to the queued job.
- queue (str) – Name of the queue to queue in, defaults to
queue of of job or
default_queue
. - timeout (int) – The job timeout in seconds.
If not provided uses the job’s timeout or
default_timeout
. - description (str) – Description of the job.
- result_ttl (int) – The result TTL in seconds. If not provided
uses the job’s result TTL or
default_result_ttl
. - ttl (int) – The job TTL in seconds. If not provided uses the job’s TTL or no TTL at all.
- repeat (int) – The number of times the job needs to be repeatedly
queued. Requires setting the
interval
parameter. - interval (int) – The interval of repetition as defined by the
repeat
parameter in seconds. - job_id (str) – A custom ID for the new job. Defaults to a UUID.
Returns: An RQ job instance.
Return type:
-
cron
(pattern, name, *args, **kwargs)¶ A function to setup a RQ job as a cronjob:
@rq.job('low', timeout=60) def add(x, y): return x + y add.cron('* * * * *', 'add-some-numbers', 1, 2, timeout=10)
Parameters: - *args – The positional arguments to pass to the queued job.
- **kwargs – The keyword arguments to pass to the queued job.
- pattern (str) – A Crontab pattern.
- name (str) – The name of the cronjob.
- queue (str) – Name of the queue to queue in, defaults to
queue of of job or
default_queue
. - timeout (int) – The job timeout in seconds.
If not provided uses the job’s timeout or
default_timeout
. - description (str) – Description of the job.
- repeat (int) – The number of times the job needs to be repeatedly queued via the cronjob. Take care only using this for cronjob that don’t already repeat themselves natively due to their crontab.
Returns: An RQ job instance.
Return type:
-