Scheduled jobs with Celery, Django and Redis

Setting up a deferred task queue for your Django application can be a pain, and it shouldn’t be. Some people use cron, which is not just a bad solution but a disaster. Personally, I use Celery. In this post, I’ll show you how to set up a deferred task queue for your Django application using Celery.

What’s Celery?

Celery is an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well.

The promise of Celery is to let you run code later, or regularly according to a schedule. Running deferred tasks through Celery is not entirely trivial, but it pays off: its distributed architecture scales as you need. Any Celery installation is composed of three core components:

  1. Celery client: used to issue background jobs (see the sketch after this list).
  2. Celery workers: the processes responsible for running jobs. Workers can be local or remote, so you can start with a single worker on the same web application server and add workers as your traffic grows.
  3. Message broker: the client communicates with the workers through a message queue, and Celery supports several ways to implement these queues. The most commonly used brokers are RabbitMQ and Redis.
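
To make the client/worker split concrete, here is a minimal, stand-alone sketch; the app name 'demo' and the add task are hypothetical, not part of the Django setup below:

# tasks.py -- minimal stand-alone example (hypothetical names)
from celery import Celery

app = Celery('demo', broker='redis://127.0.0.1:6379/0')

@app.task
def add(x, y):
    return x + y

The client enqueues a job with add.delay(2, 3), which returns immediately; a worker picks the job off the queue and executes it.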

Installing requirements

First, let’s install Redis:

$ sudo apt-get install redis-server
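
If you want to check that the server is up, redis-cli should answer with PONG:

$ redis-cli ping
PONG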

Now, let’s install the Python packages:

$ pip install celery
$ pip install django-celery

Configuring Django for Celery

Once the installation is complete, you’re ready to set up the scheduler. Let’s configure Celery in settings.py:

INSTALLED_APPS = (
    # ... your other apps ...
    'djcelery',
)

BROKER_URL = 'redis://127.0.0.1:6379/0'
BROKER_TRANSPORT = 'redis'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'

The lines above configure Celery: which broker to use, and which scheduler should drive the beat (periodic task) events.
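
Optionally, if you want Celery to store task results as well, you can point the result backend at the same Redis instance (a common choice, but entirely optional):

CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'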

Since you added the djcelery package to INSTALLED_APPS, you need to create the Celery database tables. Instructions for that differ depending on your environment: if you use South or the built-in migrations (Django >= 1.7) for schema migrations:

$ python manage.py migrate

Otherwise:

$ python manage.py syncdb

Below is the celery.py file that sets up the scheduler for your Django project:

# celery.py file
from __future__ import absolute_import

import os
import django

from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'demo.settings')
django.setup()

app = Celery('Scheduler')

app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
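
With django-celery installed, you can start a worker from manage.py; the --beat flag embeds the beat scheduler in the same process, which is convenient for development (the log level is just an example):

$ python manage.py celery worker --beat --loglevel=info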

Write some tasks

Let’s assume that you have a task that should be executed periodically; a good example might be a Twitter bot or a scraper.

import tweepy

api = tweepy.API()  # NOTE: a real call needs an authenticated handle, see below

def get_recent_tweets(query):
    for tweet in tweepy.Cursor(api.search, q=query,
                               rpp=100, result_type="recent",
                               include_entities=True,
                               lang="en").items():
        print tweet.created_at, tweet.text
        # Save tweet into database
        ...
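
For completeness, the api handle above would normally be built from OAuth credentials; the uppercase names below are placeholders for your own keys:

auth = tweepy.OAuthHandler(CONSUMER_KEY, CONSUMER_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_TOKEN_SECRET)
api = tweepy.API(auth)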

Now, we need to create a Celery task that wraps get_recent_tweets:

## /project_name/app_name/tasks.py

from celery import shared_task

from utils import twitter

@shared_task
def get_recent_tweets(*args):
    # Just an example
    twitter.get_recent_tweets(*args)

N.B: Things can get a lot more complicated than this.
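
Before scheduling anything, you can smoke-test the task from a Django shell, assuming a worker is running (the module path matches the tasks.py location used below):

>>> from bots.twitter.tasks import get_recent_tweets
>>> get_recent_tweets.delay("bigdata")

delay() returns immediately with an AsyncResult; the actual work happens in the worker process.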

Scheduling it

Now, we have to schedule our tasks. We will run get_recent_tweets every hour for the query “bigdata” (an interesting subject that I want to follow). For this purpose, I’ll use the celery beat scheduler. In your settings.py file, add this code:

from datetime import timedelta

CELERYBEAT_SCHEDULER = "djcelery.schedulers.DatabaseScheduler"
CELERYBEAT_SCHEDULE = {
    "get_bigdata_tweets": {
        'task': "bots.twitter.tasks.get_recent_tweets",
        # Every 1 hour
        'schedule': timedelta(hours=1),
        # Note the trailing comma: 'args' must be a tuple
        'args': ("bigdata",),
    },
}
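
If you would rather run the task at the top of every hour, instead of every 60 minutes counted from whenever beat starts, a crontab schedule does the same job:

from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    "get_bigdata_tweets": {
        'task': "bots.twitter.tasks.get_recent_tweets",
        # At minute 0 of every hour
        'schedule': crontab(minute=0),
        'args': ("bigdata",),
    },
}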
For further details about scheduler configuration, see the Celery documentation.

Merge querysets from different django models

If you have ever been in a situation where you needed to merge two querysets from different models into one, you’ve surely seen this error:

Cannot combine queries on two different base models.

The solution is to use itertools.chain, which makes a single iterator out of the given iterables:

from itertools import chain

result_lst = list(chain(queryset1, queryset2))

Now, you can sort the resulting list by any field the two models share, e.g. the creation date:

from itertools import chain
from operator import attrgetter

result_lst = sorted(
    chain(queryset1, queryset2),
    key=attrgetter('created_at'))
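
To get the newest objects first, pass reverse=True to sorted(); this assumes both models have a created_at field, as above:

result_lst = sorted(
    chain(queryset1, queryset2),
    key=attrgetter('created_at'),
    reverse=True)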

Django ORM: Optimize your code

Most of the code we write uses Django queries that cover the usual CRUD operations. However, sometimes you need to retrieve values that are derived by summarizing or aggregating a collection of objects.

Throughout this post, we’ll refer to the following models.

from django.db import models

class URL(models.Model):
    url = models.URLField()

class Tweet(models.Model):
    tweet_id = models.IntegerField()
    text = models.CharField(max_length=140)
    tweeple = models.CharField(max_length=200)
    urls = models.ManyToManyField(URL)

Say I want to get the most popular URLs, that is, the URLs shared by the most tweets:

def most_popular_url():
    urls = URL.objects.all()
    res = []
    for url in urls:
        # One COUNT query per URL: the classic N+1 queries problem
        res.append({"url": url.url, "count": url.tweet_set.count()})
    # Sort the result by descending count
    return sorted(res, key=lambda x: -x["count"])

Let’s test this: We have two test cases:

  1. [TC1] Small database that contains 2000 tweets
  2. [TC2] Medium database that contains 3M tweets

The function takes 8.281s for TC1, and 175m:361s for TC2. This is not acceptable and should be improved. Fortunately, Django solves this for us with its rich ORM API: look at the solution below using annotate():

from django.db.models import Count

def most_popular_url_optimized():
    # A single query: the database does the counting and the sorting
    urls = URL.objects.annotate(rank=Count("tweet")).order_by("-rank")
    res = []
    for url in urls:
        res.append({"url": url.url, "count": url.rank})
    return res

The second version of most_popular_url takes only 2.361s for TC1 and 3541.5s (about 59m 1.5s) for TC2. This example shows the value of optimizing database queries. As general advice, try to do this kind of processing in models or forms instead of views.
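
If you only need the dictionaries and not the model instances, you can push this a bit further with values(), which skips model instantiation entirely (a sketch using the same fields as above):

def most_popular_url_values():
    return list(URL.objects.annotate(rank=Count("tweet"))
                           .order_by("-rank")
                           .values("url", "rank"))

Each item is then a dict like {"url": ..., "rank": ...}.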

Django: can’t compare offset-naive and offset-aware datetimes error

I encountered a horrible problem today, and I want to share the solution with you, so I decided to write this post. The problem was: can’t compare offset-naive and offset-aware datetimes. The origin of this error is a comparison between an offset-aware datetime (obtained from the database, in my case) and an offset-naive datetime, which is what Python datetime objects are by default. Below is the line that caused this error:

from datetime import timedelta, datetime

# datetime.now() is offset-naive, while topic.created is offset-aware
one_hour_ago = datetime.now() - timedelta(hours=1)
if topic.created > one_hour_ago:  # raises TypeError
    # Do something
    pass

So the solution is to make both compared datetimes either naive or aware. Fortunately, Django comes with a helper that solves this problem:

>>> from django.utils import timezone
>>> now = timezone.now()
>>> now
datetime.datetime(2013, 12, 11, 9, 43, 44, 868854, tzinfo=<UTC>)
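
With this helper, the failing comparison above can be rewritten with aware datetimes on both sides (topic is the same database object as before):

from datetime import timedelta
from django.utils import timezone

one_hour_ago = timezone.now() - timedelta(hours=1)
if topic.created > one_hour_ago:
    # Do something
    pass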

Now, “now” is an offset-aware datetime object. If you are not a djangonaut, you can use the pytz module in the same way:

>>> from datetime import datetime
>>> import pytz
>>> utc = pytz.UTC
>>> now = utc.localize(datetime.now())
>>> now
datetime.datetime(2013, 12, 11, 9, 43, 44, 868854, tzinfo=<UTC>)