So why would you need a task queue? Let’s say you are building a Web application that is going to convert a fair number of files (converting videos to Adobe Flash, for instance). The other requirement is that a user must be notified when the video has been converted, and management tasks need to happen automatically: after a video is uploaded, it needs to be picked up, processed, moved from the queue to the repository and then published. Apart from displaying a view with the upload form, everything must happen in the background, so that the Web application stays responsive.
One way of doing that is to use some kind of storage (e.g. filesystem, database) to keep a record of the PIDs of the spawned processes, and to write a piece of code that checks whether a process has terminated, is still working or is just living the happy zombie life. Additionally, you need cron or another scheduler that will trigger these tasks in a given time sequence. The new, cool and very true term for that is “Ghetto Queue”.
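The PID-tracking approach described above can be sketched roughly as follows. This is only an illustration under stated assumptions: the registry is an in-memory dictionary rather than a file or database, the helper names (spawn, status, wait_all) are made up for this example, a polling loop stands in for cron, and the code targets a current Python 3 interpreter rather than Python 2.5.

```python
import subprocess
import sys
import time


def spawn(command, registry):
    """Start a background process and record it under its PID."""
    process = subprocess.Popen(command)
    registry[process.pid] = process  # a real setup would persist the PID
    return process.pid


def status(pid, registry):
    """Return 'running', 'finished' or 'failed' for a recorded PID."""
    # poll() returns None while the child is still running. It also reaps
    # a terminated child, so the process does not linger as a zombie.
    return_code = registry[pid].poll()
    if return_code is None:
        return "running"
    return "finished" if return_code == 0 else "failed"


def wait_all(registry, interval=0.1, timeout=10.0):
    """Poll every recorded process until none is running (cron stand-in)."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        states = dict((pid, status(pid, registry)) for pid in registry)
        if "running" not in states.values():
            return states
        time.sleep(interval)
    raise RuntimeError("some processes are still running")


if __name__ == "__main__":
    registry = {}
    # Hypothetical job: a child interpreter pretending to convert a video.
    spawn([sys.executable, "-c", "print('converting video...')"], registry)
    print(wait_all(registry))
```

Even in this small form the bookkeeping (persisting PIDs, polling, handling failures) is all on you, which is the part a real task queue takes over.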
The solution above is fine as long as your application is not going to be under heavy load, you don’t need concurrent access, there are no plans for scaling in the near future and you want to keep things dead simple. Otherwise you need more sophisticated technology.
That’s where Celery and AMQP come to help. RabbitMQ is an open source AMQP (Advanced Message Queuing Protocol) implementation. Celery, originally built for use with Django, is now a general, distributed task queue system based on an AMQP server, the multiprocessing Python module and a persistent or non-persistent storage backend. Combining these three solves the problem of building highly available task queues.
This article is composed of the following sections:
- Installation – Installing the basic components you need to start with Celery and Django.
- Working setup – Configuration files and a step-by-step example of queueing a task.
- Task queue life cycle – A short analysis of what is going on behind the scenes.
Installation
Multiprocessing
The multiprocessing module was introduced in Python 2.6; however, a backport for Python 2.4/2.5 is available. We assume you are using Python 2.5 and have Subversion, patch and the header files for building Python 2.5 modules installed (these come for free with Mac OS X and MacPorts; on Ubuntu you may need to install the python2.5-dev package), as well as pip, a replacement for easy_install.
First, let’s make sure multiprocessing is not already installed:
$ python2.5 -c "import multiprocessing"
If an ImportError exception is raised, we need to install the multiprocessing module. Download the source code:
$ svn co http://python-multiprocessing.googlecode.com/svn/trunk/ python-multiprocessing
$ cd python-multiprocessing
Now we need to apply patches, otherwise we will run into a KeyError: processName exception. Download them into the python-multiprocessing directory.
$ patch -p1 < ./0002-Fix-logging-of-processName.patch
patching file Lib/multiprocessing/patch.py
$ patch -p1 < ./0001-Test-logging-of-processName.patch
patching file Lib/multiprocessing/tests.py
The next step is to install the module. You may need to adjust the --prefix option (or just leave it at the default) depending on your configuration. In this case it was Mac OS X 10.5 with Python 2.5 installed using MacPorts instead of the default interpreter.
$ sudo python2.5 setup.py install --prefix=/opt/local/
[...]
Installed /opt/local/lib/python2.5/site-packages/multiprocessing-2.6.2.1-py2.5-macosx-10.5-i386.egg
Processing dependencies for multiprocessing==2.6.2.1
Finished processing dependencies for multiprocessing==2.6.2.1
We should run tests to make sure that everything works fine by typing:
$ python2.5 -m multiprocessing.tests
[...]
test_flushing (__main__.TestStdinBadfiledescriptor) ... ok
test_pool_in_process (__main__.TestStdinBadfiledescriptor) ... ok
test_queue_in_process (__main__.TestStdinBadfiledescriptor) ... ok
----------------------------------------------------------------------
Ran 133 tests in 16.951s

OK
More information on installing multiprocessing module.
Celery, RabbitMQ and a basic Django application
Again, depending on your needs (whether you are using virtualenv, which is highly recommended, symlinking the packages you need per project, or just using them globally), you may want to install the celery module in a different place. Here we are going to install everything in the base site-packages directory.
$ sudo pip install simplejson
[...]
Successfully installed simplejson
$ sudo pip install celery
[...]
Successfully installed celery
The rest of the installation process is very well described in the original Celery docs, so there is no point in repeating it.
Working setup
Let’s summarize the components and configuration files we need to start developing with RabbitMQ, Celery and Django.
- We need a Worker Server listening:
$ python2.5 ./manage.py celeryd
[...]
Celery has started.
[INFO/PoolWorker-1] child process calling self.run()
- Task to be queued:
$ cat ./videos/tasks.py
from celery.task import Task
from celery.registry import tasks

class ProcessVideoTask(Task):

    def run(self, object_id, **kwargs):
        logger = self.get_logger(**kwargs)
        logger.info("Processed video for %s." % object_id)
        return True

tasks.register(ProcessVideoTask)
- Celery and videos listed in settings.py as well as AMQP settings for your rabbitmq instance:
INSTALLED_APPS = (
    [...]
    'celery',
    'videos',
)

AMQP_SERVER = "192.168.1.2"
AMQP_PORT = 5672
AMQP_USER = "videos_user"
AMQP_PASSWORD = "videos_password"
AMQP_VHOST = "videos_vhost"
- A RabbitMQ server running on machine 192.168.1.2 in this case.
After executing the following Python code:
from videos.tasks import ProcessVideoTask
result = ProcessVideoTask.delay(object_id=2)
You should be able to see in the terminal window with celeryd running that a new task was received. It means that everything works fine.
[INFO/MainProcess] Got task from broker: videos.tasks.ProcessVideoTask[3eb62cf3-a04a-4726-8787-b7fa9e0223b9]
[INFO/PoolWorker-1] Processed video for 2.
[INFO/MainProcess] Task videos.tasks.ProcessVideoTask[3eb62cf3-a04a-4726-8787-b7fa9e0223b9] processed: True
Task queue life cycle
So now we have a very basic Django application, a database, RabbitMQ and celeryd (Worker Server) daemons listening somewhere. The question is: what is really going on? Let’s have a look at the following diagram:

- The Web browser requests a resource with the video upload form (e.g. “http://example.com/videos/my-new-video/upload/”).
- The application server responds with a view (e.g. “Please upload your video”).
- After the form is submitted, the view triggers a new task by importing it and calling its delay() method.
- The AMQP transport layer publishes a new message to the server. A new task is added.
- celeryd connects to rabbitmq and waits for new messages.
- When a new message is received, celeryd processes the task.
- The result or exception is saved to the backend storage (Django’s default or the one chosen by the developer).
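The steps above can be imitated with nothing but the standard library. The sketch below is not how Celery or RabbitMQ are implemented: an in-process queue.Queue stands in for the broker, a dictionary stands in for the result backend, and the names delay, worker, run_demo and the SUCCESS/FAILURE labels are chosen only for this illustration. It is written for a current Python 3 interpreter.

```python
import queue
import threading

broker = queue.Queue()  # stand-in for the RabbitMQ queue
results = {}            # stand-in for the result backend


def process_video(object_id):
    return "Processed video for %s." % object_id


# Registry of known tasks, keyed by name, so a message only needs the name.
TASKS = {"videos.tasks.ProcessVideoTask": process_video}


def delay(task_name, task_id, **kwargs):
    """Publish a task message (what the view does after the upload)."""
    broker.put({"id": task_id, "task": task_name, "kwargs": kwargs})


def worker():
    """Wait for messages, run the task, store the result or the exception."""
    while True:
        message = broker.get()  # blocks until a message arrives
        if message is None:     # sentinel used here to stop the demo worker
            break
        try:
            task = TASKS[message["task"]]
            results[message["id"]] = ("SUCCESS", task(**message["kwargs"]))
        except Exception as exc:
            results[message["id"]] = ("FAILURE", exc)


def run_demo():
    thread = threading.Thread(target=worker)
    thread.start()  # the worker is connected and waiting first
    delay("videos.tasks.ProcessVideoTask", "task-1", object_id=2)
    delay("videos.tasks.NoSuchTask", "task-2")  # unknown task name
    broker.put(None)
    thread.join()
    return results


if __name__ == "__main__":
    print(run_demo())
```

The second message shows the other branch of the last step: the worker cannot find the task, so an exception rather than a result ends up in the backend.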
The most interesting part is when the Worker Server (celeryd) grabs new stuff from the message queue. What happens is that celeryd connects to rabbitmq, says hello using the AMQP protocol header, agrees on a security mechanism etc., and then keeps reading the socket until data arrives. Nothing more happens here.
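For a feel of what “saying hello” looks like on the wire, here is a small sketch that builds the protocol header as laid out in the AMQP 0-9-1 specification: the literal “AMQP”, a zero byte, then the major, minor and revision numbers. Treat the version as an assumption; the broker used in this article may have spoken an earlier revision of the protocol with a slightly different header. The function name is made up for the example.

```python
import struct


def amqp_protocol_header(major=0, minor=9, revision=1):
    """Build the 8-byte greeting a client sends right after connecting."""
    # Layout per AMQP 0-9-1: b"AMQP", a constant 0, then three version bytes.
    return b"AMQP" + struct.pack("BBBB", 0, major, minor, revision)


header = amqp_protocol_header()
print(repr(header))
```

A client writes these eight bytes to the socket first; the negotiation of the security mechanism follows.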
When a new task is called, the message is pushed to the server. After initial handshaking, the client (Web application) informs the server about the method (Publish in this case) and routing key it is going to use, and finally sends some data including Content-Header and Content-Body. If you have a look inside these packets, the header declares Content-Type: application/x-python-serialize and the body contains your celery.task.Task pickled! Extremely cool.
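To illustrate what a pickled body looks like from both ends, here is a minimal round trip. The content type string is the one seen in the packets above; everything else (the field names in the payload and the two helper functions) is hypothetical and not Celery’s actual message format.

```python
import pickle

CONTENT_TYPE = "application/x-python-serialize"


def encode_task_message(task_name, task_id, kwargs):
    """Return (headers, body) the way a publisher might prepare them."""
    payload = {"task": task_name, "id": task_id, "kwargs": kwargs}
    return {"content_type": CONTENT_TYPE}, pickle.dumps(payload)


def decode_task_message(headers, body):
    """Turn the raw bytes back into Python objects on the worker side."""
    if headers.get("content_type") != CONTENT_TYPE:
        raise ValueError("unexpected content type: %r" % headers.get("content_type"))
    # Only unpickle data from a trusted broker: pickle can execute code.
    return pickle.loads(body)


if __name__ == "__main__":
    headers, body = encode_task_message(
        "videos.tasks.ProcessVideoTask",
        "3eb62cf3-a04a-4726-8787-b7fa9e0223b9",
        {"object_id": 2},
    )
    print(decode_task_message(headers, body))
```

Because the body is plain pickle, any picklable Python object can travel as a task argument, which is convenient but also the reason the broker has to be trusted.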

After rabbitmq gets the data, it starts looking for someone to pass it on to. It already has a worker server (celeryd) connected and waiting for new stuff, so it delivers the message, and the worker in turn acknowledges that it has received it.
And that’s the whole cycle!
[...] This post was mentioned on Twitter by Ask Solem Hoel and Ask Solem Hoel. Ask Solem Hoel said: RT @rpogorzelski: New blog post: RabbitMQ, Celery and Django http://bit.ly/1qTTFE [...]
Hi Robert,
Thanks for the post. It is very useful. I am looking into setting up a task manager and I think, this will be very useful for me
Regards,
Krish
Doesn’t seem to work as described. I continue to get error messages such as:
[ERROR/MainProcess] Unknown task ignored: videos.tasks.ProcessVideoTask
[ERROR/MainProcess] Unknown task ignored: None
Also I noticed on the other celery tutorial they suggest adding a line to your tasks.py file:
>>> tasks.register(ProcessVideoTask)
However adding this didn’t seem to solve anything for me.
Oops! Looks like the problem was that in INSTALLED_APPS I was using package.appname.tasks instead of just appname.tasks.
I think you still might need the tasks.register bit though. Good times!
You’re absolutely right, missed that somehow – updated. Thanks for pointing this out!
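[...] This was a bit rushed, but it was a hot topic at Python Hack-a-thon #2, so I am publishing it for now. Corrections welcome. – If you get errors around multiprocessing, apply the patch from the following URL. http://robertpogorzelski.com/blog/2009/09/10/rabbitmq-celery-and-django/ [...]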
Nice writeup! One question though: how can the celery task report its status back to the web app? I’m thinking of some ajaxy notification like “xy% of your video converted”.
I believe that depends much on the way you want to retrieve data. If you need an API for accessing results remotely, and you’re using Django, I’d go for Piston. That could help you building a nice and flexible RESTful interface. On the other hand, if you expect your data to get into the browser in real-time, you may want to have a look at Comet-like frameworks, e.g. Orbited (currently in zombie-state, as core developers concentrate on Js.io) or APE (very cool, but I don’t “feel” it personally). Anyway, there’s a lot of cool stuff around, especially when dealing with STOMP/AMQP brokers. Please let me know about your further findings, I’m very much interested in this stuff recently!
Franz, as an alternative to Robert’s suggestion, one way to get the results to the front end is to write some status to the database after the task worker finishes. The web page can do a meta refresh every 5 seconds, checking the db to see whether the task has completed successfully or not.
Franz, Robert,
You can also use celery’s own views, if using in tandem with Django: http://ask.github.com/celery/reference/celery.views.html
Good point. Better go with what Celery already offers.
Hi,
I’m probably missing something obvious but I’m wondering if there’s a
way to automatically reload tasks when the source changes? I’m using
celery in a django app, starting it with a supervisord script with the
command:
/path/manage.py celeryd --loglevel=INFO
I have to reload or restart it every time I change the source code. Is
there some way I can make it auto reload tasks when in development?
Thanks very much for any advice,
Mike
Just in case my question is of interest to others here:
http://groups.google.com/group/celery-users/browse_thread/thread/ec91e07011bc2733