For background task processing and deferred execution in Python and Django applications, Web developers and system administrators can try out Celery.
Asynchronous mass email delivery, clickstream counters such as the number of hotels being watched or the number of likes, image resizing, video processing, connecting to third-party APIs, moving work out of the request-response loop to improve the user experience, or running a large number of database queries for reporting: these are some of the scenarios in which you might want to use distributed job queues to scale your application. Job queues also separate the tasks that must execute in real time from those that can be scheduled for later. There are many use cases in which job queues help you deliver a better user experience. This article introduces readers to using Celery for this purpose in Python and Django applications.
Celery is an asynchronous task queue/job queue based on distributed message passing. It supports both real-time operations and scheduled jobs. Tasks can run on a single worker or on multiple workers concurrently, to take advantage of multi-processing. Celery provides a powerful and flexible interface for defining, executing, managing and monitoring tasks. A Celery system can consist of multiple workers and brokers, which allows for high availability and horizontal scaling. It is suitable for applications that need any of the following:
- Execute tasks asynchronously.
- Distributed execution of expensive processes.
- Third-party API usage.
- Periodic/scheduled tasks.
- Retrying tasks.
- Enhancing the user experience.
Task queues are used to distribute work across workers. Celery's task queues are based on the Advanced Message Queuing Protocol (AMQP). By default, Celery uses RabbitMQ as its message broker; however, you are not limited to RabbitMQ and can use Redis, MongoDB or Beanstalk too.
Figure 1 depicts this process.
Step 1: The AMQP broker receives a task from the client, which may be a Web app or a Python program.
Step 2: The workers constantly monitor the queue; as soon as a message is dropped in the queue, one of the workers picks it up and executes it.
Step 3: Depending on the configuration, the worker may or may not save the result once it has finished executing the task.
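The three steps can be sketched without a broker, using only Python's standard library. In this minimal sketch, a queue.Queue stands in for the message queue, a thread stands in for a Celery worker, and a plain dictionary stands in for the result back-end. The names fetch_length, SAVE_RESULTS and task-1 are made up for illustration and are not part of Celery.

```python
import queue
import threading

task_queue = queue.Queue()   # stands in for the broker's queue
results = {}                 # stands in for an optional result back-end
SAVE_RESULTS = True          # hypothetical switch: keep or discard return values


def fetch_length(text):
    """Example task; any callable could be queued here."""
    return len(text)


def worker():
    """Step 2: wait for messages and execute them one at a time."""
    while True:
        message = task_queue.get()
        if message is None:          # sentinel: shut the worker down
            task_queue.task_done()
            break
        task_id, func, args = message
        outcome = func(*args)
        if SAVE_RESULTS:             # Step 3: saving the result is optional
            results[task_id] = outcome
        task_queue.task_done()


thread = threading.Thread(target=worker)
thread.start()

# Step 1: the client drops a task message on the queue and moves on.
task_queue.put(("task-1", fetch_length, ("sample.json",)))
task_queue.put(None)
task_queue.join()
thread.join()

print(results)
```

The client never calls fetch_length directly; it only describes the work in a message, which is what lets the worker live in another process or on another machine in a real Celery set-up.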
Setting up Celery
Although the choice of message broker is entirely your decision, for this article, I assume we are using RabbitMQ (it’s what I use in production, too). Before installing Celery, you must have RabbitMQ installed and running (start it with rabbitmq-server start). Then, all you need to install Celery is pip install -U celery, and you’re ready to create your first program using Celery.
Make a project folder, and in it, create a file tasks.py, which will contain the tasks you want to perform using Celery. Here’s a sample program I will be using to fetch JSON and read its contents:
from time import sleep
import urllib2

import simplejson
from celery import Celery

# Configure Celery.
celery = Celery('tasks', broker='amqp://guest@localhost//')

@celery.task  # Decorator that registers the function below as a Celery task.
def fetch_data(json_name):
    sleep(10)  # Pause to simulate a long-running job.
    url_to_open = "http://localhost/%s" % json_name
    req = urllib2.Request(url_to_open)
    opener = urllib2.build_opener()
    f = opener.open(req)
    data_fetched = simplejson.load(f)
    print data_fetched
    return data_fetched
Now run the celery daemon from the terminal using the following command:
celery worker -A tasks --loglevel=INFO
These are the minimum arguments you need to pass to start the service. Other options like events, concurrency levels and CeleryBeat can also be passed as arguments. You will learn about them later in the article.
In another terminal, use the Python interpreter to import the tasks module and call the task:
>>> from tasks import fetch_data
>>> result = fetch_data.delay('sample.json')
Next, track the task state or fetch the result. There are a variety of ways to do this, depending on your use case. For example:
- You just want to execute the task, and don’t want to save the result.
- You might want to check if the task has finished executing, or is still pending.
- You might want to save the result in the message queue itself, in MySQL, or in a back-end of your choice.
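The difference between a pending and a finished task can be illustrated without a broker. The sketch below uses concurrent.futures from the standard library rather than Celery itself; its Future object plays roughly the role of the result object returned by fetch_data.delay(), on which Celery offers the comparable calls ready() and get(timeout=...). The names slow_task and release are made up for the demonstration.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

release = threading.Event()  # lets the demo control when the task finishes


def slow_task(name):
    release.wait(timeout=5)  # simulate work that is still in progress
    return "fetched %s" % name


with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(slow_task, "sample.json")  # comparable to fetch_data.delay(...)
    still_pending = not future.done()   # the task has not finished yet
    release.set()                       # allow the task to complete
    value = future.result(timeout=5)    # block until the result is available
    finished = future.done()

print(still_pending, finished, value)
```

Polling the state suits a Web view that must return quickly, while a blocking fetch with a timeout suits scripts that cannot continue without the value.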
To achieve the third, you need to configure a result back-end in your tasks.py file, as follows:
celery = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')
Now the message queue is configured to save the result of the job. You can configure any back-end that you wish to use here. This is how an immediate task is executed, though there might be use-cases wherein you would want to run scheduled jobs. To run a task as a scheduled task, you need to define the schedule in the decorator of the task, as follows:
import datetime

from celery.task import periodic_task

@periodic_task(run_every=datetime.timedelta(minutes=1))
def print_name():
    print "Welcome to Tutorial"
The run_every value can be a timedelta or a cron-style schedule. The command for running the daemon now becomes:
celery worker -A tasks --loglevel=INFO -B  # -B also runs CeleryBeat
CeleryBeat is used for periodic tasks; if the -B option is not passed, periodic tasks will not run. Next, let’s look at how to integrate Celery with a Web framework; in this case, Django.
Integrating Celery with Django
Create a Django project using django-admin.py startproject simple_django_project, and then create an app in the project with python manage.py startapp celery_demo. Next, install django-celery with pip install django-celery, and then modify the settings.py file to configure the message queue, as shown below:
import djcelery
djcelery.setup_loader()

INSTALLED_APPS = (
    ...
    'djcelery',
)

BROKER_HOST = "localhost"
BROKER_PORT = 5672
BROKER_USER = "myusername"
BROKER_PASSWORD = "mypassword"
BROKER_VHOST = "myvhost"
Next, sync the database with python manage.py syncdb, and then create tasks.py inside the app. Now you can create a URL entry in urls.py that maps to a view function, which will call the tasks defined in tasks.py. Run the Celery daemon with one of the following commands:
python manage.py celeryd -l info -c 2  # Without CeleryBeat
python manage.py celeryd -l info -c 2 -B  # With CeleryBeat
This is a simple method to integrate Celery with Django.
Scaling workers is now not a concern: all you need to do is ship your tasks app to a new machine, set up Celery and start running it. The Celery daemon will start talking to the message queue, and multiple workers will start executing tasks. Celery makes sure that each task is executed once, and not by multiple workers.
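The idea that several workers can share one queue without running the same task twice can be sketched with the standard library alone. This is only an in-process illustration, not Celery: threads stand in for workers on separate machines, and a real broker adds acknowledgements and redelivery on top of this, which the sketch leaves out. NUM_WORKERS and the integer task ids are assumptions made for the demonstration.

```python
import queue
import threading
from collections import Counter

task_queue = queue.Queue()
executions = Counter()            # how many times each task id was run
lock = threading.Lock()


def worker():
    while True:
        task_id = task_queue.get()
        if task_id is None:       # sentinel: stop this worker
            task_queue.task_done()
            return
        with lock:
            executions[task_id] += 1
        task_queue.task_done()


NUM_WORKERS = 4  # hypothetical worker count
threads = [threading.Thread(target=worker) for _ in range(NUM_WORKERS)]
for t in threads:
    t.start()

for task_id in range(20):
    task_queue.put(task_id)
for _ in threads:
    task_queue.put(None)          # one sentinel per worker

task_queue.join()
for t in threads:
    t.join()

print(len(executions), set(executions.values()))
```

Each message is handed to exactly one consumer, so adding workers increases throughput without any change to the code that enqueues tasks.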
As your application grows, so will the need to make it more stable and robust. To achieve this, you need to monitor all the components of your Celery set-up.
To check the number of messages in your queues via the console, simply run rabbitmqctl list_queues, which will list all queues, with the number of messages in each queue. For a GUI-based output, you can install the RabbitMQ management plug-in.
To monitor Celery itself, you first need to switch events on. Start the Celery daemon with the option -E, so the command becomes python manage.py celeryd -l info -c 2 -B -E. This starts capturing events, and you can now monitor your workers, task states, etc, using:
- Celery command-line utilities
- Django-Celery admin
- Flower: A real-time Celery Web-monitor
Celery is one of the most stable systems available. It is very easy to get started with, very simple to configure, fast at executing millions of tasks, and flexible, as almost any component of Celery can be used on its own, changed, or configured as per requirements. Some other great features of Celery are:
- Designing workflows: To chain multiple tasks, you can use canvas to divide your tasks into sub-tasks.
- Webhooks: Use the power of Celery from other languages like PHP, Ruby, etc.
- Routing: Send tasks to a particular queue rather than to any queue, and use all the routing mechanisms that the message broker supports.
There are loads of other great features of Celery, which are beyond the scope of this article. I am sure that if you have a use-case, chances are that you can do it with Celery.