Scheduling emails with Celery in Django
After a long journey with Django, you reach a point where you need some tasks to run asynchronously, without human supervision. Some tasks need to run once at a particular time or after a delay, and some have to run periodically, like crontab jobs. One such task is sending emails on specific triggers.
Here at HackerEarth, a major chunk of our emails goes to recruiters and participants after a contest finishes or when a participant triggers the finish-test button. Until now we did this using crontab. But things have changed, and scaling such a process costs time and resources. Also, polling the database from a crontab process to see whether any task is due is not a good method, at least for tasks that have to run only once in their lifetime.
Celery comes to the rescue here.
Celery runs tasks asynchronously and also supports scheduling them.
Integrating Celery with a Django codebase is easy enough; you just need some
patience to go through the steps given on the official Celery site.
Celery has two sides: the broker and the worker. Celery needs a way to send
and receive messages, which usually comes in the form of a separate service
called a message broker. We use RabbitMQ, the default broker, for this.
The worker fetches tasks from the queue and runs them asynchronously at the
time for which they were scheduled.
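To make the broker/worker split concrete, here is a toy, standard-library-only model of it. This is a conceptual sketch and not how Celery or RabbitMQ work internally; `ToyBroker`, `run_worker_once` and `send_email` are hypothetical names, and the clock is passed in explicitly as `now` so the behaviour is easy to follow.

```python
import heapq
import itertools


class ToyBroker:
    """Holds scheduled messages ordered by the time they become due (ETA)."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker for equal ETAs

    def publish(self, func, args=(), countdown=0, now=0.0):
        """Queue `func(*args)` to become due `countdown` seconds after `now`."""
        eta = now + countdown
        heapq.heappush(self._heap, (eta, next(self._counter), func, args))

    def pop_due(self, now):
        """Return every message whose ETA has passed, earliest first."""
        due = []
        while self._heap and self._heap[0][0] <= now:
            _eta, _n, func, args = heapq.heappop(self._heap)
            due.append((func, args))
        return due


def run_worker_once(broker, now):
    """Execute everything that is due at `now` and return the results."""
    return [func(*args) for func, args in broker.pop_due(now)]


def send_email(recipient):
    # Stand-in for real work; a real task would talk to a mail server.
    return "sent to %s" % recipient
```

A message published with `countdown=60` at `now=0` is ignored by `run_worker_once(broker, now=30)` and picked up by `run_worker_once(broker, now=60)`, which is the behaviour the article relies on when it schedules emails for the end of a contest.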
You will have to download the Celery init scripts to run the worker as a
daemon in production. You can get those init scripts from
This is the configuration we used to run celery in our project:
    # Name of nodes to start
    CELERYD_NODES="w1 w2 w3"

    # Where to chdir at start.
    CELERYD_CHDIR="/hackerearth/"

    # How to call "manage.py celeryd_multi"
    CELERYD_MULTI="$CELERYD_CHDIR/manage.py celeryd_multi"

    # How to call "manage.py celeryctl"
    CELERYCTL="$CELERYD_CHDIR/manage.py celeryctl --settings=settings.hackerearth_settings"

    # Extra arguments to celeryd
    CELERYD_OPTS="--time-limit=300 --concurrency=8"

    # %n will be replaced with the nodename.
    CELERYD_LOG_FILE="/var/log/celery/%n.log"
    CELERYD_PID_FILE="/var/run/celery/%n.pid"

    # Workers should run as an unprivileged user.
    CELERYD_USER="hackerearth"
    CELERYD_GROUP="hackerearth"

    # Name of the projects settings module.
    export DJANGO_SETTINGS_MODULE="settings.hackerearth_settings"
After linking the triggers that send emails once the contest time is over or the participant has finished the test early, everything worked properly. I could now easily schedule a task to run asynchronously at any time. But I ran into a problem: there is no method to check whether a particular task associated with some Model instance has already been scheduled. This happens when there is more than one trigger for the same task, which can easily occur in a fairly complicated system. To solve this, I had to store the task_id together with that model instance in the database, using the generic ContentType framework. So here is the hack that I came up with:
This model stores the information of the scheduled task (task_id, name) and the information of the Model instance with which the task is associated.
    from django.contrib.contenttypes import generic
    from django.contrib.contenttypes.models import ContentType
    from django.db import models


    class ModelTask(models.Model):
        """
        For storing all scheduled tasks
        """
        task_id = models.CharField(max_length=36)
        name = models.CharField(max_length=200)
        content_type = models.ForeignKey(ContentType)
        object_id = models.PositiveIntegerField()
        content_object = generic.GenericForeignKey('content_type', 'object_id')

        def __unicode__(self):
            return "%s - %s" % (self.name, self.content_object)

        @staticmethod
        def create(async_result, instance):
            return ModelTask.objects.create(task_id=async_result.task_id,
                                            name=async_result.task_name,
                                            content_object=instance)

        @staticmethod
        def filter(task, instance):
            content_type = ContentType.objects.get_for_model(instance)
            object_id = instance.id
            return ModelTask.objects.filter(content_type=content_type,
                                            object_id=object_id,
                                            name=task.name)
A custom overridden task decorator, 'model_task'
It overrides the methods 'apply_async' and 'AsyncResult', and attaches a new method, 'exists_for'.
    import types

    from django.db import models
    from celery import task

    from appname.models import ModelTask


    def model_task(*args, **kwargs):
        def dec(func):
            task_dec = task(*args, **kwargs)
            task_instance = task_dec(func)

            def exists_for(self, instance):
                # True if a task of this type is already recorded for instance.
                return ModelTask.filter(self, instance).exists()
            task_instance.exists_for = types.MethodType(exists_for, task_instance)

            def apply_async(self, *args, **kwargs):
                # 'instance' is our own keyword; remove it before calling Celery.
                instance = kwargs.pop('instance', None)
                async_result = super(type(self), self).apply_async(*args, **kwargs)
                if instance and not self.exists_for(instance):
                    ModelTask.create(async_result, instance)
                return async_result
            task_instance.apply_async = types.MethodType(apply_async, task_instance)

            def AsyncResult(self, *args, **kwargs):
                # Accept a model instance in place of a task_id.
                if (args and isinstance(args[0], models.Model)
                        and self.exists_for(args[0])):
                    task_id = ModelTask.filter(self, args[0])[0].task_id
                    return super(type(self), self).AsyncResult(task_id)
                return super(type(self), self).AsyncResult(*args, **kwargs)
            task_instance.AsyncResult = types.MethodType(AsyncResult, task_instance)

            return task_instance
        return dec
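The binding trick the decorator relies on can be tried without Celery or Django. The following is a minimal sketch under stated assumptions: `FakeTask` is a hypothetical stand-in for a Celery task, an in-memory dict replaces the ModelTask table, and `track_instances` and `task_id_for` are illustrative names that do not appear in the code above.

```python
import types
import uuid


class FakeTask:
    """Hypothetical stand-in for a Celery task; returns a fresh id per call."""
    name = "send_email"

    def apply_async(self, args=(), countdown=0):
        return str(uuid.uuid4())


def track_instances(task):
    """Attach per-instance bookkeeping to one task object."""
    registry = {}  # (task name, instance key) -> task id; replaces ModelTask

    def exists_for(self, key):
        return (self.name, key) in registry

    def task_id_for(self, key):
        return registry[(self.name, key)]

    def apply_async(self, *args, **kwargs):
        # 'instance' is our own keyword; strip it before the real call.
        key = kwargs.pop("instance", None)
        # The override lives on the instance only, so the class still
        # holds the original method.
        task_id = type(self).apply_async(self, *args, **kwargs)
        if key is not None and not self.exists_for(key):
            registry[(self.name, key)] = task_id
        return task_id

    # types.MethodType binds each plain function to this one object.
    task.exists_for = types.MethodType(exists_for, task)
    task.task_id_for = types.MethodType(task_id_for, task)
    task.apply_async = types.MethodType(apply_async, task)
    return task
```

As in the decorator above, a second `apply_async` call for the same instance still schedules the task; only the first task id is recorded. Callers are expected to check `exists_for` before scheduling again.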
The Use Case
This model contains the information of a User participating in an Event.
    class Participation(models.Model):
        user = models.ForeignKey(User)
        event = models.ForeignKey(Event)
        ...
        ...
Task for sending email to participant
code for sending email
Scheduling the task
    duration = calculate_duration_in_seconds(participation)

    # The extra keyword argument 'instance' is necessary as it will create a
    # ModelTask object.
    send_email_on_participation_complete.apply_async((participation,),
                                                     countdown=duration,
                                                     instance=participation)
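The helper `calculate_duration_in_seconds` is not shown in this post. As a rough sketch of what such a helper might compute, assuming the end time of the participation is known, the countdown is the number of seconds left until that time, clamped at zero. The name `seconds_until` and its parameters are hypothetical.

```python
from datetime import datetime, timedelta


def seconds_until(end_time, now):
    """Whole seconds from `now` until `end_time`, never negative.

    Both arguments are datetime objects of the same kind (both naive or
    both timezone-aware); mixing the two raises TypeError on subtraction.
    """
    remaining = (end_time - now).total_seconds()
    # A negative countdown makes no sense; a deadline that has already
    # passed should run the task immediately.
    return max(0, int(remaining))


# Example: a 90-minute test that started at noon.
started_at = datetime(2013, 7, 1, 12, 0, 0)
ends_at = started_at + timedelta(minutes=90)
countdown = seconds_until(ends_at, now=started_at)
```

Clamping at zero matters when a trigger fires after the deadline, for example when a participant presses the finish-test button late.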
Check if a task associated with a participation object has already been scheduled
is_scheduled_before = send_email_on_participation_complete.exists_for(participation)
Get the AsyncResult object
    # Returns the async_result object of the scheduled task that is associated
    # with the given Model instance (participation in our case)
    async_result = send_email_on_participation_complete.AsyncResult(participation)

    # Gives the status of the scheduled task: PENDING/STARTED/SUCCESS/FAILURE
    async_result.status

    # Contains the return value of the task (None in our case)
    async_result.result
All this replaced the cron jobs, custom scripts and some manual tasks with a robust task (email) scheduling mechanism. It also lays the foundation for triggering many other types of tasks on top of the django-celery architecture I set up. This will certainly make us more efficient and help us focus on other core products while tasks are performed asynchronously, and we can enjoy the awesome weather on a fine day! :)
P.S. I am an undergraduate student at IIT Roorkee. You can reach out to me at firstname.lastname@example.org with any suggestion, bug or improvement. You can also find me @ShubhamJain.
Posted by Shubham Jain, Summer Intern 2013 @HackerEarth