Flask-RQ2

A Flask extension for RQ (Redis Queue).

This is a continuation of Flask-RQ more in spirit than in code. Many thanks to Matt Wright for the inspiration and providing the shoulders to stand on.

Installation

pip install Flask-RQ2

Getting started

To quickly start using Flask-RQ2, simply create an RQ instance:

from flask import Flask
from flask_rq2 import RQ

app = Flask(__name__)
rq = RQ(app)

Alternatively, if you’re using the application factory pattern:

from flask_rq2 import RQ
rq = RQ()

and then later call init_app where you create your application object:

from flask import Flask

def create_app():
    app = Flask(__name__)

    from yourapplication.jobs import rq
    rq.init_app(app)

    # more here..
    return app

Decorators

@job

A decorator to mark a function as an RQ job and to add some helpers to the function to simplify enqueuing:

from flask_rq2 import RQ

rq = RQ()

@rq.job
def add(x, y):
    return x + y

Then in your app code:

job = add.queue(1, 2)

A specific queue name can also be passed as argument:

@rq.job('low', timeout=60)
def add(x, y):
    return x + y

Or if you decide to use a different queue and timeout dynamically during runtime:

job2 = add.queue(3, 4, queue='high', timeout=60 * 2)

Changed in version 17.2: The queue job function now takes a few more parameters. See the full API docs for more information.

Some other parameters are available as well:

@rq.job('low', timeout=180, result_ttl=60 * 60, ttl=60 * 60 * 24)
def add(x, y):
    return x + y

You can additionally schedule jobs to run at a certain time, after a certain timespan or by a cron-like plan:

@rq.job
def add(x, y):
    return x + y

# queue job in 60 seconds
add.schedule(timedelta(seconds=60), 1, 2)

# queue job at a certain datetime (UTC!)
add.schedule(datetime(2016, 12, 31, 23, 59, 59), 1, 2)

# queue job in 14 days and then repeat once 14 days later
add.schedule(timedelta(days=14), 1, 2, repeat=1)

# queue job in 12 hours with a different queue
add.schedule(timedelta(hours=12), 1, 2, queue='high', timeout=60 * 2)

# queue job every day at noon (UTC!)
add.cron('0 0 12 * * ?', 'add-one-two', 1, 2)

# queue job every minute with a different queue
add.cron('* * * * *', 'add-one-two', 1, 2, queue='high', timeout=55)

Changed in version 17.2: The schedule and cron functions now take a few more parameters. See the full API docs for more information.

See the full API docs for more information about the job functions.

@exception_handler

An optional decorator for custom exception handlers that the RQ worker should call when catching exceptions during job execution.

from flask_rq2 import RQ

rq = RQ()

@rq.exception_handler
def send_alert_to_ops(job, *exc_info):
    # call other code to send alert to OPs team

The exception handler will automatically be used when running the worker from the get_worker method or the CLI integration.

RQ backends

There are a few useful methods to fetch RQ backend objects for advanced patterns.

They will use the same Flask config values as the decorators and CLI integration and should be used instead of rq’s own functions with the same name.

get_queue

Returns default queue or specific queue for name given as argument:

from flask_rq2 import RQ

rq = RQ()

default_queue = rq.get_queue()
low_queue = rq.get_queue('low')

easy_job = default_queue.enqueue(add, args=(1, 2))
hard_job = low_queue.enqueue(add, args=(1e100, 2e200))

get_worker

Returns a worker for default queue or specific queues for names given as arguments:

from flask_rq2 import RQ

rq = RQ()

# Creates a worker that handle jobs in ``default`` queue.
default_worker = rq.get_worker()
default_worker.work(burst=True)

# Creates a worker that handle jobs in both ``simple`` and ``low`` queues.
low_n_simple_worker = rq.get_worker('low', 'simple')
low_n_simple_worker.work(burst=True)

get_scheduler

Returns an RQ Scheduler instance for periodically enqueuing jobs:

from flask_rq2 import RQ

rq = RQ()

# check every 10 seconds if there are any jobs to enqueue
scheduler = rq.get_scheduler(interval=10)
scheduler.run()

Changed in version 17.2: The get_scheduler method now takes an optional queue parameter to override the default scheduler queue.

CLI support

Flask-RQ2 only supports the Click based CLI feature in Flask >= 0.11. If you’re using Flask < 0.10 you’ll need to install a backport package called Flask-CLI as well, e.g.:

pip install Flask-CLI

Or install it in one go:

pip install Flask-RQ2[cli]

Please call flask rq --help for more information, assuming you’ve set the FLASK_APP environment variable to the Flask app path.

Commands

There isn’t an official overview of CLI commands in the RQ documentation, but these are the commands that Flask-RQ2 support.

Please call each command with the --help option to learn more about their required and optional paramaters.

Unit Testing

To use Flask-RQ2 in unit tests, set the variable RQ_CONNECTION_CLASS = 'fakeredis.FakeStrictRedis'. The fakeredis module is required and can be installed with pip install fakeredis. Excerpts of how this can be integrated into a Flask app are shown below.

# config.py

class Config:
    APP_NAME = os.environ.get('APP_NAME')
    RQ_CONNECTION_CLASS = 'fakeredis.FakeStrictRedis'
# app/__init__.py

rq = RQ()

def create_app(config_name):
    app = Flask(__name__)
    app.config.from_object(config[config_name])
    rq.init_app(app)

Configuration

RQ_REDIS_URL

The URL to pass to the from_url() method of the redis-py’s connection class as defind in RQ_CONNECTION_CLASS. Defaults to connecting to the local Redis instance, database 0.

app.config['RQ_REDIS_URL'] = 'redis://localhost:6379/0'

RQ_CONNECTION_CLASS

The dotted import path to the redis-py client class to connect to the Redis server using the RQ_REDIS_URL configuration value.

app.config['RQ_CONNECTION_CLASS'] = 'myproject.backends.MyStrictRedis'

Defaults to 'redis.StrictRedis'.

RQ_QUEUES

The default queues that the worker and CLI commands (empty, info and worker) act on by default.

app.config['RQ_QUEUES'] = ['default']

RQ_ASYNC

Whether or not to run jobs asynchronously. Defaults to True meaning that jobs only run when they are processed by the workers.

app.config['RQ_ASYNC'] = True

Set to False to run jobs immediatedly upon enqueuing in-process. This may be useful for testing purposes or other constrained environments. This is the main switch, use with discretion.

RQ_QUEUE_CLASS

New in version 17.1.

The dotted import path of the queue class to enqueue jobs with.

app.config['RQ_QUEUE_CLASS'] = 'myproject.queue.MyQueue'

Defaults to 'rq.queue.Queue'.

RQ_WORKER_CLASS

New in version 17.1.

The dotted import path of the worker class to process jobs with.

app.config['RQ_WORKER_CLASS'] = 'myproject.worker.MyWorker'

Defaults to 'rq.worker.Worker'.

RQ_JOB_CLASS

New in version 17.1.

The dotted import path of the job class for RQ jobs.

Note

This must be a subclass of flask_rq2.job.FlaskJob or otherwise Flask-RQ2 won’t work.

app.config['RQ_JOB_CLASS'] = 'myproject.job.MyFlaskJob'

Defaults to 'flask_rq2.job.FlaskJob'.

RQ_SCHEDULER_CLASS

New in version 17.1.

The dotted import path of the scheduler class to enqueue scheduled jobs with.

app.config['RQ_SCHEDULER_CLASS'] = 'myproject.scheduler.MyScheduler'

Defaults to 'rq_scheduler.Scheduler'.

RQ_SCHEDULER_QUEUE

The name of the queue to enqueue scheduled jobs in.

app.config['RQ_SCHEDULER_QUEUE'] = 'scheduled'

Defaults to 'default'.

RQ_SCHEDULER_INTERVAL

The default interval the RQ Scheduler checks for jobs to enqueue, in seconds.

app.config['RQ_SCHEDULER_INTERVAL'] = 1

Defaults to 60.

Changelog

18.3 (2018-12-20)

  • IMPORTANT! Reqires redis-py >= 3.0 since RQ and rq-scheduler have switched to that requirement. Please upgrade as soon as possible.

18.2.2 (2018-12-20)

  • Last release to support redis-py < 3.0.0! Fixes version incompatibility with rq-scheduler. Requires rq-scheduler < 0.9.0.

18.2, 18.2.1 (2018-11-29)

18.1 (2018-09-19)

  • Requires rq >= 0.12.0 and rq-scheduler >= 0.8.3 now.
  • Fixes imcompatibility with the new rq 0.12.0 release with which the flask rq worker command would raise an error because of changes in handling of the worker_ttl parameter defaults.
  • Added support for Python 3.7. Since ‘async’ is a keyword in Python 3.7, RQ(async=True) has been changed to RQ(is_async=True). The async keyword argument will still work, but raises a DeprecationWarning.
  • Documentation fixes.

18.0 (2018-03-02)

  • The project has been moved to the official RQ GitHub organization!

    New URL: https://github.com/rq/flask-rq2

  • Stop monkey-patching the scheduler module since rq-scheduler gained the ability to use custom job classes.

    Requires rq-scheduler 0.8.2 or higher.

  • Adds depends_on, at_front, meta and description parameters to job decorator.

    Requires rq==0.10.0 or higher.

  • Minor fixes for test infrastructure.

17.2 (2017-12-05)

  • Allow dynamically setting timeout, result TTL and job TTL and other parameters when enqueuing, scheduling or adding as a cron job.

17.1 (2017-12-05)

  • Require Flask >= 0.10, but it’s recommended to use at least 0.11.
  • Require rq 0.8.0 or later and rq-scheduler 0.7.0 or later.
  • Require setting FLASK_APP environment variable to load Flask app during job performing.
  • Add RQ_SCHEDULER_CLASS, RQ_WORKER_CLASS, RQ_JOB_CLASS and RQ_QUEUE_CLASS as configuration values.
  • Add support for rq-scheduler’s --burst option to automatically quit after all work is done.
  • Drop support for Flask-Script in favor of native Flask CLI support (or via Flask-CLI app for Flask < 0.11).
  • Drop support for Python 3.4.
  • Allow setting the queue dynamically when enqueuing, scheduling or adding as a cron job.
  • Handle the result_ttl and queue_name job overrides better.
  • Actually respect the RQ_SCHEDULER_INTERVAL config value.
  • Move flask_rq2.helpers module to flask_rq2.functions.
  • Use a central Redis client and require app initialization before connecting. You’ll have to run RQ.init_app before you can queue or schedule a job from now on.

17.0 (2017-02-15)

  • Pin the rq version Flask-RQ2 depends on to >=0.6.0,<0.7.0 for now. A bigger refactor will follow shortly that fixes those problems better.
  • Allow overriding the default_timeout in case of using the factory pattern.
  • Run tests on Python 3.6.

16.1.1 (2016-09-08)

  • Fix typos in docs.

16.1 (2016-09-08)

  • Official Support for Flask >= 0.11
  • Fix import paths to stop using flask.ext prefix.

16.0.2 (2016-05-20)

  • Fix package description.

16.0.1 (2016-05-20)

  • Make wheel file universal.

16.0 (2016-05-20)

  • Initial release.

Other content

API

flask_rq2.app

The core interface of Flask-RQ2.

class flask_rq2.app.RQ(app=None, default_timeout=None, is_async=None, **kwargs)

The main RQ object to be used in user apps.

__init__(app=None, default_timeout=None, is_async=None, **kwargs)

Initialize the RQ interface.

Parameters:
  • app (flask.Flask) – Flask application
  • default_timeout (int) – The default timeout in seconds to use for jobs, defaults to RQ’s default of 180 seconds per job
  • is_async (bool) – Whether or not to run jobs asynchronously or in-process, defaults to True
default_queue = 'default'

Name of the default queue.

default_result_ttl = 500

The fallback default result TTL.

New in version 17.1.

redis_url = 'redis://localhost:6379/0'

The DSN (URL) of the Redis connection.

Changed in version 17.1: Renamed from url to redis_url.

connection_class = 'redis.StrictRedis'

The Redis client class to use.

New in version 17.1.

queues = ['default']

List of queue names for RQ to work on.

queue_class = 'rq.queue.Queue'

Dotted import path to RQ Queue class to use as base class.

Changed in version 17.1: Renamed from queue_path to queue_class.

worker_class = 'rq.worker.Worker'

Dotted import path to RQ Workers class to use as base class.

Changed in version 17.1: Renamed from worker_path to worker_class.

job_class = 'flask_rq2.job.FlaskJob'

Dotted import path to RQ Job class to use as base class.

Changed in version 17.1: Renamed from job_path to job_class.

scheduler_class = 'flask_rq2.scheduler.FlaskScheduler'

Dotted import path to RQ Scheduler class.

Changed in version 17.1: Renamed from scheduler_path to scheduler_class.

Changed in version 18.0: Changed to use own scheduler class.

scheduler_queue = 'default'

Name of RQ queue to schedule jobs in by rq-scheduler.

scheduler_interval = 60

Time in seconds the scheduler checks for scheduled jobs periodicically.

functions_class = 'flask_rq2.functions.JobFunctions'

The default job functions class.

Changed in version 17.1: Renamed from functions_path to functions_class. Moved from flask_rq2.helpers.JobFunctions to

default_timeout = 180

The fallback default timeout value.

init_app(app)

Initialize the app, e.g. can be used if factory pattern is used.

init_cli(app)

Initialize the Flask CLI support in case it was enabled for the app.

Works with both Flask>=1.0’s CLI support as well as the backport in the Flask-CLI package for Flask<1.0.

exception_handler(callback)

Decorator to add an exception handler to the worker, e.g.:

rq = RQ()

@rq.exception_handler
def my_custom_handler(job, *exc_info):
    # do custom things here
    ...
job(func_or_queue=None, timeout=None, result_ttl=None, ttl=None, depends_on=None, at_front=None, meta=None, description=None)

Decorator to mark functions for queuing via RQ, e.g.:

rq = RQ()

@rq.job
def add(x, y):
    return x + y

or:

@rq.job(timeout=60, result_ttl=60 * 60)
def add(x, y):
    return x + y

Adds various functions to the job as documented in JobFunctions.

Changed in version 18.0: Adds depends_on, at_front, meta and description parameters.

Parameters:
  • queue (str) – Name of the queue to add job to, defaults to flask_rq2.app.RQ.default_queue.
  • timeout (int) – The maximum runtime in seconds of the job before it’s considered ‘lost’, defaults to 180.
  • result_ttl (int) – Time to persist the job results in Redis, in seconds.
  • ttl (int) – The maximum queued time of the job before it’ll be cancelled.
  • depends_on (FlaskJob or str) – A job instance or id that the new job depends on.
  • at_front (bool) – Whether or not the job is queued in front of all other enqueued jobs.
  • meta (dict) – Additional meta data about the job.
  • description (str) – Description of the job.
get_scheduler(interval=None, queue=None)

When installed returns a rq_scheduler.Scheduler instance to schedule job execution, e.g.:

scheduler = rq.get_scheduler(interval=10)
Parameters:
  • interval (int) – Time in seconds of the periodic check for scheduled jobs.
  • queue (str) – Name of the queue to enqueue in, defaults to scheduler_queue.
get_queue(name=None)

Returns an RQ queue instance with the given name, e.g.:

default_queue = rq.get_queue()
low_queue = rq.get_queue('low')
Parameters:name (str) – Name of the queue to return, defaults to default_queue.
Returns:An RQ queue instance.
Return type:rq.queue.Queue
get_worker(*queues)

Returns an RQ worker instance for the given queue names, e.g.:

configured_worker = rq.get_worker()
default_worker = rq.get_worker('default')
default_low_worker = rq.get_worker('default', 'low')
Parameters:*queues – Names of queues the worker should act on, falls back to the configured queues.

flask_rq2.cli

Support for the Click based Flask CLI via Flask-CLI.

flask_rq2.cli.empty(*args, **kwargs)

Empty given queues.

flask_rq2.cli.info(*args, **kwargs)

RQ command-line monitor.

flask_rq2.cli.requeue(*args, **kwargs)

Requeue failed jobs.

flask_rq2.cli.resume(*args, **kwargs)

Resumes all workers.

flask_rq2.cli.scheduler(*args, **kwargs)

Periodically checks for scheduled jobs.

flask_rq2.cli.shared_options(rq)

Default class options to pass to the CLI commands.

flask_rq2.cli.suspend(*args, **kwargs)

Suspends all workers.

flask_rq2.cli.worker(*args, **kwargs)

Starts an RQ worker.

flask_rq2.functions

class flask_rq2.functions.JobFunctions(rq, wrapped, queue_name, timeout, result_ttl, ttl, depends_on, at_front, meta, description)

Some helper functions that are added to a function decorated with a job() decorator.

functions = ['queue', 'schedule', 'cron']

the methods to add to jobs automatically

queue(*args, **kwargs)

A function to queue a RQ job, e.g.:

@rq.job(timeout=60)
def add(x, y):
    return x + y

add.queue(1, 2, timeout=30)
Parameters:
  • *args – The positional arguments to pass to the queued job.
  • **kwargs – The keyword arguments to pass to the queued job.
  • queue (str) – Name of the queue to queue in, defaults to queue of of job or default_queue.
  • timeout (int) – The job timeout in seconds. If not provided uses the job’s timeout or default_timeout.
  • description (str) – Description of the job.
  • result_ttl (int) – The result TTL in seconds. If not provided uses the job’s result TTL or default_result_ttl.
  • ttl (int) – The job TTL in seconds. If not provided uses the job’s TTL or no TTL at all.
  • depends_on (FlaskJob or str) – A job instance or id that the new job depends on.
  • job_id (str) – A custom ID for the new job. Defaults to an UUID.
  • at_front (bool) – Whether or not the job is queued in front of all other enqueued jobs.
  • meta (dict) – Additional meta data about the job.
Returns:

An RQ job instance.

Return type:

FlaskJob

schedule(time_or_delta, *args, **kwargs)

A function to schedule running a RQ job at a given time or after a given timespan:

@rq.job
def add(x, y):
    return x + y

add.schedule(timedelta(hours=2), 1, 2, timeout=10)
add.schedule(datetime(2016, 12, 31, 23, 59, 59), 1, 2)
add.schedule(timedelta(days=14), 1, 2, repeat=1)
Parameters:
  • *args – The positional arguments to pass to the queued job.
  • **kwargs – The keyword arguments to pass to the queued job.
  • queue (str) – Name of the queue to queue in, defaults to queue of of job or default_queue.
  • timeout (int) – The job timeout in seconds. If not provided uses the job’s timeout or default_timeout.
  • description (str) – Description of the job.
  • result_ttl (int) – The result TTL in seconds. If not provided uses the job’s result TTL or default_result_ttl.
  • ttl (int) – The job TTL in seconds. If not provided uses the job’s TTL or no TTL at all.
  • repeat (int) – The number of times the job needs to be repeatedly queued. Requires setting the interval parameter.
  • interval (int) – The interval of repetition as defined by the repeat parameter in seconds.
  • job_id (str) – A custom ID for the new job. Defaults to a UUID.
Returns:

An RQ job instance.

Return type:

FlaskJob

cron(pattern, name, *args, **kwargs)

A function to setup a RQ job as a cronjob:

@rq.job('low', timeout=60)
def add(x, y):
    return x + y

add.cron('* * * * *', 'add-some-numbers', 1, 2, timeout=10)
Parameters:
  • *args – The positional arguments to pass to the queued job.
  • **kwargs – The keyword arguments to pass to the queued job.
  • pattern (str) – A Crontab pattern.
  • name (str) – The name of the cronjob.
  • queue (str) – Name of the queue to queue in, defaults to queue of of job or default_queue.
  • timeout (int) – The job timeout in seconds. If not provided uses the job’s timeout or default_timeout.
  • description (str) – Description of the job.
  • repeat (int) – The number of times the job needs to be repeatedly queued via the cronjob. Take care only using this for cronjob that don’t already repeat themselves natively due to their crontab.
Returns:

An RQ job instance.

Return type:

FlaskJob

flask_rq2.job

The Flask application aware RQ job class.

class flask_rq2.job.FlaskJob(*args, **kwargs)

The RQ Job class that is capable to running with a Flask app context. This requires setting the FLASK_APP environment variable.