Flask-RQ2

A Flask extension for RQ (Redis Queue).

This is a continuation of Flask-RQ more in spirit than in code. Many thanks to Matt Wright for the inspiration and providing the shoulders to stand on.

Installation

pip install Flask-RQ2

Getting started

To quickly start using Flask-RQ2, simply create an RQ instance:

from flask import Flask
from flask_rq2 import RQ

app = Flask(__name__)
rq = RQ(app)

Alternatively, if you’re using the application factory pattern:

from flask_rq2 import RQ
rq = RQ()

and then later call init_app where you create your application object:

from flask import Flask

def create_app():
    app = Flask(__name__)

    from yourapplication.jobs import rq
    rq.init_app(app)

    # more here..
    return app

Decorators

@job

A decorator to mark a function as an RQ job and to add some helpers to the function to simplify enqueuing:

from flask_rq2 import RQ

rq = RQ()

@rq.job
def add(x, y):
    return x + y

Then in your app code:

job = add.queue(1, 2)

A specific queue name and other options such as a timeout can also be passed as arguments:

@rq.job('low', timeout=60)
def add(x, y):
    return x + y

Or if you decide to use a different queue and timeout dynamically during runtime:

job2 = add.queue(3, 4, queue='high', timeout=60 * 2)

Changed in version 17.2: The queue job function now takes a few more parameters. See the full API docs for more information.
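As a minimal sketch of choosing the queue and timeout at runtime, the helper below builds the keyword arguments from the size of the workload. The function name enqueue_options, the 1000-item threshold and the queue names are assumptions made for illustration only; they are not part of Flask-RQ2.

```python
def enqueue_options(item_count):
    """Return hypothetical queue/timeout keyword arguments for a job."""
    if item_count > 1000:
        # Large workloads go to the slow queue with a generous timeout.
        return {'queue': 'low', 'timeout': 60 * 10}
    # Small workloads go to the fast queue with a short timeout.
    return {'queue': 'high', 'timeout': 60 * 2}


options = enqueue_options(5)
print(options)

# With an initialized RQ instance and the add job from above, the
# options could then be passed along as: add.queue(3, 4, **options)
```

Keeping the decision in one small function makes it easy to test without a running Redis server.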

Some other parameters are available as well:

@rq.job('low', timeout=180, result_ttl=60 * 60, ttl=60 * 60 * 24)
def add(x, y):
    return x + y

You can additionally schedule jobs to run at a certain time, after a certain timespan or by a cron-like plan:

from datetime import datetime, timedelta

@rq.job
def add(x, y):
    return x + y

# queue job in 60 seconds
add.schedule(timedelta(seconds=60), 1, 2)

# queue job at a certain datetime (UTC!)
add.schedule(datetime(2016, 12, 31, 23, 59, 59), 1, 2)

# queue job in 14 days and then repeat once 14 days later
add.schedule(timedelta(days=14), 1, 2, repeat=1)

# queue job in 12 hours with a different queue
add.schedule(timedelta(hours=12), 1, 2, queue='high', timeout=60 * 2)

# queue job every day at noon (UTC!)
add.cron('0 12 * * *', 'add-one-two', 1, 2)

# queue job every minute with a different queue
add.cron('* * * * *', 'add-one-two', 1, 2, queue='high', timeout=55)

Changed in version 17.2: The schedule and cron functions now take a few more parameters. See the full API docs for more information.

See the full API docs for more information about the job functions.
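Because the scheduling examples above expect datetimes in UTC, it can help to convert local times explicitly before scheduling. The following is a small sketch using only the standard library; the helper name to_naive_utc and the UTC-05:00 offset are assumptions chosen for illustration.

```python
from datetime import datetime, timedelta, timezone


def to_naive_utc(aware_dt):
    """Convert a timezone-aware datetime to a naive datetime in UTC."""
    if aware_dt.tzinfo is None:
        raise ValueError('expected a timezone-aware datetime')
    return aware_dt.astimezone(timezone.utc).replace(tzinfo=None)


# Hypothetical local time: 18:59:59 on New Year's Eve in UTC-05:00.
local_tz = timezone(timedelta(hours=-5))
run_at = to_naive_utc(datetime(2016, 12, 31, 18, 59, 59, tzinfo=local_tz))
print(run_at)  # 2016-12-31 23:59:59

# The result could then be passed to the job, for example:
# add.schedule(run_at, 1, 2)
```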

@exception_handler

An optional decorator for custom exception handlers that the RQ worker should call when catching exceptions during job execution.

from flask_rq2 import RQ

rq = RQ()

@rq.exception_handler
def send_alert_to_ops(job, *exc_info):
    # call other code to send an alert to the Ops team
    pass

The exception handler will automatically be used when running the worker from the get_worker method or the CLI integration.
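What the handler does with the exception is up to the application. As a rough sketch, the function below turns the arguments a handler receives into a plain-text alert message. The name format_alert and the stand-in job object are assumptions for illustration; a real handler would receive an actual RQ job and would be registered with the exception_handler decorator shown above.

```python
import sys
import traceback
from types import SimpleNamespace


def format_alert(job, exc_type, exc_value, tb):
    """Build a plain-text alert message for a failed job."""
    details = ''.join(traceback.format_exception(exc_type, exc_value, tb))
    return 'Job {} failed:\n{}'.format(job.id, details)


# Stand-in for an RQ job; only the id attribute is used in this sketch.
fake_job = SimpleNamespace(id='job-123')

try:
    1 / 0
except ZeroDivisionError:
    # sys.exc_info() yields the same (type, value, traceback) triple
    # that the handler above collects as *exc_info.
    message = format_alert(fake_job, *sys.exc_info())

print(message)
```

Inside send_alert_to_ops, the message could be built with format_alert(job, *exc_info) and then handed to whatever alerting code the project uses.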

RQ backends

There are a few useful methods to fetch RQ backend objects for advanced patterns.

They will use the same Flask config values as the decorators and CLI integration and should be used instead of rq’s own functions with the same name.

get_queue

Returns the default queue, or a specific queue for the name given as an argument:

from flask_rq2 import RQ

rq = RQ()

default_queue = rq.get_queue()
low_queue = rq.get_queue('low')

easy_job = default_queue.enqueue(add, args=(1, 2))
hard_job = low_queue.enqueue(add, args=(1e100, 2e200))

get_worker

Returns a worker for the default queue, or for the specific queues whose names are given as arguments:

from flask_rq2 import RQ

rq = RQ()

# Creates a worker that handles jobs in the ``default`` queue.
default_worker = rq.get_worker()
default_worker.work(burst=True)

# Creates a worker that handles jobs in both the ``low`` and ``simple`` queues.
low_n_simple_worker = rq.get_worker('low', 'simple')
low_n_simple_worker.work(burst=True)

get_scheduler

Returns an RQ Scheduler instance for periodically enqueuing jobs:

from flask_rq2 import RQ

rq = RQ()

# check every 10 seconds if there are any jobs to enqueue
scheduler = rq.get_scheduler(interval=10)
scheduler.run()

Changed in version 17.2: The get_scheduler method now takes an optional queue parameter to override the default scheduler queue.

CLI support

Flask-RQ2 only supports the Click-based CLI feature in Flask >= 0.11. If you’re using Flask < 0.11 you’ll need to install a backport package called Flask-CLI as well, e.g.:

pip install Flask-CLI

Or install it in one go:

pip install Flask-RQ2[cli]

Please call flask rq --help for more information, assuming you’ve set the FLASK_APP environment variable to the Flask app path.

Commands

There isn’t an official overview of CLI commands in the RQ documentation, but these are the commands that Flask-RQ2 supports.

Please call each command with the --help option to learn more about its required and optional parameters.

Unit Testing

To use Flask-RQ2 in unit tests, set the variable RQ_CONNECTION_CLASS = 'fakeredis.FakeStrictRedis'. The fakeredis module is required and can be installed with pip install fakeredis. Excerpts of how this can be integrated into a Flask app are shown below.

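# config.py

import os

class Config:
    APP_NAME = os.environ.get('APP_NAME')
    RQ_CONNECTION_CLASS = 'fakeredis.FakeStrictRedis'

# app/__init__.py

from flask import Flask
from flask_rq2 import RQ

# assumes config.py also defines a "config" dict that maps names to config classes
from config import config

rq = RQ()

def create_app(config_name):
    app = Flask(__name__)
    app.config.from_object(config[config_name])
    rq.init_app(app)
    return app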
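The excerpt above looks up a configuration class by name. One possible layout for that lookup is sketched below; the class name TestingConfig and the keys of the config dict are assumptions for illustration. Only the testing configuration swaps in fakeredis, so other environments keep the default connection class.

```python
class Config:
    """Base settings shared by every environment."""
    RQ_REDIS_URL = 'redis://localhost:6379/0'


class TestingConfig(Config):
    """Settings for unit tests: no real Redis server is needed."""
    TESTING = True
    RQ_CONNECTION_CLASS = 'fakeredis.FakeStrictRedis'


# Hypothetical mapping used as config[config_name] in create_app().
config = {
    'default': Config,
    'testing': TestingConfig,
}

print(config['testing'].RQ_CONNECTION_CLASS)
```

A test suite would then build its application with create_app('testing').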

Configuration

RQ_REDIS_URL

The URL to pass to the from_url() method of the redis-py client class as defined in RQ_CONNECTION_CLASS. Defaults to connecting to the local Redis instance, database 0.

app.config['RQ_REDIS_URL'] = 'redis://localhost:6379/0'

RQ_CONNECTION_CLASS

The dotted import path to the redis-py client class to connect to the Redis server using the RQ_REDIS_URL configuration value.

app.config['RQ_CONNECTION_CLASS'] = 'myproject.backends.MyStrictRedis'

Defaults to 'redis.StrictRedis'.
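Several settings in this section take a dotted import path. To illustrate what such a path means, the sketch below resolves one with the standard library. It is not Flask-RQ2’s own implementation; the helper name import_from_dotted_path is an assumption, and a standard library class stands in for a Redis client so that the example runs without extra packages.

```python
from importlib import import_module


def import_from_dotted_path(path):
    """Resolve 'package.module.Name' to the object it names."""
    module_name, _, attribute = path.rpartition('.')
    if not module_name:
        raise ValueError('not a dotted import path: {!r}'.format(path))
    # Import the module part, then look up the final name on it.
    return getattr(import_module(module_name), attribute)


# Stand-in path; a real setting would name a redis-py client class.
resolved = import_from_dotted_path('collections.OrderedDict')
print(resolved)
```

A path that cannot be imported raises ImportError or AttributeError here, which is the kind of error to expect from a mistyped setting.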

RQ_QUEUES

The queues that the worker and the CLI commands (empty, info and worker) act on by default.

app.config['RQ_QUEUES'] = ['default']

RQ_ASYNC

Whether or not to run jobs asynchronously. Defaults to True, meaning that jobs only run when they are processed by the workers.

app.config['RQ_ASYNC'] = True

Set to False to run jobs immediately and in-process upon enqueuing. This may be useful for testing purposes or other constrained environments. This is the main switch, so use it with discretion.

RQ_QUEUE_CLASS

New in version 17.1.

The dotted import path of the queue class to enqueue jobs with.

app.config['RQ_QUEUE_CLASS'] = 'myproject.queue.MyQueue'

Defaults to 'rq.queue.Queue'.

RQ_WORKER_CLASS

New in version 17.1.

The dotted import path of the worker class to process jobs with.

app.config['RQ_WORKER_CLASS'] = 'myproject.worker.MyWorker'

Defaults to 'rq.worker.Worker'.

RQ_JOB_CLASS

New in version 17.1.

The dotted import path of the job class for RQ jobs.

Note

This must be a subclass of flask_rq2.job.FlaskJob or otherwise Flask-RQ2 won’t work.

app.config['RQ_JOB_CLASS'] = 'myproject.job.MyFlaskJob'

Defaults to 'flask_rq2.job.FlaskJob'.

RQ_SCHEDULER_CLASS

New in version 17.1.

The dotted import path of the scheduler class to enqueue scheduled jobs with.

app.config['RQ_SCHEDULER_CLASS'] = 'myproject.scheduler.MyScheduler'

Defaults to 'rq_scheduler.Scheduler'.

RQ_SCHEDULER_QUEUE

The name of the queue to enqueue scheduled jobs in.

app.config['RQ_SCHEDULER_QUEUE'] = 'scheduled'

Defaults to 'default'.

RQ_SCHEDULER_INTERVAL

The default interval, in seconds, at which the RQ Scheduler checks for jobs to enqueue.

app.config['RQ_SCHEDULER_INTERVAL'] = 1

Defaults to 60.
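Several of the settings above are often taken from the environment. The sketch below collects a few of them into a dict, falling back to the defaults documented in this section. The function name rq_settings, the comma-separated format for RQ_QUEUES and the 'false' spelling for RQ_ASYNC are assumptions for illustration; Flask-RQ2 itself only reads the Flask config.

```python
def rq_settings(environ):
    """Build RQ-related Flask settings from a mapping such as os.environ."""
    return {
        'RQ_REDIS_URL': environ.get('RQ_REDIS_URL', 'redis://localhost:6379/0'),
        # Assumed format: queue names separated by commas.
        'RQ_QUEUES': environ.get('RQ_QUEUES', 'default').split(','),
        # Anything other than "false" (in any case) keeps jobs asynchronous.
        'RQ_ASYNC': environ.get('RQ_ASYNC', 'true').lower() != 'false',
        'RQ_SCHEDULER_QUEUE': environ.get('RQ_SCHEDULER_QUEUE', 'default'),
        'RQ_SCHEDULER_INTERVAL': int(environ.get('RQ_SCHEDULER_INTERVAL', '60')),
    }


settings = rq_settings({'RQ_QUEUES': 'high,default,low', 'RQ_ASYNC': 'false'})
print(settings)
```

In an application factory, the result could be applied with app.config.update(rq_settings(os.environ)) before calling rq.init_app(app).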

Changelog

18.3 (2018-12-20)

  • IMPORTANT! Requires redis-py >= 3.0 since RQ and rq-scheduler have switched to that requirement. Please upgrade as soon as possible.

18.2.2 (2018-12-20)

  • Last release to support redis-py < 3.0.0! Fixes version incompatibility with rq-scheduler. Requires rq-scheduler < 0.9.0.

18.2, 18.2.1 (2018-11-29)

18.1 (2018-09-19)

  • Requires rq >= 0.12.0 and rq-scheduler >= 0.8.3 now.
  • Fixes incompatibility with the new rq 0.12.0 release, with which the flask rq worker command would raise an error because of changes in the handling of the worker_ttl parameter defaults.
  • Added support for Python 3.7. Since ‘async’ is a keyword in Python 3.7, RQ(async=True) has been changed to RQ(is_async=True). The async keyword argument will still work, but raises a DeprecationWarning.
  • Documentation fixes.

18.0 (2018-03-02)

  • The project has been moved to the official RQ GitHub organization!

    New URL: https://github.com/rq/flask-rq2

  • Stop monkey-patching the scheduler module since rq-scheduler gained the ability to use custom job classes.

    Requires rq-scheduler 0.8.2 or higher.

  • Adds depends_on, at_front, meta and description parameters to job decorator.

    Requires rq 0.10.0 or higher.

  • Minor fixes for test infrastructure.

17.2 (2017-12-05)

  • Allow dynamically setting timeout, result TTL and job TTL and other parameters when enqueuing, scheduling or adding as a cron job.

17.1 (2017-12-05)

  • Require Flask >= 0.10, but it’s recommended to use at least 0.11.
  • Require rq 0.8.0 or later and rq-scheduler 0.7.0 or later.
  • Require setting FLASK_APP environment variable to load Flask app during job performing.
  • Add RQ_SCHEDULER_CLASS, RQ_WORKER_CLASS, RQ_JOB_CLASS and RQ_QUEUE_CLASS as configuration values.
  • Add support for rq-scheduler’s --burst option to automatically quit after all work is done.
  • Drop support for Flask-Script in favor of native Flask CLI support (or via Flask-CLI app for Flask < 0.11).
  • Drop support for Python 3.4.
  • Allow setting the queue dynamically when enqueuing, scheduling or adding as a cron job.
  • Handle the result_ttl and queue_name job overrides better.
  • Actually respect the RQ_SCHEDULER_INTERVAL config value.
  • Move flask_rq2.helpers module to flask_rq2.functions.
  • Use a central Redis client and require app initialization before connecting. You’ll have to run RQ.init_app before you can queue or schedule a job from now on.

17.0 (2017-02-15)

  • Pin the rq version Flask-RQ2 depends on to >=0.6.0,<0.7.0 for now. A bigger refactor will follow shortly that fixes those problems better.
  • Allow overriding the default_timeout in case of using the factory pattern.
  • Run tests on Python 3.6.

16.1.1 (2016-09-08)

  • Fix typos in docs.

16.1 (2016-09-08)

  • Official support for Flask >= 0.11.
  • Fix import paths to stop using flask.ext prefix.

16.0.2 (2016-05-20)

  • Fix package description.

16.0.1 (2016-05-20)

  • Make wheel file universal.

16.0 (2016-05-20)

  • Initial release.